/src/rocksdb/db/db_impl/db_impl.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | #pragma once |
10 | | |
11 | | #include <atomic> |
12 | | #include <cstdint> |
13 | | #include <deque> |
14 | | #include <functional> |
15 | | #include <limits> |
16 | | #include <list> |
17 | | #include <map> |
18 | | #include <set> |
19 | | #include <string> |
20 | | #include <unordered_map> |
21 | | #include <utility> |
22 | | #include <vector> |
23 | | |
24 | | #include "db/column_family.h" |
25 | | #include "db/compaction/compaction_iterator.h" |
26 | | #include "db/compaction/compaction_job.h" |
27 | | #include "db/error_handler.h" |
28 | | #include "db/event_helpers.h" |
29 | | #include "db/external_sst_file_ingestion_job.h" |
30 | | #include "db/flush_job.h" |
31 | | #include "db/flush_scheduler.h" |
32 | | #include "db/import_column_family_job.h" |
33 | | #include "db/internal_stats.h" |
34 | | #include "db/log_writer.h" |
35 | | #include "db/logs_with_prep_tracker.h" |
36 | | #include "db/memtable_list.h" |
37 | | #include "db/periodic_task_scheduler.h" |
38 | | #include "db/post_memtable_callback.h" |
39 | | #include "db/pre_release_callback.h" |
40 | | #include "db/range_del_aggregator.h" |
41 | | #include "db/read_callback.h" |
42 | | #include "db/seqno_to_time_mapping.h" |
43 | | #include "db/snapshot_checker.h" |
44 | | #include "db/snapshot_impl.h" |
45 | | #include "db/trim_history_scheduler.h" |
46 | | #include "db/version_edit.h" |
47 | | #include "db/wal_manager.h" |
48 | | #include "db/write_controller.h" |
49 | | #include "db/write_thread.h" |
50 | | #include "logging/event_logger.h" |
51 | | #include "memtable/wbwi_memtable.h" |
52 | | #include "monitoring/instrumented_mutex.h" |
53 | | #include "options/db_options.h" |
54 | | #include "port/port.h" |
55 | | #include "rocksdb/attribute_groups.h" |
56 | | #include "rocksdb/db.h" |
57 | | #include "rocksdb/env.h" |
58 | | #include "rocksdb/memtablerep.h" |
59 | | #include "rocksdb/status.h" |
60 | | #include "rocksdb/trace_reader_writer.h" |
61 | | #include "rocksdb/transaction_log.h" |
62 | | #include "rocksdb/user_write_callback.h" |
63 | | #include "rocksdb/utilities/replayer.h" |
64 | | #include "rocksdb/utilities/write_batch_with_index.h" |
65 | | #include "rocksdb/write_buffer_manager.h" |
66 | | #include "table/merging_iterator.h" |
67 | | #include "util/autovector.h" |
68 | | #include "util/hash.h" |
69 | | #include "util/repeatable_thread.h" |
70 | | #include "util/stop_watch.h" |
71 | | #include "util/thread_local.h" |
72 | | |
73 | | namespace ROCKSDB_NAMESPACE { |
74 | | |
75 | | class Arena; |
76 | | class ArenaWrappedDBIter; |
77 | | class InMemoryStatsHistoryIterator; |
78 | | class MemTable; |
79 | | class PersistentStatsHistoryIterator; |
80 | | class TableCache; |
81 | | class TaskLimiterToken; |
82 | | class Version; |
83 | | class VersionEdit; |
84 | | class VersionSet; |
85 | | class WriteCallback; |
86 | | struct JobContext; |
87 | | struct ExternalSstFileInfo; |
88 | | struct MemTableInfo; |
89 | | |
90 | | // Class to maintain directories for all database paths other than main one. |
91 | | class Directories { |
92 | | public: |
93 | | IOStatus SetDirectories(FileSystem* fs, const std::string& dbname, |
94 | | const std::string& wal_dir, |
95 | | const std::vector<DbPath>& data_paths); |
96 | | |
97 | 0 | FSDirectory* GetDataDir(size_t path_id) const { |
98 | 0 | assert(path_id < data_dirs_.size()); |
99 | 0 | FSDirectory* ret_dir = data_dirs_[path_id].get(); |
100 | 0 | if (ret_dir == nullptr) { |
101 | | // Should use db_dir_ |
102 | 0 | return db_dir_.get(); |
103 | 0 | } |
104 | 0 | return ret_dir; |
105 | 0 | } |
106 | | |
107 | 0 | FSDirectory* GetWalDir() { |
108 | 0 | if (wal_dir_) { |
109 | 0 | return wal_dir_.get(); |
110 | 0 | } |
111 | 0 | return db_dir_.get(); |
112 | 0 | } |
113 | | |
114 | 124k | FSDirectory* GetDbDir() { return db_dir_.get(); } |
115 | | |
116 | 40.2k | IOStatus Close(const IOOptions& options, IODebugContext* dbg) { |
117 | | // close all directories for all database paths |
118 | 40.2k | IOStatus s = IOStatus::OK(); |
119 | | |
120 | | // The default implementation for Close() in Directory/FSDirectory class |
121 | | // "NotSupported" status, the upper level interface should be able to |
122 | | // handle this error so that Close() does not fail after upgrading when |
123 | | // run on FileSystems that have not implemented `Directory::Close()` or |
124 | | // `FSDirectory::Close()` yet |
125 | | |
126 | 40.2k | if (db_dir_) { |
127 | 40.2k | IOStatus temp_s = db_dir_->Close(options, dbg); |
128 | 40.2k | if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) { |
129 | 0 | s = std::move(temp_s); |
130 | 0 | } |
131 | 40.2k | } |
132 | | |
133 | | // Attempt to close everything even if one fails |
134 | 40.2k | s.PermitUncheckedError(); |
135 | | |
136 | 40.2k | if (wal_dir_) { |
137 | 0 | IOStatus temp_s = wal_dir_->Close(options, dbg); |
138 | 0 | if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) { |
139 | 0 | s = std::move(temp_s); |
140 | 0 | } |
141 | 0 | } |
142 | | |
143 | 40.2k | s.PermitUncheckedError(); |
144 | | |
145 | 40.2k | for (auto& data_dir_ptr : data_dirs_) { |
146 | 40.2k | if (data_dir_ptr) { |
147 | 0 | IOStatus temp_s = data_dir_ptr->Close(options, dbg); |
148 | 0 | if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) { |
149 | 0 | s = std::move(temp_s); |
150 | 0 | } |
151 | 0 | } |
152 | 40.2k | } |
153 | | |
154 | | // Ready for caller |
155 | 40.2k | s.MustCheck(); |
156 | 40.2k | return s; |
157 | 40.2k | } |
158 | | |
159 | | private: |
160 | | std::unique_ptr<FSDirectory> db_dir_; |
161 | | std::vector<std::unique_ptr<FSDirectory>> data_dirs_; |
162 | | std::unique_ptr<FSDirectory> wal_dir_; |
163 | | }; |
164 | | |
// log::Reader::Reporter implementation used when reading WAL/log records
// during DB open; records the WAL number of a reported corruption so the
// caller can inspect it afterwards.
struct DBOpenLogRecordReadReporter : public log::Reader::Reporter {
  Env* env;
  Logger* info_log;
  // Presumably the name of the log file being read, for reporting —
  // TODO(review): confirm against the Corruption() definition.
  const char* fname;
  Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
  // NOTE(review): looks like an out-flag set when an old-format log record
  // is encountered — confirm against OldLogRecord()'s definition.
  bool* old_log_record;
  // Invoked by log::Reader when `bytes` are dropped due to a corrupted
  // record with status `s`; `log_number` identifies the affected WAL
  // (defaults to kMaxSequenceNumber when unknown).
  void Corruption(size_t bytes, const Status& s,
                  uint64_t log_number = kMaxSequenceNumber) override;

  // Invoked by log::Reader when an old-format log record of `bytes` is seen.
  void OldLogRecord(size_t bytes) override;

  // Returns the recorded corrupted WAL number, or kMaxSequenceNumber if no
  // corruption has been recorded.
  uint64_t GetCorruptedLogNumber() const { return corrupted_wal_number_; }

 private:
  // kMaxSequenceNumber acts as the "no corruption recorded" sentinel.
  uint64_t corrupted_wal_number_ = kMaxSequenceNumber;
};
181 | | |
182 | | // While DB is the public interface of RocksDB, DBImpl is the actual
183 | | // class implementing it. It's the entry point of the core RocksDB engine.
184 | | // All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a |
185 | | // DBImpl internally. |
186 | | // Other than functions implementing the DB interface, some public |
187 | | // functions are there for other internal components to call. For |
188 | | // example, TransactionDB directly calls DBImpl::WriteImpl() and |
189 | | // BlobDB directly calls DBImpl::GetImpl(). Some other functions |
190 | | // are for sub-components to call. For example, ColumnFamilyHandleImpl |
191 | | // calls DBImpl::FindObsoleteFiles(). |
192 | | // |
193 | | // Since it's a very large class, the definition of the functions is |
194 | | // divided in several db_impl_*.cc files, besides db_impl.cc. |
195 | | class DBImpl : public DB { |
196 | | public: |
197 | | DBImpl(const DBOptions& options, const std::string& dbname, |
198 | | const bool seq_per_batch = false, const bool batch_per_txn = true, |
199 | | bool read_only = false); |
200 | | // No copying allowed |
201 | | DBImpl(const DBImpl&) = delete; |
202 | | void operator=(const DBImpl&) = delete; |
203 | | |
204 | | virtual ~DBImpl(); |
205 | | |
206 | | // ---- Implementations of the DB interface ---- |
207 | | |
208 | | using DB::Resume; |
209 | | Status Resume() override; |
210 | | |
211 | | using DB::Put; |
212 | | Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, |
213 | | const Slice& key, const Slice& value) override; |
214 | | Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, |
215 | | const Slice& key, const Slice& ts, const Slice& value) override; |
216 | | |
217 | | using DB::PutEntity; |
218 | | Status PutEntity(const WriteOptions& options, |
219 | | ColumnFamilyHandle* column_family, const Slice& key, |
220 | | const WideColumns& columns) override; |
221 | | Status PutEntity(const WriteOptions& options, const Slice& key, |
222 | | const AttributeGroups& attribute_groups) override; |
223 | | |
224 | | using DB::Merge; |
225 | | Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, |
226 | | const Slice& key, const Slice& value) override; |
227 | | Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, |
228 | | const Slice& key, const Slice& ts, const Slice& value) override; |
229 | | |
230 | | using DB::Delete; |
231 | | Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, |
232 | | const Slice& key) override; |
233 | | Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, |
234 | | const Slice& key, const Slice& ts) override; |
235 | | |
236 | | using DB::SingleDelete; |
237 | | Status SingleDelete(const WriteOptions& options, |
238 | | ColumnFamilyHandle* column_family, |
239 | | const Slice& key) override; |
240 | | Status SingleDelete(const WriteOptions& options, |
241 | | ColumnFamilyHandle* column_family, const Slice& key, |
242 | | const Slice& ts) override; |
243 | | |
244 | | using DB::DeleteRange; |
245 | | Status DeleteRange(const WriteOptions& options, |
246 | | ColumnFamilyHandle* column_family, const Slice& begin_key, |
247 | | const Slice& end_key) override; |
248 | | Status DeleteRange(const WriteOptions& options, |
249 | | ColumnFamilyHandle* column_family, const Slice& begin_key, |
250 | | const Slice& end_key, const Slice& ts) override; |
251 | | |
252 | | using DB::Write; |
253 | | Status Write(const WriteOptions& options, WriteBatch* updates) override; |
254 | | |
255 | | using DB::WriteWithCallback; |
256 | | Status WriteWithCallback(const WriteOptions& options, WriteBatch* updates, |
257 | | UserWriteCallback* user_write_cb) override; |
258 | | |
259 | | Status IngestWriteBatchWithIndex( |
260 | | const WriteOptions& options, |
261 | | std::shared_ptr<WriteBatchWithIndex> wbwi) override; |
262 | | |
263 | | using DB::Get; |
264 | | Status Get(const ReadOptions& _read_options, |
265 | | ColumnFamilyHandle* column_family, const Slice& key, |
266 | | PinnableSlice* value, std::string* timestamp) override; |
267 | | |
268 | | using DB::GetEntity; |
269 | | Status GetEntity(const ReadOptions& options, |
270 | | ColumnFamilyHandle* column_family, const Slice& key, |
271 | | PinnableWideColumns* columns) override; |
272 | | Status GetEntity(const ReadOptions& options, const Slice& key, |
273 | | PinnableAttributeGroups* result) override; |
274 | | |
275 | | using DB::GetMergeOperands; |
276 | | Status GetMergeOperands(const ReadOptions& options, |
277 | | ColumnFamilyHandle* column_family, const Slice& key, |
278 | | PinnableSlice* merge_operands, |
279 | | GetMergeOperandsOptions* get_merge_operands_options, |
280 | 0 | int* number_of_operands) override { |
281 | 0 | GetImplOptions get_impl_options; |
282 | 0 | get_impl_options.column_family = column_family; |
283 | 0 | get_impl_options.merge_operands = merge_operands; |
284 | 0 | get_impl_options.get_merge_operands_options = get_merge_operands_options; |
285 | 0 | get_impl_options.number_of_operands = number_of_operands; |
286 | 0 | get_impl_options.get_value = false; |
287 | 0 | return GetImpl(options, key, get_impl_options); |
288 | 0 | } |
289 | | |
290 | | using DB::MultiGet; |
291 | | // This MultiGet is a batched version, which may be faster than calling Get |
292 | | // multiple times, especially if the keys have some spatial locality that |
293 | | // enables them to be queried in the same SST files/set of files. The larger |
294 | | // the batch size, the more scope for batching and performance improvement |
295 | | // The values and statuses parameters are arrays with number of elements |
296 | | // equal to keys.size(). This allows the storage for those to be allocated
297 | | // by the caller on the stack for small batches |
298 | | void MultiGet(const ReadOptions& _read_options, const size_t num_keys, |
299 | | ColumnFamilyHandle** column_families, const Slice* keys, |
300 | | PinnableSlice* values, std::string* timestamps, |
301 | | Status* statuses, const bool sorted_input = false) override; |
302 | | |
303 | | void MultiGetWithCallback( |
304 | | const ReadOptions& _read_options, ColumnFamilyHandle* column_family, |
305 | | ReadCallback* callback, |
306 | | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys); |
307 | | |
308 | | using DB::MultiGetEntity; |
309 | | |
310 | | void MultiGetEntity(const ReadOptions& options, |
311 | | ColumnFamilyHandle* column_family, size_t num_keys, |
312 | | const Slice* keys, PinnableWideColumns* results, |
313 | | Status* statuses, bool sorted_input) override; |
314 | | |
315 | | void MultiGetEntity(const ReadOptions& options, size_t num_keys, |
316 | | ColumnFamilyHandle** column_families, const Slice* keys, |
317 | | PinnableWideColumns* results, Status* statuses, |
318 | | bool sorted_input) override; |
319 | | void MultiGetEntity(const ReadOptions& options, size_t num_keys, |
320 | | const Slice* keys, |
321 | | PinnableAttributeGroups* results) override; |
322 | | |
323 | | void MultiGetEntityWithCallback( |
324 | | const ReadOptions& read_options, ColumnFamilyHandle* column_family, |
325 | | ReadCallback* callback, |
326 | | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys); |
327 | | |
328 | | Status CreateColumnFamily(const ColumnFamilyOptions& cf_options, |
329 | | const std::string& column_family, |
330 | 14.6k | ColumnFamilyHandle** handle) override { |
331 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
332 | 14.6k | return CreateColumnFamily(ReadOptions(), WriteOptions(), cf_options, |
333 | 14.6k | column_family, handle); |
334 | 14.6k | } |
335 | | virtual Status CreateColumnFamily(const ReadOptions& read_options, |
336 | | const WriteOptions& write_options, |
337 | | const ColumnFamilyOptions& cf_options, |
338 | | const std::string& column_family, |
339 | | ColumnFamilyHandle** handle); |
340 | | Status CreateColumnFamilies( |
341 | | const ColumnFamilyOptions& cf_options, |
342 | | const std::vector<std::string>& column_family_names, |
343 | 0 | std::vector<ColumnFamilyHandle*>* handles) override { |
344 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
345 | 0 | return CreateColumnFamilies(ReadOptions(), WriteOptions(), cf_options, |
346 | 0 | column_family_names, handles); |
347 | 0 | } |
348 | | virtual Status CreateColumnFamilies( |
349 | | const ReadOptions& read_options, const WriteOptions& write_options, |
350 | | const ColumnFamilyOptions& cf_options, |
351 | | const std::vector<std::string>& column_family_names, |
352 | | std::vector<ColumnFamilyHandle*>* handles); |
353 | | |
354 | | Status CreateColumnFamilies( |
355 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
356 | 0 | std::vector<ColumnFamilyHandle*>* handles) override { |
357 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
358 | 0 | return CreateColumnFamilies(ReadOptions(), WriteOptions(), column_families, |
359 | 0 | handles); |
360 | 0 | } |
361 | | virtual Status CreateColumnFamilies( |
362 | | const ReadOptions& read_options, const WriteOptions& write_options, |
363 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
364 | | std::vector<ColumnFamilyHandle*>* handles); |
365 | | Status DropColumnFamily(ColumnFamilyHandle* column_family) override; |
366 | | Status DropColumnFamilies( |
367 | | const std::vector<ColumnFamilyHandle*>& column_families) override; |
368 | | |
369 | | // Returns false if key doesn't exist in the database and true if it may. |
370 | | // If value_found is not passed in as null, then return the value if found in |
371 | | // memory. On return, if value was found, then value_found will be set to true |
372 | | // , otherwise false. |
373 | | using DB::KeyMayExist; |
374 | | bool KeyMayExist(const ReadOptions& options, |
375 | | ColumnFamilyHandle* column_family, const Slice& key, |
376 | | std::string* value, std::string* timestamp, |
377 | | bool* value_found = nullptr) override; |
378 | | |
379 | | using DB::NewIterator; |
380 | | Iterator* NewIterator(const ReadOptions& _read_options, |
381 | | ColumnFamilyHandle* column_family) override; |
382 | | Status NewIterators(const ReadOptions& _read_options, |
383 | | const std::vector<ColumnFamilyHandle*>& column_families, |
384 | | std::vector<Iterator*>* iterators) override; |
385 | | |
386 | | using DB::NewMultiScan; |
387 | | std::unique_ptr<MultiScan> NewMultiScan( |
388 | | const ReadOptions& _read_options, ColumnFamilyHandle* column_family, |
389 | | const MultiScanArgs& scan_opts) override; |
390 | | |
391 | | const Snapshot* GetSnapshot() override; |
392 | | void ReleaseSnapshot(const Snapshot* snapshot) override; |
393 | | |
394 | | std::unique_ptr<Iterator> NewCoalescingIterator( |
395 | | const ReadOptions& options, |
396 | | const std::vector<ColumnFamilyHandle*>& column_families) override; |
397 | | |
398 | | std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator( |
399 | | const ReadOptions& options, |
400 | | const std::vector<ColumnFamilyHandle*>& column_families) override; |
401 | | |
402 | | // Create a timestamped snapshot. This snapshot can be shared by multiple |
403 | | // readers. If any of them uses it for write conflict checking, then |
404 | | // is_write_conflict_boundary is true. For simplicity, set it to true by |
405 | | // default. |
406 | | std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot( |
407 | | SequenceNumber snapshot_seq, uint64_t ts); |
408 | | std::shared_ptr<const SnapshotImpl> GetTimestampedSnapshot(uint64_t ts) const; |
409 | | void ReleaseTimestampedSnapshotsOlderThan( |
410 | | uint64_t ts, size_t* remaining_total_ss = nullptr); |
411 | | Status GetTimestampedSnapshots(uint64_t ts_lb, uint64_t ts_ub, |
412 | | std::vector<std::shared_ptr<const Snapshot>>& |
413 | | timestamped_snapshots) const; |
414 | | |
415 | | using DB::GetProperty; |
416 | | bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property, |
417 | | std::string* value) override; |
418 | | using DB::GetMapProperty; |
419 | | bool GetMapProperty(ColumnFamilyHandle* column_family, const Slice& property, |
420 | | std::map<std::string, std::string>* value) override; |
421 | | using DB::GetIntProperty; |
422 | | bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property, |
423 | | uint64_t* value) override; |
424 | | using DB::GetAggregatedIntProperty; |
425 | | bool GetAggregatedIntProperty(const Slice& property, |
426 | | uint64_t* aggregated_value) override; |
427 | | using DB::GetApproximateSizes; |
428 | | Status GetApproximateSizes(const SizeApproximationOptions& options, |
429 | | ColumnFamilyHandle* column_family, |
430 | | const Range* range, int n, |
431 | | uint64_t* sizes) override; |
432 | | using DB::GetApproximateMemTableStats; |
433 | | void GetApproximateMemTableStats(ColumnFamilyHandle* column_family, |
434 | | const Range& range, uint64_t* const count, |
435 | | uint64_t* const size) override; |
436 | | using DB::CompactRange; |
437 | | Status CompactRange(const CompactRangeOptions& options, |
438 | | ColumnFamilyHandle* column_family, const Slice* begin, |
439 | | const Slice* end) override; |
440 | | |
441 | | using DB::CompactFiles; |
442 | | Status CompactFiles( |
443 | | const CompactionOptions& compact_options, |
444 | | ColumnFamilyHandle* column_family, |
445 | | const std::vector<std::string>& input_file_names, const int output_level, |
446 | | const int output_path_id = -1, |
447 | | std::vector<std::string>* const output_file_names = nullptr, |
448 | | CompactionJobInfo* compaction_job_info = nullptr) override; |
449 | | |
450 | | Status PauseBackgroundWork() override; |
451 | | Status ContinueBackgroundWork() override; |
452 | | |
453 | | Status EnableAutoCompaction( |
454 | | const std::vector<ColumnFamilyHandle*>& column_family_handles) override; |
455 | | |
456 | | void EnableManualCompaction() override; |
457 | | void DisableManualCompaction() override; |
458 | | void AbortAllCompactions() override; |
459 | | void ResumeAllCompactions() override; |
460 | | |
461 | | using DB::SetOptions; |
462 | | Status SetOptions( |
463 | | const std::unordered_map<ColumnFamilyHandle*, |
464 | | std::unordered_map<std::string, std::string>>& |
465 | | column_families_opts_map) override; |
466 | | |
467 | | Status SetDBOptions( |
468 | | const std::unordered_map<std::string, std::string>& options_map) override; |
469 | | |
470 | | using DB::NumberLevels; |
471 | | int NumberLevels(ColumnFamilyHandle* column_family) override; |
472 | | using DB::Level0StopWriteTrigger; |
473 | | int Level0StopWriteTrigger(ColumnFamilyHandle* column_family) override; |
474 | | const std::string& GetName() const override; |
475 | | Env* GetEnv() const override; |
476 | | FileSystem* GetFileSystem() const override; |
477 | | using DB::GetOptions; |
478 | | Options GetOptions(ColumnFamilyHandle* column_family) const override; |
479 | | using DB::GetDBOptions; |
480 | | DBOptions GetDBOptions() const override; |
481 | | using DB::Flush; |
482 | | Status Flush(const FlushOptions& options, |
483 | | ColumnFamilyHandle* column_family) override; |
484 | | Status Flush( |
485 | | const FlushOptions& options, |
486 | | const std::vector<ColumnFamilyHandle*>& column_families) override; |
487 | 0 | Status FlushWAL(bool sync) override { |
488 | 0 | FlushWALOptions options; |
489 | 0 | options.sync = sync; |
490 | 0 | return FlushWAL(options); |
491 | 0 | } |
492 | | |
493 | | Status FlushWAL(const FlushWALOptions& options) override; |
494 | | |
495 | | virtual Status FlushWAL(const WriteOptions& write_options, bool sync); |
496 | | bool WALBufferIsEmpty(); |
497 | | Status SyncWAL() override; |
498 | | Status LockWAL() override; |
499 | | Status UnlockWAL() override; |
500 | | |
501 | | SequenceNumber GetLatestSequenceNumber() const override; |
502 | | |
503 | | // IncreaseFullHistoryTsLow(ColumnFamilyHandle*, std::string) will acquire |
504 | | // and release db_mutex |
505 | | Status IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family, |
506 | | std::string ts_low) override; |
507 | | |
508 | | // GetFullHistoryTsLow(ColumnFamilyHandle*, std::string*) will acquire and |
509 | | // release db_mutex |
510 | | Status GetFullHistoryTsLow(ColumnFamilyHandle* column_family, |
511 | | std::string* ts_low) override; |
512 | | |
513 | | Status GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family, |
514 | | std::string* newest_timestamp) override; |
515 | | |
516 | | Status GetDbIdentity(std::string& identity) const override; |
517 | | |
518 | | virtual Status GetDbIdentityFromIdentityFile(const IOOptions& opts, |
519 | | std::string* identity) const; |
520 | | |
521 | | Status GetDbSessionId(std::string& session_id) const override; |
522 | | |
523 | | ColumnFamilyHandle* DefaultColumnFamily() const override; |
524 | | |
525 | | ColumnFamilyHandle* PersistentStatsColumnFamily() const; |
526 | | |
527 | | Status Close() override; |
528 | | |
529 | | Status DisableFileDeletions() override; |
530 | | |
531 | | Status EnableFileDeletions() override; |
532 | | |
533 | | virtual bool IsFileDeletionsEnabled() const; |
534 | | |
535 | | Status GetStatsHistory( |
536 | | uint64_t start_time, uint64_t end_time, |
537 | | std::unique_ptr<StatsHistoryIterator>* stats_iterator) override; |
538 | | |
539 | | using DB::ResetStats; |
540 | | Status ResetStats() override; |
541 | | // All the returned filenames start with "/" |
542 | | Status GetLiveFiles(std::vector<std::string>&, uint64_t* manifest_file_size, |
543 | | bool flush_memtable = true) override; |
544 | | Status GetSortedWalFiles(VectorWalPtr& files) override; |
545 | | Status GetSortedWalFilesImpl(VectorWalPtr& files, bool need_seqnos); |
546 | | |
547 | | // Get the known flushed sizes of WALs that might still be written to |
548 | | // or have pending sync. |
549 | | // NOTE: unlike alive_wal_files_, this function includes WALs that might |
550 | | // be obsolete (but not obsolete to a pending Checkpoint) and not yet fully |
551 | | // synced. |
552 | | Status GetOpenWalSizes(std::map<uint64_t, uint64_t>& number_to_size); |
553 | | Status GetCurrentWalFile(std::unique_ptr<WalFile>* current_wal_file) override; |
554 | | Status GetCreationTimeOfOldestFile(uint64_t* creation_time) override; |
555 | | |
556 | | Status GetUpdatesSince( |
557 | | SequenceNumber seq_number, std::unique_ptr<TransactionLogIterator>* iter, |
558 | | const TransactionLogIterator::ReadOptions& read_options = |
559 | | TransactionLogIterator::ReadOptions()) override; |
560 | | Status DeleteFilesInRanges(ColumnFamilyHandle* column_family, |
561 | | const RangeOpt* ranges, size_t n, |
562 | | bool include_end = true); |
563 | | |
564 | | void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) override; |
565 | | |
566 | | Status GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) override; |
567 | | |
568 | | Status GetLiveFilesStorageInfo( |
569 | | const LiveFilesStorageInfoOptions& opts, |
570 | | std::vector<LiveFileStorageInfo>* files) override; |
571 | | |
572 | | // Obtains the meta data of the specified column family of the DB. |
573 | | // TODO(yhchiang): output parameter is placed in the end in this codebase. |
574 | | void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family, |
575 | | ColumnFamilyMetaData* metadata) override; |
576 | | |
577 | | // Get column family metadata with filtering based on key range and level |
578 | | void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family, |
579 | | const GetColumnFamilyMetaDataOptions& options, |
580 | | ColumnFamilyMetaData* metadata) override; |
581 | | |
582 | | void GetAllColumnFamilyMetaData( |
583 | | std::vector<ColumnFamilyMetaData>* metadata) override; |
584 | | |
585 | | Status SuggestCompactRange(ColumnFamilyHandle* column_family, |
586 | | const Slice* begin, const Slice* end) override; |
587 | | |
588 | | Status PromoteL0(ColumnFamilyHandle* column_family, |
589 | | int target_level) override; |
590 | | |
591 | | using DB::IngestExternalFile; |
592 | | Status IngestExternalFile( |
593 | | ColumnFamilyHandle* column_family, |
594 | | const std::vector<std::string>& external_files, |
595 | | const IngestExternalFileOptions& ingestion_options) override; |
596 | | |
597 | | using DB::IngestExternalFiles; |
598 | | Status IngestExternalFiles( |
599 | | const std::vector<IngestExternalFileArg>& args) override; |
600 | | |
601 | | using DB::CreateColumnFamilyWithImport; |
602 | | Status CreateColumnFamilyWithImport( |
603 | | const ColumnFamilyOptions& options, const std::string& column_family_name, |
604 | | const ImportColumnFamilyOptions& import_options, |
605 | | const std::vector<const ExportImportFilesMetaData*>& metadatas, |
606 | | ColumnFamilyHandle** handle) override; |
607 | | |
608 | | using DB::ClipColumnFamily; |
609 | | Status ClipColumnFamily(ColumnFamilyHandle* column_family, |
610 | | const Slice& begin_key, |
611 | | const Slice& end_key) override; |
612 | | |
613 | | using DB::VerifyFileChecksums; |
614 | | Status VerifyFileChecksums(const ReadOptions& read_options) override; |
615 | | |
616 | | using DB::VerifyChecksum; |
617 | | Status VerifyChecksum(const ReadOptions& /*read_options*/) override; |
618 | | // Verify the checksums of files in db. Currently only tables are checked. |
619 | | // |
620 | | // read_options: controls file I/O behavior, e.g. read ahead size while |
621 | | // reading all the live table files. |
622 | | // |
623 | | // use_file_checksum: if false, verify the block checksums of all live table |
624 | | // in db. Otherwise, obtain the file checksums and compare |
625 | | // with the MANIFEST. Currently, file checksums are |
626 | | // recomputed by reading all table files. |
627 | | // |
628 | | // Returns: OK if there is no file whose file or block checksum mismatches. |
629 | | Status VerifyChecksumInternal(const ReadOptions& read_options, |
630 | | bool use_file_checksum); |
631 | | |
632 | | Status VerifyFullFileChecksum(const std::string& file_checksum_expected, |
633 | | const std::string& func_name_expected, |
634 | | const std::string& fpath, |
635 | | const ReadOptions& read_options); |
636 | | |
637 | | using DB::StartTrace; |
638 | | Status StartTrace(const TraceOptions& options, |
639 | | std::unique_ptr<TraceWriter>&& trace_writer) override; |
640 | | |
641 | | using DB::EndTrace; |
642 | | Status EndTrace() override; |
643 | | |
644 | | using DB::NewDefaultReplayer; |
645 | | Status NewDefaultReplayer(const std::vector<ColumnFamilyHandle*>& handles, |
646 | | std::unique_ptr<TraceReader>&& reader, |
647 | | std::unique_ptr<Replayer>* replayer) override; |
648 | | |
649 | | using DB::StartBlockCacheTrace; |
650 | | Status StartBlockCacheTrace( |
651 | | const TraceOptions& trace_options, |
652 | | std::unique_ptr<TraceWriter>&& trace_writer) override; |
653 | | |
654 | | Status StartBlockCacheTrace( |
655 | | const BlockCacheTraceOptions& options, |
656 | | std::unique_ptr<BlockCacheTraceWriter>&& trace_writer) override; |
657 | | |
658 | | using DB::EndBlockCacheTrace; |
659 | | Status EndBlockCacheTrace() override; |
660 | | |
661 | | using DB::StartIOTrace; |
662 | | Status StartIOTrace(const TraceOptions& options, |
663 | | std::unique_ptr<TraceWriter>&& trace_writer) override; |
664 | | |
665 | | using DB::EndIOTrace; |
666 | | Status EndIOTrace() override; |
667 | | |
668 | | using DB::GetPropertiesOfAllTables; |
669 | | Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, |
670 | | TablePropertiesCollection* props) override; |
671 | | Status GetPropertiesOfTablesInRange( |
672 | | ColumnFamilyHandle* column_family, const Range* range, std::size_t n, |
673 | | TablePropertiesCollection* props) override; |
674 | | |
675 | | Status GetPropertiesOfTablesByLevel( |
676 | | ColumnFamilyHandle* column_family, |
677 | | std::vector<std::unique_ptr<TablePropertiesCollection>>* props_by_level) |
678 | | override; |
679 | | |
680 | | // ---- End of implementations of the DB interface ---- |
681 | | SystemClock* GetSystemClock() const; |
682 | | |
683 | | struct GetImplOptions { |
684 | | ColumnFamilyHandle* column_family = nullptr; |
685 | | PinnableSlice* value = nullptr; |
686 | | PinnableWideColumns* columns = nullptr; |
687 | | std::string* timestamp = nullptr; |
688 | | bool* value_found = nullptr; |
689 | | ReadCallback* callback = nullptr; |
690 | | bool* is_blob_index = nullptr; |
691 | | // If true return value associated with key via value pointer else return |
692 | | // all merge operands for key via merge_operands pointer |
693 | | bool get_value = true; |
694 | | // Pointer to an array of size |
695 | | // get_merge_operands_options.expected_max_number_of_operands allocated by |
696 | | // user |
697 | | PinnableSlice* merge_operands = nullptr; |
698 | | GetMergeOperandsOptions* get_merge_operands_options = nullptr; |
699 | | int* number_of_operands = nullptr; |
700 | | }; |
701 | | |
702 | | Status GetImpl(const ReadOptions& read_options, |
703 | | ColumnFamilyHandle* column_family, const Slice& key, |
704 | | PinnableSlice* value); |
705 | | |
706 | | Status GetImpl(const ReadOptions& read_options, |
707 | | ColumnFamilyHandle* column_family, const Slice& key, |
708 | | PinnableSlice* value, std::string* timestamp); |
709 | | |
710 | | // Function that Get and KeyMayExist call with no_io true or false |
711 | | // Note: 'value_found' from KeyMayExist propagates here |
712 | | // This function is also called by GetMergeOperands |
713 | | // If get_impl_options.get_value = true get value associated with |
714 | | // get_impl_options.key via get_impl_options.value |
715 | | // If get_impl_options.get_value = false get merge operands associated with |
716 | | // get_impl_options.key via get_impl_options.merge_operands |
717 | | virtual Status GetImpl(const ReadOptions& options, const Slice& key, |
718 | | GetImplOptions& get_impl_options); |
719 | | |
720 | | // If `snapshot` == kMaxSequenceNumber, set a recent one inside the file. |
721 | | ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, |
722 | | ColumnFamilyHandleImpl* cfh, |
723 | | SuperVersion* sv, SequenceNumber snapshot, |
724 | | ReadCallback* read_callback, |
725 | | bool expose_blob_index = false, |
726 | | bool allow_refresh = true); |
727 | | |
728 | 22.1k | virtual SequenceNumber GetLastPublishedSequence() const { |
729 | 22.1k | if (last_seq_same_as_publish_seq_) { |
730 | 22.1k | return versions_->LastSequence(); |
731 | 22.1k | } else { |
732 | 0 | return versions_->LastPublishedSequence(); |
733 | 0 | } |
734 | 22.1k | } |
735 | | |
736 | | // REQUIRES: joined the main write queue if two_write_queues is disabled, and |
737 | | // the second write queue otherwise. |
738 | | virtual void SetLastPublishedSequence(SequenceNumber seq); |
  // Returns LastSequence in last_seq_same_as_publish_seq_
  // mode and LastAllocatedSequence otherwise. This is useful when visibility
  // depends also on data written to the WAL but not to the memtable.
742 | | SequenceNumber TEST_GetLastVisibleSequence() const; |
743 | | |
744 | | // Similar to Write() but will call the callback once on the single write |
745 | | // thread to determine whether it is safe to perform the write. |
746 | | virtual Status WriteWithCallback(const WriteOptions& write_options, |
747 | | WriteBatch* my_batch, |
748 | | WriteCallback* callback, |
749 | | UserWriteCallback* user_write_cb = nullptr); |
750 | | |
751 | | // Returns the sequence number that is guaranteed to be smaller than or equal |
752 | | // to the sequence number of any key that could be inserted into the current |
753 | | // memtables. It can then be assumed that any write with a larger(or equal) |
754 | | // sequence number will be present in this memtable or a later memtable. |
755 | | // |
756 | | // If the earliest sequence number could not be determined, |
757 | | // kMaxSequenceNumber will be returned. |
758 | | // |
759 | | // If include_history=true, will also search Memtables in MemTableList |
760 | | // History. |
761 | | SequenceNumber GetEarliestMemTableSequenceNumber(SuperVersion* sv, |
762 | | bool include_history); |
763 | | |
764 | | // For a given key, check to see if there are any records for this key |
765 | | // in the memtables, including memtable history. If cache_only is false, |
766 | | // SST files will also be checked. |
767 | | // |
768 | | // `key` should NOT have user-defined timestamp appended to user key even if |
769 | | // timestamp is enabled. |
770 | | // |
771 | | // If a key is found, *found_record_for_key will be set to true and |
772 | | // *seq will be set to the stored sequence number for the latest |
773 | | // operation on this key or kMaxSequenceNumber if unknown. If user-defined |
774 | | // timestamp is enabled for this column family and timestamp is not nullptr, |
775 | | // then *timestamp will be set to the stored timestamp for the latest |
776 | | // operation on this key. |
777 | | // If no key is found, *found_record_for_key will be set to false. |
778 | | // |
779 | | // Note: If cache_only=false, it is possible for *seq to be set to 0 if |
780 | | // the sequence number has been cleared from the record. If the caller is |
781 | | // holding an active db snapshot, we know the missing sequence must be less |
782 | | // than the snapshot's sequence number (sequence numbers are only cleared |
783 | | // when there are no earlier active snapshots). |
784 | | // |
785 | | // If NotFound is returned and found_record_for_key is set to false, then no |
786 | | // record for this key was found. If the caller is holding an active db |
  // snapshot, we know that no key could have existed after this snapshot
788 | | // (since we do not compact keys that have an earlier snapshot). |
789 | | // |
790 | | // Only records newer than or at `lower_bound_seq` are guaranteed to be |
791 | | // returned. Memtables and files may not be checked if it only contains data |
792 | | // older than `lower_bound_seq`. |
793 | | // |
794 | | // Returns OK or NotFound on success, |
795 | | // other status on unexpected error. |
796 | | // TODO(andrewkr): this API need to be aware of range deletion operations |
797 | | Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, |
798 | | bool cache_only, |
799 | | SequenceNumber lower_bound_seq, |
800 | | SequenceNumber* seq, std::string* timestamp, |
801 | | bool* found_record_for_key, |
802 | | bool* is_blob_index); |
803 | | |
804 | | Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key, |
805 | | const Slice& lower_bound, const Slice upper_bound); |
806 | | Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key, |
807 | | const Slice& lower_bound, |
808 | | const Slice upper_bound); |
809 | | |
810 | | // Similar to GetSnapshot(), but also lets the db know that this snapshot |
811 | | // will be used for transaction write-conflict checking. The DB can then |
812 | | // make sure not to compact any keys that would prevent a write-conflict from |
813 | | // being detected. |
814 | | const Snapshot* GetSnapshotForWriteConflictBoundary(); |
815 | | |
816 | | // max_file_num_to_ignore allows bottom level compaction to filter out newly |
817 | | // compacted SST files. Setting max_file_num_to_ignore to kMaxUint64 will |
818 | | // disable the filtering |
819 | | // If `final_output_level` is not nullptr, it is set to manual compaction's |
820 | | // output level if returned status is OK, and it may or may not be set to |
821 | | // manual compaction's output level if returned status is not OK. |
822 | | Status RunManualCompaction(ColumnFamilyData* cfd, int input_level, |
823 | | int output_level, |
824 | | const CompactRangeOptions& compact_range_options, |
825 | | const Slice* begin, const Slice* end, |
826 | | bool exclusive, bool disallow_trivial_move, |
827 | | uint64_t max_file_num_to_ignore, |
828 | | const std::string& trim_ts, |
829 | | int* final_output_level = nullptr); |
830 | | |
831 | | // Return an internal iterator over the current state of the database. |
832 | | // The keys of this iterator are internal keys (see format.h). |
833 | | // The returned iterator should be deleted when no longer needed. |
834 | | // If allow_unprepared_value is true, the returned iterator may defer reading |
835 | | // the value and so will require PrepareValue() to be called before value(); |
836 | | // allow_unprepared_value = false is convenient when this optimization is not |
837 | | // useful, e.g. when reading the whole column family. |
838 | | // |
839 | | // read_options.ignore_range_deletions determines whether range tombstones are |
  // processed in the returned iterator internally, i.e., whether range
841 | | // tombstone covered keys are in this iterator's output. |
842 | | // @param read_options Must outlive the returned iterator. |
843 | | InternalIterator* NewInternalIterator( |
844 | | const ReadOptions& read_options, Arena* arena, SequenceNumber sequence, |
845 | | ColumnFamilyHandle* column_family = nullptr, |
846 | | bool allow_unprepared_value = false); |
847 | | |
848 | | // Note: to support DB iterator refresh, memtable range tombstones in the |
849 | | // underlying merging iterator needs to be refreshed. If db_iter is not |
850 | | // nullptr, db_iter->SetMemtableRangetombstoneIter() is called with the |
851 | | // memtable range tombstone iterator used by the underlying merging iterator. |
852 | | // This range tombstone iterator can be refreshed later by db_iter. |
853 | | // @param read_options Must outlive the returned iterator. |
854 | | InternalIterator* NewInternalIterator(const ReadOptions& read_options, |
855 | | ColumnFamilyData* cfd, |
856 | | SuperVersion* super_version, |
857 | | Arena* arena, SequenceNumber sequence, |
858 | | bool allow_unprepared_value, |
859 | | ArenaWrappedDBIter* db_iter = nullptr); |
860 | | |
  // Accessor for the tracker of WALs that contain 2PC prepare sections
  // (marked in InsertRecoveredTransaction, cleared in
  // DeleteRecoveredTransaction).
  LogsWithPrepTracker* logs_with_prep_tracker() {
    return &logs_with_prep_tracker_;
  }
864 | | |
  // Limits on scheduled background work, derived from the max_background_*
  // options (see the static GetBGJobLimits overload below).
  struct BGJobLimits {
    // Maximum number of background flushes allowed to be scheduled.
    int max_flushes;
    // Maximum number of background compactions allowed to be scheduled.
    int max_compactions;
  };
869 | | // Returns maximum background flushes and compactions allowed to be scheduled |
870 | | BGJobLimits GetBGJobLimits() const; |
871 | | // Need a static version that can be called during SanitizeOptions(). |
872 | | static BGJobLimits GetBGJobLimits(int max_background_flushes, |
873 | | int max_background_compactions, |
874 | | int max_background_jobs, |
875 | | bool parallelize_compactions); |
876 | | |
877 | | // move logs pending closing from job_context to the DB queue and |
878 | | // schedule a purge |
879 | | void ScheduleBgLogWriterClose(JobContext* job_context); |
880 | | |
881 | | uint64_t MinLogNumberToKeep(); |
882 | | |
883 | | uint64_t MinLogNumberToRecycle(); |
884 | | |
885 | | // Returns the lower bound file number for SSTs that won't be deleted, even if |
886 | | // they're obsolete. This lower bound is used internally to prevent newly |
887 | | // created flush/compaction output files from being deleted before they're |
888 | | // installed. This technique avoids the need for tracking the exact numbers of |
889 | | // files pending creation, although it prevents more files than necessary from |
890 | | // being deleted. |
891 | | uint64_t MinObsoleteSstNumberToKeep(); |
892 | | |
893 | | uint64_t GetObsoleteSstFilesSize(); |
894 | | |
895 | | uint64_t MinOptionsFileNumberToKeep(); |
896 | | |
897 | | // Returns the list of live files in 'live' and the list |
898 | | // of all files in the filesystem in 'candidate_files'. |
899 | | // If force == false and the last call was less than |
900 | | // db_options_.delete_obsolete_files_period_micros microseconds ago, |
901 | | // it will not fill up the job_context |
902 | | void FindObsoleteFiles(JobContext* job_context, bool force, |
903 | | bool no_full_scan = false); |
904 | | |
905 | | // Diffs the files listed in filenames and those that do not |
906 | | // belong to live files are possibly removed. Also, removes all the |
907 | | // files in sst_delete_files and log_delete_files. |
908 | | // It is not necessary to hold the mutex when invoking this method. |
909 | | // If FindObsoleteFiles() was run, we need to also run |
910 | | // PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true |
911 | | void PurgeObsoleteFiles(JobContext& background_contet, |
912 | | bool schedule_only = false); |
913 | | |
914 | | // Schedule a background job to actually delete obsolete files. |
915 | | void SchedulePurge(); |
916 | | |
917 | 0 | const SnapshotList& snapshots() const { return snapshots_; } |
918 | | |
919 | | // load list of snapshots to `snap_vector` that is no newer than `max_seq` |
920 | | // in ascending order. |
921 | | // `oldest_write_conflict_snapshot` is filled with the oldest snapshot |
922 | | // which satisfies SnapshotImpl.is_write_conflict_boundary_ = true. |
  void LoadSnapshots(std::vector<SequenceNumber>* snap_vector,
                     SequenceNumber* oldest_write_conflict_snapshot,
                     const SequenceNumber& max_seq) const {
    // Hold the DB mutex so the snapshot list cannot change mid-copy.
    InstrumentedMutexLock l(mutex());
    snapshots().GetAll(snap_vector, oldest_write_conflict_snapshot, max_seq);
  }
929 | | |
930 | 12.0k | const ImmutableDBOptions& immutable_db_options() const { |
931 | 12.0k | return immutable_db_options_; |
932 | 12.0k | } |
933 | | |
934 | | // Cancel all background jobs, including flush, compaction, background |
935 | | // purging, stats dumping threads, etc. If `wait` = true, wait for the |
936 | | // running jobs to abort or finish before returning. Otherwise, only |
937 | | // sends the signals. |
938 | | void CancelAllBackgroundWork(bool wait); |
939 | | |
940 | | // Find Super version and reference it. Based on options, it might return |
941 | | // the thread local cached one. |
942 | | // Call ReturnAndCleanupSuperVersion() when it is no longer needed. |
943 | | SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd); |
944 | | |
945 | | // Similar to the previous function but looks up based on a column family id. |
946 | | // nullptr will be returned if this column family no longer exists. |
947 | | // REQUIRED: this function should only be called on the write thread or if the |
948 | | // mutex is held. |
949 | | SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id); |
950 | | |
951 | | // Un-reference the super version and clean it up if it is the last reference. |
952 | | void CleanupSuperVersion(SuperVersion* sv); |
953 | | |
954 | | // Un-reference the super version and return it to thread local cache if |
955 | | // needed. If it is the last reference of the super version. Clean it up |
956 | | // after un-referencing it. |
957 | | void ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, SuperVersion* sv); |
958 | | |
959 | | // Similar to the previous function but looks up based on a column family id. |
960 | | // nullptr will be returned if this column family no longer exists. |
961 | | // REQUIRED: this function should only be called on the write thread. |
962 | | void ReturnAndCleanupSuperVersion(uint32_t colun_family_id, SuperVersion* sv); |
963 | | |
964 | | // REQUIRED: this function should only be called on the write thread or if the |
965 | | // mutex is held. Return value only valid until next call to this function or |
966 | | // mutex is released. |
967 | | ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id); |
968 | | |
  // Same as above, should be called without mutex held and not on write
  // thread.
970 | | std::unique_ptr<ColumnFamilyHandle> GetColumnFamilyHandleUnlocked( |
971 | | uint32_t column_family_id); |
972 | | |
973 | | // Returns the number of currently running flushes. |
974 | | // REQUIREMENT: mutex_ must be held when calling this function. |
  int num_running_flushes() {
    // Debug-build check of the locking contract stated above.
    mutex_.AssertHeld();
    return num_running_flushes_;
  }
979 | | |
980 | 0 | const WriteController& write_controller() { return write_controller_; } |
981 | | |
982 | | // hollow transactions shell used for recovery. |
983 | | // these will then be passed to TransactionDB so that |
984 | | // locks can be reacquired before writing can resume. |
  struct RecoveredTransaction {
    // Name under which the transaction was prepared.
    std::string name_;
    // True while the most recent batch added was an unprepared batch; set
    // to false by the final (prepare) batch (see AddBatch).
    bool unprepared_;

    // Per-batch bookkeeping. This struct owns nothing by itself; ownership
    // of batch_ belongs to the enclosing RecoveredTransaction (deleted in
    // its destructor).
    struct BatchInfo {
      // WAL file containing this batch's prepare section.
      uint64_t log_number_;
      // TODO(lth): For unprepared, the memory usage here can be big for
      // unprepared transactions. This is only useful for rollbacks, and we
      // can in theory just keep keyset for that.
      WriteBatch* batch_;
      // Number of sub-batches. A new sub-batch is created if txn attempts to
      // insert a duplicate key,seq to memtable. This is currently used in
      // WritePreparedTxn/WriteUnpreparedTxn.
      size_t batch_cnt_;
    };

    // This maps the seq of the first key in the batch to BatchInfo, which
    // contains WriteBatch and other information relevant to the batch.
    //
    // For WriteUnprepared, batches_ can have size greater than 1, but for
    // other write policies, it must be of size 1.
    std::map<SequenceNumber, BatchInfo> batches_;

    // Takes ownership of `batch`; it is deleted by the destructor.
    explicit RecoveredTransaction(const uint64_t log, const std::string& name,
                                  WriteBatch* batch, SequenceNumber seq,
                                  size_t batch_cnt, bool unprepared)
        : name_(name), unprepared_(unprepared) {
      batches_[seq] = {log, batch, batch_cnt};
    }

    // Deletes every owned WriteBatch registered in batches_.
    ~RecoveredTransaction() {
      for (auto& it : batches_) {
        delete it.second.batch_;
      }
    }

    // Registers an additional batch keyed by its first sequence number.
    // Takes ownership of `batch`. `seq` must not already be present.
    void AddBatch(SequenceNumber seq, uint64_t log_number, WriteBatch* batch,
                  size_t batch_cnt, bool unprepared) {
      assert(batches_.count(seq) == 0);
      batches_[seq] = {log_number, batch, batch_cnt};
      // Prior state must be unprepared, since the prepare batch must be the
      // last batch.
      assert(unprepared_);
      unprepared_ = unprepared;
    }
  };
1031 | | |
1032 | 67.4k | bool allow_2pc() const { return immutable_db_options_.allow_2pc; } |
1033 | | |
  // Returns a copy of the recovered-transactions map. NOTE(review): the map
  // is returned by value, so every call copies the whole container; the
  // pointed-to RecoveredTransaction objects remain owned by the DB. Confirm
  // callers rely on the snapshot semantics before changing to a reference.
  std::unordered_map<std::string, RecoveredTransaction*>
  recovered_transactions() {
    return recovered_transactions_;
  }
1038 | | |
1039 | 0 | RecoveredTransaction* GetRecoveredTransaction(const std::string& name) { |
1040 | 0 | auto it = recovered_transactions_.find(name); |
1041 | 0 | if (it == recovered_transactions_.end()) { |
1042 | 0 | return nullptr; |
1043 | 0 | } else { |
1044 | 0 | return it->second; |
1045 | 0 | } |
1046 | 0 | } |
1047 | | |
1048 | | void InsertRecoveredTransaction(const uint64_t log, const std::string& name, |
1049 | | WriteBatch* batch, SequenceNumber seq, |
1050 | 0 | size_t batch_cnt, bool unprepared_batch) { |
1051 | | // For WriteUnpreparedTxn, InsertRecoveredTransaction is called multiple |
1052 | | // times for every unprepared batch encountered during recovery. |
1053 | | // |
1054 | | // If the transaction is prepared, then the last call to |
1055 | | // InsertRecoveredTransaction will have unprepared_batch = false. |
1056 | 0 | auto rtxn = recovered_transactions_.find(name); |
1057 | 0 | if (rtxn == recovered_transactions_.end()) { |
1058 | 0 | recovered_transactions_[name] = new RecoveredTransaction( |
1059 | 0 | log, name, batch, seq, batch_cnt, unprepared_batch); |
1060 | 0 | } else { |
1061 | 0 | rtxn->second->AddBatch(seq, log, batch, batch_cnt, unprepared_batch); |
1062 | 0 | } |
1063 | 0 | logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log); |
1064 | 0 | } |
1065 | | |
1066 | 0 | void DeleteRecoveredTransaction(const std::string& name) { |
1067 | 0 | auto it = recovered_transactions_.find(name); |
1068 | 0 | assert(it != recovered_transactions_.end()); |
1069 | 0 | auto* trx = it->second; |
1070 | 0 | recovered_transactions_.erase(it); |
1071 | 0 | for (const auto& info : trx->batches_) { |
1072 | 0 | logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed( |
1073 | 0 | info.second.log_number_); |
1074 | 0 | } |
1075 | 0 | delete trx; |
1076 | 0 | } |
1077 | | |
1078 | 0 | void DeleteAllRecoveredTransactions() { |
1079 | 0 | for (auto it = recovered_transactions_.begin(); |
1080 | 0 | it != recovered_transactions_.end(); ++it) { |
1081 | 0 | delete it->second; |
1082 | 0 | } |
1083 | 0 | recovered_transactions_.clear(); |
1084 | 0 | } |
1085 | | |
  // Queues a retired WAL writer for deferred deletion.
  // REQUIRES: mutex_ held (checked by the assert below).
  void AddToLogsToFreeQueue(log::Writer* log_writer) {
    mutex_.AssertHeld();
    wals_to_free_queue_.push_back(log_writer);
  }
1090 | | |
  // Queues an obsolete SuperVersion for deferred cleanup. NOTE(review):
  // unlike AddToLogsToFreeQueue there is no mutex_.AssertHeld() here —
  // presumably callers hold the DB mutex; confirm before relying on it.
  void AddSuperVersionsToFreeQueue(SuperVersion* sv) {
    superversions_to_free_queue_.push_back(sv);
  }
1094 | | |
1095 | | void SetSnapshotChecker(SnapshotChecker* snapshot_checker); |
1096 | | |
1097 | | // Fill JobContext with snapshot information needed by flush and compaction. |
1098 | | void InitSnapshotContext(JobContext* job_context); |
1099 | | |
1100 | | // Not thread-safe. |
1101 | | void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback); |
1102 | | |
1103 | 52.6k | InstrumentedMutex* mutex() const { return &mutex_; } |
1104 | | |
1105 | | // Initialize a brand new DB. The DB directory is expected to be empty before |
1106 | | // calling it. Push new manifest file name into `new_filenames`. |
1107 | | Status NewDB(std::vector<std::string>* new_filenames); |
1108 | | |
1109 | | // This is to be used only by internal rocksdb classes. |
1110 | | static Status Open(const DBOptions& db_options, const std::string& name, |
1111 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
1112 | | std::vector<ColumnFamilyHandle*>* handles, |
1113 | | std::unique_ptr<DB>* dbptr, const bool seq_per_batch, |
1114 | | const bool batch_per_txn, const bool is_retry, |
1115 | | bool* can_retry); |
1116 | | |
1117 | | static IOStatus CreateAndNewDirectory( |
1118 | | FileSystem* fs, const std::string& dirname, |
1119 | | std::unique_ptr<FSDirectory>* directory); |
1120 | | |
1121 | | // find stats map from stats_history_ with smallest timestamp in |
1122 | | // the range of [start_time, end_time) |
1123 | | bool FindStatsByTime(uint64_t start_time, uint64_t end_time, |
1124 | | uint64_t* new_time, |
1125 | | std::map<std::string, uint64_t>* stats_map); |
1126 | | |
1127 | | // Print information of all tombstones of all iterators to the std::string |
1128 | | // This is only used by ldb. The output might be capped. Tombstones |
1129 | | // printed out are not guaranteed to be in any order. |
1130 | | Status TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family, |
1131 | | int max_entries_to_print, |
1132 | | std::string* out_str); |
1133 | | |
1134 | 0 | VersionSet* GetVersionSet() const { return versions_.get(); } |
1135 | | |
1136 | | Status WaitForCompact( |
1137 | | const WaitForCompactOptions& wait_for_compact_options) override; |
1138 | | |
1139 | | #ifndef NDEBUG |
1140 | | // Compact any files in the named level that overlap [*begin, *end] |
1141 | | Status TEST_CompactRange(int level, const Slice* begin, const Slice* end, |
1142 | | ColumnFamilyHandle* column_family = nullptr, |
1143 | | bool disallow_trivial_move = false); |
1144 | | |
1145 | | Status TEST_SwitchWAL(); |
1146 | | |
1147 | | bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; } |
1148 | | |
  // Test-only: returns the getting_flushed flag of the first alive WAL file.
  // NOTE(review): dereferences alive_wal_files_.begin() — assumes the list
  // is non-empty; confirm at call sites.
  bool TEST_IsLogGettingFlushed() {
    return alive_wal_files_.begin()->getting_flushed;
  }
1152 | | |
1153 | | Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr); |
1154 | | |
1155 | | // Force current memtable contents to be flushed. |
1156 | | Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false, |
1157 | | ColumnFamilyHandle* cfh = nullptr); |
1158 | | |
1159 | | Status TEST_FlushMemTable(ColumnFamilyData* cfd, |
1160 | | const FlushOptions& flush_opts); |
1161 | | |
1162 | | // Flush (multiple) ColumnFamilyData without using ColumnFamilyHandle. This |
1163 | | // is because in certain cases, we can flush column families, wait for the |
1164 | | // flush to complete, but delete the column family handle before the wait |
1165 | | // finishes. For example in CompactRange. |
1166 | | Status TEST_AtomicFlushMemTables( |
1167 | | const autovector<ColumnFamilyData*>& provided_candidate_cfds, |
1168 | | const FlushOptions& flush_opts); |
1169 | | |
1170 | | // Wait for background threads to complete scheduled work. |
1171 | | Status TEST_WaitForBackgroundWork(); |
1172 | | |
1173 | | // Wait for memtable compaction |
1174 | | Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr); |
1175 | | |
1176 | | Status TEST_WaitForCompact(); |
1177 | | Status TEST_WaitForCompact( |
1178 | | const WaitForCompactOptions& wait_for_compact_options); |
1179 | | |
1180 | | // Wait for any background purge |
1181 | | Status TEST_WaitForPurge(); |
1182 | | |
1183 | | // Get the background error status |
1184 | | Status TEST_GetBGError(); |
1185 | | |
1186 | | bool TEST_IsRecoveryInProgress(); |
1187 | | |
1188 | | // Return the maximum overlapping data (in bytes) at next level for any |
1189 | | // file at a level >= 1. |
1190 | | uint64_t TEST_MaxNextLevelOverlappingBytes( |
1191 | | ColumnFamilyHandle* column_family = nullptr); |
1192 | | |
1193 | | // Return the current manifest file no. |
1194 | | uint64_t TEST_Current_Manifest_FileNo(); |
1195 | | |
1196 | | // Returns the number that'll be assigned to the next file that's created. |
1197 | | uint64_t TEST_Current_Next_FileNo(); |
1198 | | |
1199 | | // get total level0 file size. Only for testing. |
1200 | | uint64_t TEST_GetLevel0TotalSize(); |
1201 | | |
1202 | | void TEST_GetFilesMetaData( |
1203 | | ColumnFamilyHandle* column_family, |
1204 | | std::vector<std::vector<FileMetaData>>* metadata, |
1205 | | std::vector<std::shared_ptr<BlobFileMetaData>>* blob_metadata = nullptr); |
1206 | | |
1207 | | void TEST_LockMutex(); |
1208 | | |
1209 | | void TEST_UnlockMutex(); |
1210 | | |
1211 | | InstrumentedMutex* TEST_Mutex() { return &mutex_; } |
1212 | | |
1213 | | void TEST_SignalAllBgCv(); |
1214 | | |
1215 | | // REQUIRES: mutex locked |
1216 | | void* TEST_BeginWrite(); |
1217 | | |
1218 | | // REQUIRES: mutex locked |
1219 | | // pass the pointer that you got from TEST_BeginWrite() |
1220 | | void TEST_EndWrite(void* w); |
1221 | | |
  // Test-only: exposes max_total_in_memory_state_.
  uint64_t TEST_MaxTotalInMemoryState() const {
    return max_total_in_memory_state_;
  }
1225 | | |
1226 | | size_t TEST_LogsToFreeSize(); |
1227 | | |
1228 | | uint64_t TEST_LogfileNumber(); |
1229 | | |
  // Test-only: relaxed atomic read of wals_total_size_.
  uint64_t TEST_wals_total_size() const {
    return wals_total_size_.LoadRelaxed();
  }
1233 | | |
1234 | | void TEST_GetAllBlockCaches(std::unordered_set<const Cache*>* cache_set); |
1235 | | |
  // Return the latest MutableCFOptions of a column family
1237 | | Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family, |
1238 | | MutableCFOptions* mutable_cf_options); |
1239 | | |
1240 | | Cache* TEST_table_cache() { return table_cache_.get(); } |
1241 | | |
1242 | | WriteController& TEST_write_controler() { return write_controller_; } |
1243 | | |
1244 | | uint64_t TEST_FindMinLogContainingOutstandingPrep(); |
1245 | | uint64_t TEST_FindMinPrepLogReferencedByMemTable(); |
1246 | | size_t TEST_PreparedSectionCompletedSize(); |
1247 | | size_t TEST_LogsWithPrepSize(); |
1248 | | |
1249 | | int TEST_BGCompactionsAllowed() const; |
1250 | | int TEST_BGFlushesAllowed() const; |
1251 | | int TEST_NumRunningBottomCompactions() const; |
1252 | | size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; |
1253 | | void TEST_WaitForPeriodicTaskRun(std::function<void()> callback) const; |
1254 | | SeqnoToTimeMapping TEST_GetSeqnoToTimeMapping() const; |
1255 | | const autovector<uint64_t>& TEST_GetFilesToQuarantine() const; |
1256 | | size_t TEST_EstimateInMemoryStatsHistorySize() const; |
1257 | | |
  // Test-only: returns the number of the newest WAL in logs_, read under the
  // DB mutex. logs_ must be non-empty.
  uint64_t TEST_GetCurrentLogNumber() const {
    InstrumentedMutexLock l(mutex());
    assert(!logs_.empty());
    return logs_.back().number;
  }
1263 | | |
1264 | | void TEST_DeleteObsoleteFiles(); |
1265 | | |
  // Test-only: exposes the set of file numbers grabbed for purge.
  const std::unordered_set<uint64_t>& TEST_GetFilesGrabbedForPurge() const {
    return files_grabbed_for_purge_;
  }
1269 | | |
1270 | | const PeriodicTaskScheduler& TEST_GetPeriodicTaskScheduler() const; |
1271 | | |
  // Test-only: forwards to ValidateOptions().
  static Status TEST_ValidateOptions(const DBOptions& db_options) {
    return ValidateOptions(db_options);
  }
1275 | | #endif // NDEBUG |
1276 | | |
1277 | | // In certain configurations, verify that the table/blob file cache only |
1278 | | // contains entries for live files, to check for effective leaks of open |
1279 | | // files. This can only be called when purging of obsolete files has |
1280 | | // "settled," such as during parts of DB Close(). |
1281 | | void TEST_VerifyNoObsoleteFilesCached(bool db_mutex_already_held) const; |
1282 | | |
1283 | | // persist stats to column family "_persistent_stats" |
1284 | | void PersistStats(); |
1285 | | |
1286 | | // dump rocksdb.stats to LOG |
1287 | | void DumpStats(); |
1288 | | |
1289 | | // flush LOG out of application buffer |
1290 | | void FlushInfoLog(); |
1291 | | |
1292 | | // For the background timer job |
1293 | | void RecordSeqnoToTimeMapping(); |
1294 | | |
1295 | | // Compactions rely on an event triggers like flush/compaction/SetOptions. |
1296 | | // We need to trigger periodic compactions even when there is no such trigger. |
1297 | | // This function checks and schedules available compactions and will run |
1298 | | // periodically. |
1299 | | void TriggerPeriodicCompaction(); |
1300 | | |
1301 | | // REQUIRES: DB mutex held |
1302 | | std::pair<SequenceNumber, uint64_t> GetSeqnoToTimeSample() const; |
1303 | | |
1304 | | // REQUIRES: DB mutex held or during open |
1305 | | void EnsureSeqnoToTimeMapping(const MinAndMaxPreserveSeconds& preserve_secs); |
1306 | | |
1307 | | // Only called during open |
1308 | | void PrepopulateSeqnoToTimeMapping( |
1309 | | const MinAndMaxPreserveSeconds& preserve_secs); |
1310 | | |
1311 | | // Interface to block and signal the DB in case of stalling writes by |
1312 | | // WriteBufferManager. Each DBImpl object contains ptr to WBMStallInterface. |
1313 | | // When DB needs to be blocked or signalled by WriteBufferManager, |
1314 | | // state_ is changed accordingly. |
  class WBMStallInterface : public StallInterface {
   public:
    enum State {
      // Writes are stalled; Block() waits in this state.
      BLOCKED = 0,
      // Normal operation; no stall in effect.
      RUNNING,
    };

    // Starts in RUNNING (no stall). The constructor takes state_mutex_ even
    // though no other thread can see the object yet, keeping every access to
    // state_ uniformly locked.
    WBMStallInterface() : state_cv_(&state_mutex_) {
      MutexLock lock(&state_mutex_);
      state_ = State::RUNNING;
    }

    // Sets the state under state_mutex_ without signaling; used to move the
    // interface into BLOCKED before calling Block().
    void SetState(State state) {
      MutexLock lock(&state_mutex_);
      state_ = state;
    }

    // Change the state_ to State::BLOCKED and wait until its state is
    // changed by WriteBufferManager. When stall is cleared, Signal() is
    // called to change the state and unblock the DB.
    void Block() override {
      MutexLock lock(&state_mutex_);
      while (state_ == State::BLOCKED) {
        TEST_SYNC_POINT("WBMStallInterface::BlockDB");
        state_cv_.Wait();
      }
    }

    // Called from WriteBufferManager. This function changes the state_
    // to State::RUNNING indicating the stall is cleared and DB can proceed.
    // Note the state update is scoped so state_mutex_ is released before the
    // condition variable is signaled.
    void Signal() override {
      {
        MutexLock lock(&state_mutex_);
        state_ = State::RUNNING;
      }
      state_cv_.Signal();
    }

   private:
    // Conditional variable and mutex to block and
    // signal the DB during stalling process.
    port::Mutex state_mutex_;
    port::CondVar state_cv_;
    // state representing whether DB is running or blocked because of stall by
    // WriteBufferManager.
    State state_;
  };
1362 | | |
1363 | | static void TEST_ResetDbSessionIdGen(); |
1364 | | static std::string GenerateDbSessionId(Env* env); |
1365 | | |
1366 | 0 | bool seq_per_batch() const { return seq_per_batch_; } |
1367 | | |
1368 | | protected: |
1369 | | const std::string dbname_; |
1370 | | // TODO(peterd): unify with VersionSet::db_id_ |
1371 | | std::string db_id_; |
1372 | | // db_session_id_ is an identifier that gets reset |
1373 | | // every time the DB is opened |
1374 | | std::string db_session_id_; |
1375 | | std::unique_ptr<VersionSet> versions_; |
1376 | | // Flag to check whether we allocated and own the info log file |
1377 | | bool own_info_log_; |
1378 | | Status init_logger_creation_s_; |
1379 | | const DBOptions initial_db_options_; |
1380 | | Env* const env_; |
1381 | | std::shared_ptr<IOTracer> io_tracer_; |
1382 | | const ImmutableDBOptions immutable_db_options_; |
1383 | | FileSystemPtr fs_; |
1384 | | MutableDBOptions mutable_db_options_; |
1385 | | Statistics* stats_; |
1386 | | std::unordered_map<std::string, RecoveredTransaction*> |
1387 | | recovered_transactions_; |
1388 | | std::unique_ptr<Tracer> tracer_; |
1389 | | InstrumentedMutex trace_mutex_; |
1390 | | BlockCacheTracer block_cache_tracer_; |
1391 | | |
1392 | | // constant false canceled flag, used when the compaction is not manual |
1393 | | const std::atomic<bool> kManualCompactionCanceledFalse_{false}; |
1394 | | |
1395 | | // State below is protected by mutex_ |
  // With two_write_queues enabled, some of the variables that are accessed
1397 | | // WriteToWAL need different synchronization: wal_empty_, alive_wal_files_, |
1398 | | // logs_, cur_wal_number_. Refer to the definition of each variable below for |
1399 | | // more description. |
1400 | | // |
1401 | | // Protects access to most ColumnFamilyData methods, see more in comment for |
1402 | | // each method. |
1403 | | // |
1404 | | // `mutex_` can be a hot lock in some workloads, so it deserves dedicated |
1405 | | // cachelines. |
1406 | | mutable CacheAlignedInstrumentedMutex mutex_; |
1407 | | |
1408 | | ColumnFamilyHandleImpl* default_cf_handle_ = nullptr; |
1409 | | InternalStats* default_cf_internal_stats_ = nullptr; |
1410 | | |
1411 | | // table_cache_ provides its own synchronization |
1412 | | std::shared_ptr<Cache> table_cache_; |
1413 | | |
1414 | | ErrorHandler error_handler_; |
1415 | | |
1416 | | // Unified interface for logging events |
1417 | | EventLogger event_logger_; |
1418 | | |
1419 | | // only used for dynamically adjusting max_total_wal_size. it is a sum of |
1420 | | // [write_buffer_size * max_write_buffer_number] over all column families |
1421 | | std::atomic<uint64_t> max_total_in_memory_state_ = 0; |
1422 | | |
1423 | | // The options to access storage files |
1424 | | const FileOptions file_options_; |
1425 | | |
  // Additional options for compaction and flush
1427 | | FileOptions file_options_for_compaction_; |
1428 | | |
1429 | | std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_; |
1430 | | |
1431 | | // Increase the sequence number after writing each batch, whether memtable is |
1432 | | // disabled for that or not. Otherwise the sequence number is increased after |
1433 | | // writing each key into memtable. This implies that when disable_memtable is |
1434 | | // set, the seq is not increased at all. |
1435 | | // |
1436 | | // Default: false |
1437 | | const bool seq_per_batch_; |
1438 | | // This determines during recovery whether we expect one writebatch per |
1439 | | // recovered transaction, or potentially multiple writebatches per |
1440 | | // transaction. For WriteUnprepared, this is set to false, since multiple |
1441 | | // batches can exist per transaction. |
1442 | | // |
1443 | | // Default: true |
1444 | | const bool batch_per_txn_; |
1445 | | |
1446 | | // Each flush or compaction gets its own job id. this counter makes sure |
1447 | | // they're unique |
1448 | | std::atomic<int> next_job_id_ = 1; |
1449 | | |
1450 | | std::atomic<bool> shutting_down_ = false; |
1451 | | |
1452 | | // No new background jobs can be queued if true. This is used to prevent new |
1453 | | // background jobs from being queued after WaitForCompact() completes waiting |
1454 | | // all background jobs then attempts to close when close_db_ option is true. |
1455 | | bool reject_new_background_jobs_ = false; |
1456 | | |
1457 | | // RecoveryContext struct stores the context about version edits along |
1458 | | // with corresponding column_family_data and column_family_options. |
1459 | | class RecoveryContext { |
1460 | | public: |
1461 | 40.2k | ~RecoveryContext() { |
1462 | 54.8k | for (auto& edit_list : edit_lists_) { |
1463 | 153k | for (auto* edit : edit_list) { |
1464 | 153k | delete edit; |
1465 | 153k | } |
1466 | 54.8k | } |
1467 | 40.2k | } |
1468 | | |
1469 | 153k | void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) { |
1470 | 153k | assert(cfd != nullptr); |
1471 | 153k | if (map_.find(cfd->GetID()) == map_.end()) { |
1472 | 54.8k | uint32_t size = static_cast<uint32_t>(map_.size()); |
1473 | 54.8k | map_.emplace(cfd->GetID(), size); |
1474 | 54.8k | cfds_.emplace_back(cfd); |
1475 | 54.8k | edit_lists_.emplace_back(autovector<VersionEdit*>()); |
1476 | 54.8k | } |
1477 | 153k | uint32_t i = map_[cfd->GetID()]; |
1478 | 153k | edit_lists_[i].emplace_back(new VersionEdit(edit)); |
1479 | 153k | } |
1480 | | |
1481 | | std::unordered_map<uint32_t, uint32_t> map_; // cf_id to index; |
1482 | | autovector<ColumnFamilyData*> cfds_; |
1483 | | autovector<autovector<VersionEdit*>> edit_lists_; |
1484 | | // All existing data files (SST files and Blob files) found during DB::Open. |
1485 | | std::vector<std::string> existing_data_files_; |
1486 | | bool is_new_db_ = false; |
1487 | | }; |
1488 | | |
1489 | | // Persist options to options file. Must be holding options_mutex_. |
1490 | | // Will lock DB mutex if !db_mutex_already_held. |
1491 | | Status WriteOptionsFile(const WriteOptions& write_options, |
1492 | | bool db_mutex_already_held); |
1493 | | |
1494 | | Status CompactRangeInternal(const CompactRangeOptions& options, |
1495 | | ColumnFamilyHandle* column_family, |
1496 | | const Slice* begin, const Slice* end, |
1497 | | const std::string& trim_ts); |
1498 | | |
1499 | | // The following two functions can only be called when: |
1500 | | // 1. WriteThread::Writer::EnterUnbatched() is used. |
1501 | | // 2. db_mutex is NOT held |
1502 | | Status RenameTempFileToOptionsFile(const std::string& file_name, |
1503 | | bool is_remote_compaction_enabled); |
1504 | | Status DeleteObsoleteOptionsFiles(); |
1505 | | |
1506 | | void NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds, |
1507 | | FlushReason flush_reason); |
1508 | | |
1509 | | void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, |
1510 | | const MutableCFOptions& mutable_cf_options, |
1511 | | int job_id, FlushReason flush_reason); |
1512 | | |
1513 | | void NotifyOnFlushCompleted( |
1514 | | ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, |
1515 | | std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info); |
1516 | | |
1517 | | void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, |
1518 | | const Status& st, |
1519 | | const CompactionJobStats& job_stats, int job_id); |
1520 | | |
1521 | | void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, Compaction* c, |
1522 | | const Status& st, |
1523 | | const CompactionJobStats& job_stats, |
1524 | | int job_id); |
1525 | | void NotifyOnMemTableSealed(ColumnFamilyData* cfd, |
1526 | | const MemTableInfo& mem_table_info); |
1527 | | |
1528 | | void NotifyOnExternalFileIngested( |
1529 | | ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job); |
1530 | | |
1531 | | Status FlushAllColumnFamilies(const FlushOptions& flush_options, |
1532 | | FlushReason flush_reason); |
1533 | | |
1534 | | virtual Status FlushForGetLiveFiles(bool force_atomic_flush = false); |
1535 | | |
1536 | | void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const; |
1537 | | |
1538 | | void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const; |
1539 | | |
1540 | | void EraseThreadStatusDbInfo() const; |
1541 | | |
1542 | | // For CFs that has updates in `wbwi`, their memtable will be switched, |
1543 | | // and `wbwi` will be added as the latest immutable memtable. |
1544 | | // |
1545 | | // REQUIRES: this thread is currently at the front of the main writer queue. |
1546 | | // @param prep_log refers to the WAL that contains prepare record |
1547 | | // for the transaction based on wbwi. |
1548 | | // @param assigned_seqno Sequence numbers for the ingested memtable. |
1549 | | // @param last_seqno the value of versions_->LastSequence() after the write |
1550 | | // ingests `wbwi` is done. |
1551 | | // @param memtable_updated Whether the same write that ingests wbwi has |
1552 | | // updated memtable. This is useful for determining whether to set bg |
1553 | | // error when IngestWBWIAsMemtable fails. |
1554 | | Status IngestWBWIAsMemtable(std::shared_ptr<WriteBatchWithIndex> wbwi, |
1555 | | const WBWIMemTable::SeqnoRange& assigned_seqno, |
1556 | | uint64_t min_prep_log, SequenceNumber last_seqno, |
1557 | | bool memtable_updated, bool ignore_missing_cf); |
1558 | | |
1559 | | // If disable_memtable is set the application logic must guarantee that the |
  // batch will still be skipped from memtable during the recovery. An exception
1561 | | // to this is seq_per_batch_ mode, in which since each batch already takes one |
1562 | | // seq, it is ok for the batch to write to memtable during recovery as long as |
1563 | | // it only takes one sequence number: i.e., no duplicate keys. |
  // In WriteCommitted it is guaranteed since disable_memtable is used for
1565 | | // prepare batch which will be written to memtable later during the commit, |
1566 | | // and in WritePrepared it is guaranteed since it will be used only for WAL |
1567 | | // markers which will never be written to memtable. If the commit marker is |
1568 | | // accompanied with CommitTimeWriteBatch that is not written to memtable as |
1569 | | // long as it has no duplicate keys, it does not violate the one-seq-per-batch |
1570 | | // policy. |
1571 | | // batch_cnt is expected to be non-zero in seq_per_batch mode and |
1572 | | // indicates the number of sub-patches. A sub-patch is a subset of the write |
1573 | | // batch that does not have duplicate keys. |
1574 | | // `callback` is called before WAL write. |
1575 | | // See more in comment above WriteCallback::Callback(). |
1576 | | // pre_release_callback is called after WAL write and before memtable write. |
1577 | | // See more in comment above PreReleaseCallback::Callback(). |
1578 | | // post_memtable_callback is called after memtable write but before publishing |
1579 | | // the sequence number to readers. |
1580 | | // |
1581 | | // The main write queue. This is the only write queue that updates |
1582 | | // LastSequence. When using one write queue, the same sequence also indicates |
1583 | | // the last published sequence. |
1584 | | Status WriteImpl(const WriteOptions& options, WriteBatch* updates, |
1585 | | WriteCallback* callback = nullptr, |
1586 | | UserWriteCallback* user_write_cb = nullptr, |
1587 | | uint64_t* wal_used = nullptr, uint64_t log_ref = 0, |
1588 | | bool disable_memtable = false, uint64_t* seq_used = nullptr, |
1589 | | size_t batch_cnt = 0, |
1590 | | PreReleaseCallback* pre_release_callback = nullptr, |
1591 | | PostMemTableCallback* post_memtable_callback = nullptr, |
1592 | | std::shared_ptr<WriteBatchWithIndex> wbwi = nullptr); |
1593 | | |
1594 | | Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates, |
1595 | | WriteCallback* callback = nullptr, |
1596 | | UserWriteCallback* user_write_cb = nullptr, |
1597 | | uint64_t* wal_used = nullptr, uint64_t log_ref = 0, |
1598 | | bool disable_memtable = false, |
1599 | | uint64_t* seq_used = nullptr); |
1600 | | |
1601 | | // Write only to memtables without joining any write queue |
1602 | | Status UnorderedWriteMemtable(const WriteOptions& write_options, |
1603 | | WriteBatch* my_batch, WriteCallback* callback, |
1604 | | uint64_t log_ref, SequenceNumber seq, |
1605 | | const size_t sub_batch_cnt); |
1606 | | |
1607 | | // Whether the batch requires to be assigned with an order |
1608 | | enum AssignOrder : bool { kDontAssignOrder, kDoAssignOrder }; |
1609 | | // Whether it requires publishing last sequence or not |
1610 | | enum PublishLastSeq : bool { kDontPublishLastSeq, kDoPublishLastSeq }; |
1611 | | |
1612 | | // Join the write_thread to write the batch only to the WAL. It is the |
1613 | | // responsibility of the caller to also write the write batch to the memtable |
  // if it is required.
1615 | | // |
1616 | | // sub_batch_cnt is expected to be non-zero when assign_order = kDoAssignOrder |
1617 | | // indicating the number of sub-batches in my_batch. A sub-patch is a subset |
1618 | | // of the write batch that does not have duplicate keys. When seq_per_batch is |
1619 | | // not set, each key is a separate sub_batch. Otherwise each duplicate key |
1620 | | // marks start of a new sub-batch. |
1621 | | Status WriteImplWALOnly( |
1622 | | WriteThread* write_thread, const WriteOptions& options, |
1623 | | WriteBatch* updates, WriteCallback* callback, |
1624 | | UserWriteCallback* user_write_cb, uint64_t* wal_used, |
1625 | | const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt, |
1626 | | PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, |
1627 | | const PublishLastSeq publish_last_seq, const bool disable_memtable); |
1628 | | |
1629 | | // write cached_recoverable_state_ to memtable if it is not empty |
1630 | | // The writer must be the leader in write_thread_ and holding mutex_ |
1631 | | Status WriteRecoverableState(); |
1632 | | |
1633 | | // Actual implementation of Close() |
1634 | | virtual Status CloseImpl(); |
1635 | | |
1636 | | // Recover the descriptor from persistent storage. May do a significant |
1637 | | // amount of work to recover recently logged updates. Any changes to |
1638 | | // be made to the descriptor are added to *edit. |
1639 | | // recovered_seq is set to less than kMaxSequenceNumber if the log's tail is |
1640 | | // skipped. |
1641 | | // recovery_ctx stores the context about version edits and all those |
1642 | | // edits are persisted to new Manifest after successfully syncing the new WAL. |
1643 | | virtual Status Recover( |
1644 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
1645 | | bool read_only = false, bool error_if_wal_file_exists = false, |
1646 | | bool error_if_data_exists_in_wals = false, bool is_retry = false, |
1647 | | uint64_t* recovered_seq = nullptr, |
1648 | | RecoveryContext* recovery_ctx = nullptr, bool* can_retry = nullptr); |
1649 | | |
  // Whether this instance owns its table (SST) and WAL files; always true
  // here. Virtual so subclasses can return false — NOTE(review): presumably
  // secondary/read-only variants that must not delete files they do not
  // own; confirm against the overriding classes.
  virtual bool OwnTablesAndLogs() const { return true; }
1651 | | |
1652 | | // Read/create DB identity file (as appropriate), and write DB ID to |
1653 | | // version_edit if provided. |
1654 | | Status SetupDBId(const WriteOptions& write_options, bool read_only, |
1655 | | bool is_new_db, bool is_retry, VersionEdit* version_edit); |
1656 | | // Assign db_id_ and write DB ID to version_edit if provided. |
1657 | | void SetDBId(std::string&& id, bool read_only, VersionEdit* version_edit); |
1658 | | |
1659 | | // Collect a deduplicated collection of paths used by this DB, including |
1660 | | // dbname_, DBOptions.db_paths, ColumnFamilyOptions.cf_paths. |
1661 | | std::set<std::string> CollectAllDBPaths(); |
1662 | | |
1663 | | // REQUIRES: db mutex held when calling this function, but the db mutex can |
1664 | | // be released and re-acquired. Db mutex will be held when the function |
1665 | | // returns. |
1666 | | // It stores all existing data files (SST and Blob) in RecoveryContext. In |
1667 | | // the meantime, we find out the largest file number present in the paths, and |
1668 | | // bump up the version set's next_file_number_ to be 1 + largest_file_number. |
1669 | | // recovery_ctx stores the context about version edits. All those edits are |
1670 | | // persisted to new Manifest after successfully syncing the new WAL. |
1671 | | Status MaybeUpdateNextFileNumber(RecoveryContext* recovery_ctx); |
1672 | | |
1673 | | // Track existing data files, including both referenced and unreferenced SST |
1674 | | // and Blob files in SstFileManager. This is only called during DB::Open and |
1675 | | // it's called before any file deletion start so that their deletion can be |
1676 | | // properly rate limited. |
1677 | | // Files may not be referenced in the MANIFEST because (e.g. |
1678 | | // 1. It's best effort recovery; |
1679 | | // 2. The VersionEdits referencing the SST files are appended to |
1680 | | // RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are |
1681 | | // still not synced to MANIFEST during recovery.) |
1682 | | // |
1683 | | // If the file is referenced in Manifest (typically that's the |
1684 | | // vast majority of all files), since it already has the file size |
1685 | | // on record, we don't need to query the file system. Otherwise, we query the |
1686 | | // file system for the size of an unreferenced file. |
1687 | | // REQUIRES: mutex unlocked |
1688 | | void TrackExistingDataFiles( |
1689 | | const std::vector<std::string>& existing_data_files); |
1690 | | |
1691 | | // Untrack data files in sst manager. This is only called during DB::Close on |
1692 | | // an unowned SstFileManager, to return it to a consistent state. |
1693 | | // REQUIRES: mutex unlocked |
1694 | | void UntrackDataFiles(); |
1695 | | |
  // SetDbSessionId() should be called in the constructor DBImpl()
1697 | | // to ensure that db_session_id_ gets updated every time the DB is opened |
1698 | | void SetDbSessionId(); |
1699 | | |
1700 | | Status FailIfCfHasTs(const ColumnFamilyHandle* column_family) const; |
1701 | | Status FailIfTsMismatchCf(ColumnFamilyHandle* column_family, |
1702 | | const Slice& ts) const; |
1703 | | |
1704 | | // Check that the read timestamp `ts` is at or above the `full_history_ts_low` |
1705 | | // timestamp in a `SuperVersion`. It's necessary to do this check after |
1706 | | // grabbing the SuperVersion. If the check passed, the referenced SuperVersion |
1707 | | // this read holds on to can ensure the read won't be affected if |
1708 | | // `full_history_ts_low` is increased concurrently, and this achieves that |
1709 | | // without explicitly locking by piggybacking the SuperVersion. |
1710 | | Status FailIfReadCollapsedHistory(const ColumnFamilyData* cfd, |
1711 | | const SuperVersion* sv, |
1712 | | const Slice& ts) const; |
1713 | | |
1714 | | // recovery_ctx stores the context about version edits and |
1715 | | // LogAndApplyForRecovery persist all those edits to new Manifest after |
1716 | | // successfully syncing new WAL. |
1717 | | // LogAndApplyForRecovery should be called only once during recovery and it |
1718 | | // should be called when RocksDB writes to a first new MANIFEST since this |
1719 | | // recovery. |
1720 | | Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx); |
1721 | | |
1722 | | // Schedule background work to open and validate SST files asynchronously. |
1723 | | // Called when open_files_async is enabled. |
1724 | | void ScheduleAsyncFileOpening(); |
1725 | | |
1726 | | // Mark async file opening as not needed. Used by subclasses that load |
1727 | | // table files through a different mechanism (e.g., ReactiveVersionSet). |
1728 | | void MarkAsyncFileOpenNotNeeded(); |
1729 | | |
1730 | | // Background work function for async file opening. |
1731 | | static void BGWorkAsyncFileOpen(void* arg); |
1732 | | |
1733 | | void InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap(); |
1734 | | |
1735 | | // Return true to proceed with current WAL record whose content is stored in |
1736 | | // `batch`. Return false to skip current WAL record. |
1737 | | bool InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number, |
1738 | | const std::string& wal_fname, |
1739 | | log::Reader::Reporter& reporter, |
1740 | | Status& status, bool& stop_replay, |
1741 | | WriteBatch& batch); |
1742 | | |
1743 | | // Indicate DB was opened successfully |
1744 | | bool opened_successfully_ = false; |
1745 | | |
1746 | | private: |
1747 | | friend class DB; |
1748 | | friend class DBImplSecondary; |
1749 | | friend class ErrorHandler; |
1750 | | friend class InternalStats; |
1751 | | friend class PessimisticTransaction; |
1752 | | friend class TransactionBaseImpl; |
1753 | | friend class WriteCommittedTxn; |
1754 | | friend class WritePreparedTxn; |
1755 | | friend class WritePreparedTxnDB; |
1756 | | friend class WriteBatchWithIndex; |
1757 | | friend class WriteUnpreparedTxnDB; |
1758 | | friend class WriteUnpreparedTxn; |
1759 | | |
1760 | | friend class ForwardIterator; |
1761 | | friend struct SuperVersion; |
1762 | | friend class CompactedDBImpl; |
1763 | | friend class DBImplFollower; |
1764 | | #ifndef NDEBUG |
1765 | | friend class DBTest_ConcurrentFlushWAL_Test; |
1766 | | friend class DBTest_MixedSlowdownOptionsStop_Test; |
1767 | | friend class DBCompactionTest_CompactBottomLevelFilesWithDeletions_Test; |
1768 | | friend class DBCompactionTest_CompactionDuringShutdown_Test; |
1769 | | friend class DBCompactionTest_DelayCompactBottomLevelFilesWithDeletions_Test; |
1770 | | friend class DBCompactionTest_DisableCompactBottomLevelFiles_Test; |
1771 | | friend class StatsHistoryTest_PersistentStatsCreateColumnFamilies_Test; |
1772 | | friend class DBTest2_ReadCallbackTest_Test; |
1773 | | friend class WriteCallbackPTest_WriteWithCallbackTest_Test; |
1774 | | friend class XFTransactionWriteHandler; |
1775 | | friend class DBBlobIndexTest; |
1776 | | friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; |
1777 | | friend class CompactionServiceTest_PreservedOptionsLocalCompaction_Test; |
1778 | | friend class CompactionServiceTest_PreservedOptionsRemoteCompaction_Test; |
1779 | | #endif |
1780 | | |
1781 | | struct CompactionState; |
1782 | | struct PrepickedCompaction; |
1783 | | struct PurgeFileInfo; |
1784 | | |
  // Scratch state threaded through a write operation: collects the
  // superversion context to install and any memtables handed over to be
  // freed once the write completes.
  struct WriteContext {
    SuperVersionContext superversion_context;
    autovector<ReadOnlyMemTable*> memtables_to_free_;

    // `create_superversion` is forwarded to SuperVersionContext's
    // constructor (see SuperVersionContext for its meaning).
    explicit WriteContext(bool create_superversion = false)
        : superversion_context(create_superversion) {}

    // Cleans the superversion context and deletes every memtable that was
    // transferred into memtables_to_free_.
    ~WriteContext() {
      superversion_context.Clean();
      for (auto& m : memtables_to_free_) {
        delete m;
      }
    }
  };
1799 | | |
1800 | | struct WalFileNumberSize { |
1801 | 74.8k | explicit WalFileNumberSize(uint64_t _number) : number(_number) {} |
1802 | 0 | WalFileNumberSize() {} |
1803 | 495k | void AddSize(uint64_t new_size) { size += new_size; } |
1804 | | uint64_t number; |
1805 | | uint64_t size = 0; |
1806 | | bool getting_flushed = false; |
1807 | | }; |
1808 | | |
  // A WAL writer together with its file number and sync-tracking state.
  // Instances live in logs_ (see the comments there for locking rules).
  struct LogWriterNumber {
    // pass ownership of _writer
    LogWriterNumber(uint64_t _number, log::Writer* _writer)
        : number(_number), writer(_writer) {}

    // Transfer ownership of the writer back to the caller without flushing
    // or closing it; this entry keeps only a nullptr afterwards.
    log::Writer* ReleaseWriter() {
      auto* w = writer;
      writer = nullptr;
      return w;
    }
    // Flush application-buffered data, best-effort truncate the file if
    // SetAttemptTruncateSize() recorded a bad-entry offset, then delete the
    // writer. Only the buffer-flush status is returned; the truncate status
    // is deliberately ignored.
    Status ClearWriter() {
      Status s;
      if (writer->file()) {
        // TODO: plumb Env::IOActivity, Env::IOPriority
        s = writer->WriteBuffer(WriteOptions());
        if (attempt_truncate_size < SIZE_MAX &&
            attempt_truncate_size < writer->file()->GetFileSize()) {
          Status s2 = writer->file()->writable_file()->Truncate(
              attempt_truncate_size, IOOptions{}, nullptr);
          // This is just a best effort attempt
          s2.PermitUncheckedError();
        }
      }
      delete writer;
      writer = nullptr;
      return s;
    }

    bool IsSyncing() { return getting_synced; }

    // Size recorded by PrepareForSync(); only meaningful while a sync is in
    // flight (asserted below).
    uint64_t GetPreSyncSize() {
      assert(getting_synced);
      return pre_sync_size;
    }

    // Mark this WAL as being synced and snapshot the flushed size that the
    // sync will guarantee durable.
    void PrepareForSync() {
      assert(!getting_synced);
      // Ensure the head of logs_ is marked as getting_synced if any is.
      getting_synced = true;
      // If last sync failed on a later WAL, this could be a fully synced
      // and closed WAL that just needs to be recorded as synced in the
      // manifest.
      if (writer->file()) {
        // Size is expected to be monotonically increasing.
        assert(writer->file()->GetFlushedSize() >= pre_sync_size);
        pre_sync_size = writer->file()->GetFlushedSize();
      }
    }

    // Clear the in-flight-sync marker set by PrepareForSync().
    void FinishSync() {
      assert(getting_synced);
      getting_synced = false;
    }

    // Record a size to truncate the WAL to on close; may be set at most once
    // (asserted). See attempt_truncate_size below for why.
    void SetAttemptTruncateSize(uint64_t size) {
      assert(attempt_truncate_size == SIZE_MAX);
      attempt_truncate_size = size;
    }

    uint64_t number;
    // Visual Studio doesn't support deque's member to be noncopyable because
    // of a std::unique_ptr as a member.
    log::Writer* writer;  // own

   private:
    // true for some prefix of logs_
    bool getting_synced = false;
    // The size of the file before the sync happens. This amount is guaranteed
    // to be persisted even if appends happen during sync so it can be used for
    // tracking the synced size in MANIFEST.
    uint64_t pre_sync_size = 0;
    // When < SIZE_MAX, attempt to truncate the WAL to this size on close,
    // because a bad entry was written to it beyond that point and it likely
    // won't be recoverable with the bad entry.
    uint64_t attempt_truncate_size = SIZE_MAX;
  };
1885 | | |
  // Per-write WAL bookkeeping: whether the WAL (and its directory) must be
  // synced for this write, plus the writer/size entry involved.
  struct WalContext {
    explicit WalContext(bool need_sync = false)
        : need_wal_sync(need_sync), need_wal_dir_sync(need_sync) {}
    bool need_wal_sync = false;
    bool need_wal_dir_sync = false;
    // Raw pointers with no delete here — appear non-owning; lifetime is
    // presumably managed by logs_/alive_wal_files_ (confirm at use sites).
    log::Writer* writer = nullptr;
    WalFileNumberSize* wal_file_number_size = nullptr;
    // SIZE_MAX means "not recorded"; NOTE(review): presumably the WAL size
    // prior to this write — confirm against the code that sets it.
    uint64_t prev_size = SIZE_MAX;
  };
1895 | | |
1896 | | // PurgeFileInfo is a structure to hold information of files to be deleted in |
1897 | | // purge_files_ |
1898 | | struct PurgeFileInfo { |
1899 | | std::string fname; |
1900 | | std::string dir_to_sync; |
1901 | | FileType type; |
1902 | | uint64_t number; |
1903 | | int job_id; |
1904 | | PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num, |
1905 | | int jid) |
1906 | 0 | : fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {} |
1907 | | }; |
1908 | | |
1909 | | // Argument required by background flush thread. |
1910 | | struct BGFlushArg { |
1911 | | BGFlushArg() |
1912 | | : cfd_(nullptr), |
1913 | | max_memtable_id_(0), |
1914 | | superversion_context_(nullptr), |
1915 | 0 | flush_reason_(FlushReason::kOthers) {} |
1916 | | BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id, |
1917 | | SuperVersionContext* superversion_context, |
1918 | | FlushReason flush_reason, bool atomic_flush) |
1919 | 1.72k | : cfd_(cfd), |
1920 | 1.72k | max_memtable_id_(max_memtable_id), |
1921 | 1.72k | superversion_context_(superversion_context), |
1922 | 1.72k | flush_reason_(flush_reason), |
1923 | 1.72k | atomic_flush_(atomic_flush) {} |
1924 | | |
1925 | | // Column family to flush. |
1926 | | ColumnFamilyData* cfd_; |
1927 | | // Maximum ID of memtable to flush. In this column family, memtables with |
1928 | | // IDs smaller than this value must be flushed before this flush completes. |
1929 | | uint64_t max_memtable_id_; |
1930 | | // Pointer to a SuperVersionContext object. After flush completes, RocksDB |
1931 | | // installs a new superversion for the column family. This operation |
1932 | | // requires a SuperVersionContext object (currently embedded in JobContext). |
1933 | | SuperVersionContext* superversion_context_; |
1934 | | FlushReason flush_reason_; |
1935 | | // Whether this flush should use atomic flush code path. |
1936 | | bool atomic_flush_ = false; |
1937 | | }; |
1938 | | |
  // Argument passed to flush thread.
  struct FlushThreadArg {
    // DB instance whose flush work this thread runs.
    DBImpl* db_;

    // Priority of the thread pool executing the flush.
    Env::Priority thread_pri_;
  };
1945 | | |
  // Information for a manual compaction
  struct ManualCompactionState {
    ManualCompactionState(ColumnFamilyData* _cfd, int _input_level,
                          int _output_level, uint32_t _output_path_id,
                          bool _exclusive, bool _disallow_trivial_move,
                          std::atomic<bool>* _canceled)
        : cfd(_cfd),
          input_level(_input_level),
          output_level(_output_level),
          output_path_id(_output_path_id),
          exclusive(_exclusive),
          disallow_trivial_move(_disallow_trivial_move),
          canceled(_canceled ? *_canceled : canceled_internal_storage) {}
    // When _canceled is not provided by the user, the `canceled` reference
    // is bound to canceled_internal_storage, consolidating `canceled` and
    // manual_compaction_paused since DisableManualCompaction() might be
    // called.

    ColumnFamilyData* cfd;
    int input_level;
    int output_level;
    uint32_t output_path_id;
    Status status;
    bool done = false;
    bool in_progress = false;    // compaction request being processed?
    bool incomplete = false;     // only part of requested range compacted
    bool exclusive;              // current behavior of only one manual
    bool disallow_trivial_move;  // Force actual compaction to run
    const InternalKey* begin = nullptr;  // nullptr means beginning of key range
    const InternalKey* end = nullptr;    // nullptr means end of key range
    InternalKey* manual_end = nullptr;   // how far we are compacting
    InternalKey tmp_storage;   // Used to keep track of compaction progress
    InternalKey tmp_storage1;  // Used to keep track of compaction progress

    // When the user provides a `canceled` pointer in CompactRangeOptions,
    // the reference below binds to that user-provided atomic; otherwise it
    // binds to canceled_internal_storage above.
    std::atomic<bool> canceled_internal_storage = false;
    std::atomic<bool>& canceled;  // Compaction canceled pointer reference
  };
  // A compaction chosen ahead of time for a background thread to run,
  // bundled with its manual-compaction state and task-limiter token.
  struct PrepickedCompaction {
    // background compaction takes ownership of `compaction`.
    // TODO(hx235): consider using std::shared_ptr for easier ownership
    // management
    Compaction* compaction;
    // caller retains ownership of `manual_compaction_state` as it is reused
    // across background compactions.
    ManualCompactionState* manual_compaction_state;  // nullptr if non-manual
    // task limiter token is requested during compaction picking.
    std::unique_ptr<TaskLimiterToken> task_token;
    // If true, `compaction` is picked temporarily to express compaction intent
    // and will be released before re-picking a real compaction based on the
    // updated LSM shape when thread associated with `compaction` is ready to
    // run
    bool need_repick;
  };
2002 | | |
  // Argument handed to the background compaction thread entry point.
  struct CompactionArg {
    // caller retains ownership of `db`.
    DBImpl* db;
    // background compaction takes ownership of `prepicked_compaction`.
    PrepickedCompaction* prepicked_compaction;
    // Priority of the thread pool the compaction is scheduled on.
    Env::Priority compaction_pri_;
  };
2010 | | |
2011 | 0 | static bool IsRecoveryFlush(FlushReason flush_reason) { |
2012 | 0 | return flush_reason == FlushReason::kErrorRecoveryRetryFlush || |
2013 | 0 | flush_reason == FlushReason::kErrorRecovery; |
2014 | 0 | } |
2015 | | // Initialize the built-in column family for persistent stats. Depending on |
2016 | | // whether on-disk persistent stats have been enabled before, it may either |
2017 | | // create a new column family and column family handle or just a column family |
2018 | | // handle. |
2019 | | // Required: DB mutex held |
2020 | | Status InitPersistStatsColumnFamily(); |
2021 | | |
2022 | | // Persistent Stats column family has two format version key which are used |
2023 | | // for compatibility check. Write format version if it's created for the |
2024 | | // first time, read format version and check compatibility if recovering |
2025 | | // from disk. This function requires DB mutex held at entrance but may |
2026 | | // release and re-acquire DB mutex in the process. |
2027 | | // Required: DB mutex held |
2028 | | Status PersistentStatsProcessFormatVersion(); |
2029 | | |
2030 | | Status ResumeImpl(DBRecoverContext context); |
2031 | | |
2032 | | void MaybeIgnoreError(Status* s) const; |
2033 | | |
2034 | | const Status CreateArchivalDirectory(); |
2035 | | |
2036 | | // Create a column family, without some of the follow-up work yet |
2037 | | Status CreateColumnFamilyImpl(const ReadOptions& read_options, |
2038 | | const WriteOptions& write_options, |
2039 | | const ColumnFamilyOptions& cf_options, |
2040 | | const std::string& cf_name, |
2041 | | ColumnFamilyHandle** handle); |
2042 | | |
2043 | | // Follow-up work to user creating a column family or (families) |
2044 | | Status WrapUpCreateColumnFamilies( |
2045 | | const WriteOptions& write_options, |
2046 | | const std::vector<const ColumnFamilyOptions*>& cf_options); |
2047 | | |
2048 | | Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family); |
2049 | | |
2050 | | // Delete any unneeded files and stale in-memory entries. |
2051 | | void DeleteObsoleteFiles(); |
2052 | | // Delete obsolete files and log status and information of file deletion |
2053 | | void DeleteObsoleteFileImpl(int job_id, const std::string& fname, |
2054 | | const std::string& path_to_sync, FileType type, |
2055 | | uint64_t number); |
2056 | | |
2057 | | // Background process needs to call |
2058 | | // auto x = CaptureCurrentFileNumberInPendingOutputs() |
2059 | | // auto file_num = versions_->NewFileNumber(); |
2060 | | // <do something> |
2061 | | // ReleaseFileNumberFromPendingOutputs(x) |
2062 | | // This will protect any file with number `file_num` or greater from being |
2063 | | // deleted while <do something> is running. |
2064 | | // ----------- |
2065 | | // This function will capture current file number and append it to |
2066 | | // pending_outputs_. This will prevent any background process to delete any |
2067 | | // file created after this point. |
2068 | | std::list<uint64_t>::iterator CaptureCurrentFileNumberInPendingOutputs(); |
2069 | | // This function should be called with the result of |
2070 | | // CaptureCurrentFileNumberInPendingOutputs(). It then marks that any file |
2071 | | // created between the calls CaptureCurrentFileNumberInPendingOutputs() and |
2072 | | // ReleaseFileNumberFromPendingOutputs() can now be deleted (if it's not live |
2073 | | // and blocked by any other pending_outputs_ calls) |
2074 | | void ReleaseFileNumberFromPendingOutputs( |
2075 | | std::unique_ptr<std::list<uint64_t>::iterator>& v); |
2076 | | |
2077 | | // Similar to pending_outputs, preserve OPTIONS file. Used for remote |
2078 | | // compaction. |
2079 | | std::list<uint64_t>::iterator CaptureOptionsFileNumber(); |
2080 | | void ReleaseOptionsFileNumber( |
2081 | | std::unique_ptr<std::list<uint64_t>::iterator>& v); |
2082 | | |
2083 | | // Sets bg error if there is an error writing to WAL. |
2084 | | IOStatus SyncClosedWals(const WriteOptions& write_options, |
2085 | | JobContext* job_context, VersionEdit* synced_wals, |
2086 | | bool error_recovery_in_prog); |
2087 | | |
2088 | | // Flush the in-memory write buffer to storage. Switches to a new |
2089 | | // log-file/memtable and writes a new descriptor iff successful. Then |
2090 | | // installs a new super version for the column family. |
2091 | | Status FlushMemTableToOutputFile(ColumnFamilyData* cfd, |
2092 | | const MutableCFOptions& mutable_cf_options, |
2093 | | bool* madeProgress, JobContext* job_context, |
2094 | | FlushReason flush_reason, |
2095 | | SuperVersionContext* superversion_context, |
2096 | | LogBuffer* log_buffer, |
2097 | | Env::Priority thread_pri); |
2098 | | |
2099 | | // Flush the memtables of (multiple) column families to multiple files on |
2100 | | // persistent storage. |
2101 | | Status FlushMemTablesToOutputFiles( |
2102 | | const autovector<BGFlushArg>& bg_flush_args, bool* made_progress, |
2103 | | JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri); |
2104 | | |
2105 | | Status AtomicFlushMemTablesToOutputFiles( |
2106 | | const autovector<BGFlushArg>& bg_flush_args, bool* made_progress, |
2107 | | JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri); |
2108 | | |
2109 | | // REQUIRES: log_numbers are sorted in ascending order |
2110 | | // corrupted_wal_found is set to true if we recover from a corrupted log file. |
2111 | | Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers, |
2112 | | SequenceNumber* next_sequence, bool read_only, |
2113 | | bool is_retry, bool* corrupted_wal_found, |
2114 | | RecoveryContext* recovery_ctx); |
2115 | | |
2116 | | void SetupLogFilesRecovery( |
2117 | | const std::vector<uint64_t>& wal_numbers, |
2118 | | std::unordered_map<int, VersionEdit>* version_edits, int* job_id, |
2119 | | uint64_t* min_wal_number); |
2120 | | |
2121 | | Status ProcessLogFiles(const std::vector<uint64_t>& wal_numbers, |
2122 | | bool read_only, bool is_retry, uint64_t min_wal_number, |
2123 | | int job_id, SequenceNumber* next_sequence, |
2124 | | std::unordered_map<int, VersionEdit>* version_edits, |
2125 | | bool* corrupted_wal_found, |
2126 | | RecoveryContext* recovery_ctx); |
2127 | | |
2128 | | Status ProcessLogFile( |
2129 | | uint64_t wal_number, uint64_t min_wal_number, bool is_retry, |
2130 | | bool read_only, int job_id, SequenceNumber* next_sequence, |
2131 | | bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter, |
2132 | | uint64_t* corrupted_wal_number, bool* corrupted_wal_found, |
2133 | | std::unordered_map<int, VersionEdit>* version_edits, bool* flushed, |
2134 | | PredecessorWALInfo& predecessor_wal_info); |
2135 | | |
2136 | | void SetupLogFileProcessing(uint64_t wal_number); |
2137 | | |
2138 | | Status InitializeLogReader(uint64_t wal_number, bool is_retry, |
2139 | | std::string& fname, |
2140 | | |
2141 | | bool stop_replay_for_corruption, |
2142 | | uint64_t min_wal_number, |
2143 | | const PredecessorWALInfo& predecessor_wal_info, |
2144 | | bool* const old_log_record, |
2145 | | Status* const reporter_status, |
2146 | | DBOpenLogRecordReadReporter* reporter, |
2147 | | std::unique_ptr<log::Reader>& reader); |
2148 | | Status ProcessLogRecord( |
2149 | | Slice record, const std::unique_ptr<log::Reader>& reader, |
2150 | | const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number, |
2151 | | const std::string& fname, bool read_only, int job_id, |
2152 | | const std::function<void()>& logFileDropped, |
2153 | | DBOpenLogRecordReadReporter* reporter, uint64_t* record_checksum, |
2154 | | SequenceNumber* last_seqno_observed, SequenceNumber* next_sequence, |
2155 | | bool* stop_replay_for_corruption, Status* status, |
2156 | | bool* stop_replay_by_wal_filter, |
2157 | | std::unordered_map<int, VersionEdit>* version_edits, bool* flushed); |
2158 | | |
2159 | | Status InitializeWriteBatchForLogRecord( |
2160 | | Slice record, const std::unique_ptr<log::Reader>& reader, |
2161 | | const UnorderedMap<uint32_t, size_t>& running_ts_sz, WriteBatch* batch, |
2162 | | std::unique_ptr<WriteBatch>& new_batch, WriteBatch*& batch_to_use, |
2163 | | uint64_t* record_checksum); |
2164 | | |
2165 | | void MaybeReviseStopReplayForCorruption( |
2166 | | SequenceNumber sequence, SequenceNumber const* const next_sequence, |
2167 | | bool* stop_replay_for_corruption); |
2168 | | |
2169 | | Status InsertLogRecordToMemtable(WriteBatch* batch_to_use, |
2170 | | uint64_t wal_number, |
2171 | | SequenceNumber* next_sequence, |
2172 | | bool* has_valid_writes, bool read_only); |
2173 | | |
2174 | | Status MaybeWriteLevel0TableForRecovery( |
2175 | | bool has_valid_writes, bool read_only, uint64_t wal_number, int job_id, |
2176 | | SequenceNumber const* const next_sequence, |
2177 | | std::unordered_map<int, VersionEdit>* version_edits, bool* flushed); |
2178 | | |
2179 | | Status HandleNonOkStatusOrOldLogRecord( |
2180 | | uint64_t wal_number, SequenceNumber const* const next_sequence, |
2181 | | Status status, const DBOpenLogRecordReadReporter& reporter, |
2182 | | bool* old_log_record, bool* stop_replay_for_corruption, |
2183 | | uint64_t* corrupted_wal_number, bool* corrupted_wal_found); |
2184 | | |
2185 | | Status UpdatePredecessorWALInfo(uint64_t wal_number, |
2186 | | const SequenceNumber last_seqno_observed, |
2187 | | const std::string& fname, |
2188 | | PredecessorWALInfo& predecessor_wal_info); |
2189 | | |
2190 | | void FinishLogFileProcessing(const Status& status, |
2191 | | const SequenceNumber* next_sequence); |
2192 | | |
  // Return `Status::Corruption()` when `stop_replay_for_corruption == true`
  // and there exists an inconsistency between SST and WAL data.
2195 | | Status MaybeHandleStopReplayForCorruptionForInconsistency( |
2196 | | bool stop_replay_for_corruption, uint64_t corrupted_wal_number); |
2197 | | |
2198 | | Status MaybeFlushFinalMemtableOrRestoreActiveLogFiles( |
2199 | | const std::vector<uint64_t>& wal_numbers, bool read_only, int job_id, |
2200 | | bool flushed, std::unordered_map<int, VersionEdit>* version_edits, |
2201 | | RecoveryContext* recovery_ctx); |
2202 | | |
2203 | | // Check that DB sequence number is not set back during recovery between |
2204 | | // replaying of WAL files and between replaying of WriteBatches. |
2205 | | Status CheckSeqnoNotSetBackDuringRecovery(SequenceNumber prev_next_seqno, |
2206 | | SequenceNumber current_next_seqno); |
2207 | | |
2208 | | void FinishLogFilesRecovery(int job_id, const Status& status); |
2209 | | // The following two methods are used to flush a memtable to |
2210 | | // storage. The first one is used at database RecoveryTime (when the |
2211 | | // database is opened) and is heavyweight because it holds the mutex |
2212 | | // for the entire period. The second method WriteLevel0Table supports |
2213 | | // concurrent flush memtables to storage. |
2214 | | Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, |
2215 | | MemTable* mem, VersionEdit* edit); |
2216 | | |
2217 | | // Get the size of a log file and, if truncate is true, truncate the |
2218 | | // log file to its actual size, thereby freeing preallocated space. |
2219 | | // Return success even if truncate fails |
2220 | | Status GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate, |
2221 | | WalFileNumberSize* log); |
2222 | | |
2223 | | // Restore alive_wal_files_ and wals_total_size_ after recovery. |
2224 | | // It needs to run only when there's no flush during recovery |
2225 | | // (e.g. avoid_flush_during_recovery=true). May also trigger flush |
2226 | | // in case wals_total_size > max_total_wal_size. |
2227 | | Status RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers); |
2228 | | |
2229 | | // num_bytes: for slowdown case, delay time is calculated based on |
2230 | | // `num_bytes` going through. |
2231 | | Status DelayWrite(uint64_t num_bytes, WriteThread& write_thread, |
2232 | | const WriteOptions& write_options); |
2233 | | |
2234 | | // Begin stalling of writes when memory usage increases beyond a certain |
2235 | | // threshold. |
2236 | | void WriteBufferManagerStallWrites(); |
2237 | | |
2238 | | Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, |
2239 | | WriteBatch* my_batch); |
2240 | | |
2241 | | // REQUIRES: mutex locked and in write thread. |
2242 | | Status ScheduleFlushes(WriteContext* context); |
2243 | | |
2244 | | void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds); |
2245 | | |
2246 | | Status TrimMemtableHistory(WriteContext* context); |
2247 | | |
2248 | | // Switches the current live memtable to immutable/read-only memtable. |
2249 | | // A new WAL is created if the current WAL is not empty. |
2250 | | // If `new_imm` is not nullptr, it will be added as the newest immutable |
2251 | | // memtable, if and only if OK status is returned. |
2252 | | // `last_seqno` needs to be provided if `new_imm` is not nullptr. It is |
2253 | | // the value of versions_->LastSequence() after the write that ingests new_imm |
2254 | | // is done. |
2255 | | // |
2256 | | // REQUIRES: mutex_ is held |
2257 | | // REQUIRES: this thread is currently at the front of the writer queue |
2258 | | // REQUIRES: this thread is currently at the front of the 2nd writer queue if |
2259 | | // two_write_queues_ is true (This is to simplify the reasoning.) |
2260 | | Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context, |
2261 | | ReadOnlyMemTable* new_imm = nullptr, |
2262 | | SequenceNumber last_seqno = 0); |
2263 | | |
2264 | | // Select and output column families qualified for atomic flush in |
2265 | | // `selected_cfds`. If `provided_candidate_cfds` is non-empty, it will be used |
2266 | | // as candidate CFs to select qualified ones from. Otherwise, all column |
2267 | | // families are used as candidate to select from. |
2268 | | // |
2269 | | // REQUIRES: mutex held |
2270 | | void SelectColumnFamiliesForAtomicFlush( |
2271 | | autovector<ColumnFamilyData*>* selected_cfds, |
2272 | | const autovector<ColumnFamilyData*>& provided_candidate_cfds = {}, |
2273 | | FlushReason flush_reason = FlushReason::kOthers); |
2274 | | |
2275 | | // Force current memtable contents to be flushed. |
2276 | | Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, |
2277 | | FlushReason flush_reason, |
2278 | | bool entered_write_thread = false); |
2279 | | |
  // Atomic-flush memtables from qualified CFs among `provided_candidate_cfds`
  // (if non-empty) or among all column families and atomically record the
  // result to the MANIFEST.
2283 | | Status AtomicFlushMemTables( |
2284 | | const FlushOptions& options, FlushReason flush_reason, |
2285 | | const autovector<ColumnFamilyData*>& provided_candidate_cfds = {}, |
2286 | | bool entered_write_thread = false); |
2287 | | |
2288 | | Status RetryFlushesForErrorRecovery(FlushReason flush_reason, bool wait); |
2289 | | |
2290 | | // Wait until flushing this column family won't stall writes |
2291 | | Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd, |
2292 | | bool* flush_needed); |
2293 | | |
  // Wait for the memtables of a single column family to be flushed.
  // If `flush_memtable_id` is non-null, wait until the memtable with that ID
  // gets flushed. Otherwise, wait until the column family doesn't have any
  // memtable pending flush.
  // `resuming_from_bg_err` indicates whether the caller is attempting to
  // resume from background error.
  // Convenience wrapper: forwards to the multi-CF WaitForFlushMemTables().
  Status WaitForFlushMemTable(
      ColumnFamilyData* cfd, const uint64_t* flush_memtable_id = nullptr,
      bool resuming_from_bg_err = false,
      std::optional<FlushReason> flush_reason = std::nullopt) {
    return WaitForFlushMemTables({cfd}, {flush_memtable_id},
                                 resuming_from_bg_err, flush_reason);
  }
2307 | | // Wait for memtables to be flushed for multiple column families. |
2308 | | Status WaitForFlushMemTables( |
2309 | | const autovector<ColumnFamilyData*>& cfds, |
2310 | | const autovector<const uint64_t*>& flush_memtable_ids, |
2311 | | bool resuming_from_bg_err, std::optional<FlushReason> flush_reason); |
2312 | | |
2313 | 1.72k | inline void WaitForPendingWrites() { |
2314 | 1.72k | mutex_.AssertHeld(); |
2315 | 1.72k | TEST_SYNC_POINT("DBImpl::WaitForPendingWrites:BeforeBlock"); |
2316 | | // In case of pipelined write is enabled, wait for all pending memtable |
2317 | | // writers. |
2318 | 1.72k | if (immutable_db_options_.enable_pipelined_write) { |
2319 | | // Memtable writers may call DB::Get in case max_successive_merges > 0, |
2320 | | // which may lock mutex. Unlocking mutex here to avoid deadlock. |
2321 | 0 | mutex_.Unlock(); |
2322 | 0 | write_thread_.WaitForMemTableWriters(); |
2323 | 0 | mutex_.Lock(); |
2324 | 0 | } |
2325 | | |
2326 | 1.72k | if (immutable_db_options_.unordered_write) { |
2327 | | // Wait for the ones who already wrote to the WAL to finish their |
2328 | | // memtable write. |
2329 | 0 | if (pending_memtable_writes_.load() != 0) { |
2330 | | // XXX: suspicious wait while holding DB mutex? |
2331 | 0 | std::unique_lock<std::mutex> guard(switch_mutex_); |
2332 | 0 | switch_cv_.wait(guard, |
2333 | 0 | [&] { return pending_memtable_writes_.load() == 0; }); |
2334 | 0 | } |
2335 | 1.72k | } else { |
2336 | | // (Writes are finished before the next write group starts.) |
2337 | 1.72k | } |
2338 | | |
2339 | | // Wait for any LockWAL to clear |
2340 | 1.72k | while (lock_wal_count_ > 0) { |
2341 | 0 | bg_cv_.Wait(); |
2342 | 0 | } |
2343 | 1.72k | } |
2344 | | |
  // TaskType is used to identify tasks in the thread pool; currently it only
  // differentiates manual compaction, which can be unscheduled from the
  // thread pool.
  enum class TaskType : uint8_t {
    kDefault = 0,
    kManualCompaction = 1,
    kCount = 2,  // sentinel: number of task types, not a real task
  };
2353 | | |
  // A task tag is used to identify tasks in the thread pool; it is the
  // DBImpl object's address plus the task type.
2356 | 2.02k | inline void* GetTaskTag(TaskType type) { |
2357 | 2.02k | return GetTaskTag(static_cast<uint8_t>(type)); |
2358 | 2.02k | } |
2359 | | |
2360 | 243k | inline void* GetTaskTag(uint8_t type) { |
2361 | 243k | return static_cast<uint8_t*>(static_cast<void*>(this)) + type; |
2362 | 243k | } |
2363 | | |
2364 | | // REQUIRES: mutex locked and in write thread. |
2365 | | void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds); |
2366 | | |
2367 | | // REQUIRES: mutex locked and in write thread. |
2368 | | Status SwitchWAL(WriteContext* write_context); |
2369 | | |
2370 | | // REQUIRES: mutex locked and in write thread. |
2371 | | Status HandleWriteBufferManagerFlush(WriteContext* write_context); |
2372 | | |
2373 | | // REQUIRES: mutex locked |
2374 | | Status PreprocessWrite(const WriteOptions& write_options, |
2375 | | WalContext* log_context, WriteContext* write_context); |
2376 | | |
2377 | | // Merge write batches in the write group into merged_batch. |
2378 | | // Returns OK if merge is successful. |
2379 | | // Returns Corruption if corruption in write batch is detected. |
2380 | | Status MergeBatch(const WriteThread::WriteGroup& write_group, |
2381 | | WriteBatch* tmp_batch, WriteBatch** merged_batch, |
2382 | | size_t* write_with_wal, WriteBatch** to_be_cached_state); |
2383 | | |
2384 | | IOStatus WriteToWAL(const WriteBatch& merged_batch, |
2385 | | const WriteOptions& write_options, |
2386 | | log::Writer* log_writer, uint64_t* wal_used, |
2387 | | uint64_t* log_size, |
2388 | | WalFileNumberSize& wal_file_number_size, |
2389 | | SequenceNumber sequence); |
2390 | | |
2391 | | IOStatus WriteGroupToWAL(const WriteThread::WriteGroup& write_group, |
2392 | | log::Writer* log_writer, uint64_t* wal_used, |
2393 | | bool need_wal_sync, bool need_wal_dir_sync, |
2394 | | SequenceNumber sequence, |
2395 | | WalFileNumberSize& wal_file_number_size); |
2396 | | |
2397 | | IOStatus ConcurrentWriteGroupToWAL(const WriteThread::WriteGroup& write_group, |
2398 | | uint64_t* wal_used, |
2399 | | SequenceNumber* last_sequence, |
2400 | | size_t seq_inc); |
2401 | | |
2402 | | // Used by WriteImpl to update bg_error_ if paranoid check is enabled. |
2403 | | // Caller must hold mutex_. |
2404 | | void WriteStatusCheckOnLocked(const Status& status); |
2405 | | |
2406 | | // Used by WriteImpl to update bg_error_ if paranoid check is enabled. |
2407 | | void WriteStatusCheck(const Status& status); |
2408 | | |
2409 | | // Used by WriteImpl to update bg_error_ when IO error happens, e.g., write |
2410 | | // WAL, sync WAL fails, if paranoid check is enabled. |
2411 | | void WALIOStatusCheck(const IOStatus& status); |
2412 | | |
2413 | | // Used by WriteImpl to update bg_error_ in case of memtable insert error. |
2414 | | void HandleMemTableInsertFailure(const Status& nonok_memtable_insert_status); |
2415 | | |
2416 | | Status CompactFilesImpl(const CompactionOptions& compact_options, |
2417 | | ColumnFamilyData* cfd, Version* version, |
2418 | | const std::vector<std::string>& input_file_names, |
2419 | | std::vector<std::string>* const output_file_names, |
2420 | | const int output_level, int output_path_id, |
2421 | | JobContext* job_context, LogBuffer* log_buffer, |
2422 | | CompactionJobInfo* compaction_job_info); |
2423 | | |
2424 | | // Helper function to perform trivial move by updating manifest metadata |
2425 | | // without rewriting data files. This is called when IsTrivialMove() is true. |
2426 | | // REQUIRES: mutex held |
2427 | | // Returns: Status of the trivial move operation |
2428 | | Status PerformTrivialMove(Compaction& c, LogBuffer* log_buffer, |
2429 | | bool& compaction_released, size_t& moved_files, |
2430 | | size_t& moved_bytes); |
2431 | | |
2432 | | // REQUIRES: mutex unlocked |
2433 | | void TrackOrUntrackFiles(const std::vector<std::string>& existing_data_files, |
2434 | | bool track); |
2435 | | |
2436 | | void MaybeScheduleFlushOrCompaction(); |
2437 | | |
2438 | | BackgroundJobPressure CaptureBackgroundJobPressure() const; |
2439 | | void NotifyOnBackgroundJobPressureChanged(); |
2440 | | |
  // A queued unit of flush work: why to flush and, per column family, which
  // memtables the flush must persist before it is considered complete.
  struct FlushRequest {
    // Why this flush was requested.
    FlushReason flush_reason;
    // Whether this flush request should use the atomic flush code path.
    bool atomic_flush = false;
    // A map from each column family to flush to the largest memtable id to
    // persist for that column family. Once all the memtables whose IDs are
    // smaller than or equal to this per-column-family specified value are
    // flushed, this flush request is considered to have completed its work of
    // flushing this column family.
    // After completing the work for all column families in this request, this
    // flush is considered complete.
    std::unordered_map<ColumnFamilyData*, uint64_t>
        cfd_to_max_mem_id_to_persist;

#ifndef NDEBUG
    // Debug-only: appears to count how many times this request has been
    // (re)enqueued — confirm against the flush-queue/reschedule code paths.
    int reschedule_count = 1;
#endif /* !NDEBUG */
  };
2458 | | |
2459 | | // In case of atomic flush, generates a `FlushRequest` for the latest atomic |
2460 | | // cuts for these `cfds`. Atomic cuts are recorded in |
2461 | | // `AssignAtomicFlushSeq()`. For each entry in `cfds`, all CFDs sharing the |
2462 | | // same latest atomic cut must also be present. |
2463 | | // |
2464 | | // REQUIRES: mutex held |
2465 | | void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds, |
2466 | | FlushReason flush_reason, FlushRequest* req); |
2467 | | |
2468 | | // Below functions are for executing flush, compaction in the background. A |
2469 | | // dequeue is the communication channel between threads that asks for the work |
2470 | | // to be done and the available threads in the thread pool that pick it up to |
2471 | | // execute it. We use these terminologies to describe the state of the work |
2472 | | // and its transitions: |
2473 | | // 1) It becomes pending once it's successfully enqueued into the |
2474 | | // corresponding dequeue, a work in this state is also called unscheduled. |
2475 | | // Counter `unscheduled_*_` counts work in this state. |
  // 2) When `MaybeScheduleFlushOrCompaction` schedules a thread to run
  // `BGWork*` for the work, it becomes scheduled.
  // Counter `bg_*_scheduled_` counts work in this state.
  // 3) Once the thread starts to execute `BGWork*`, the work is popped from
  // the dequeue; it is now in the running state.
2481 | | // Counter `num_running_*_` counts work in this state. |
2482 | | // 4) Eventually, the work is finished. We don't need to specifically track |
2483 | | // finished work. |
2484 | | |
2485 | | // Returns true if `req` is successfully enqueued. |
2486 | | bool EnqueuePendingFlush(const FlushRequest& req); |
2487 | | |
2488 | | void EnqueuePendingCompaction(ColumnFamilyData* cfd); |
2489 | | void SchedulePendingPurge(std::string fname, std::string dir_to_sync, |
2490 | | FileType type, uint64_t number, int job_id); |
2491 | | static void BGWorkCompaction(void* arg); |
2492 | | // Runs a pre-chosen universal compaction involving bottom level in a |
2493 | | // separate, bottom-pri thread pool. |
2494 | | static void BGWorkBottomCompaction(void* arg); |
2495 | | static void BGWorkFlush(void* arg); |
2496 | | static void BGWorkPurge(void* arg); |
2497 | | static void UnscheduleCompactionCallback(void* arg); |
2498 | | static void UnscheduleFlushCallback(void* arg); |
2499 | | void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, |
2500 | | Env::Priority thread_pri); |
2501 | | void BackgroundCallFlush(Env::Priority thread_pri); |
2502 | | void BackgroundCallPurge(); |
2503 | | Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, |
2504 | | LogBuffer* log_buffer, |
2505 | | PrepickedCompaction* prepicked_compaction, |
2506 | | Env::Priority thread_pri); |
2507 | | Status BackgroundFlush(bool* madeProgress, JobContext* job_context, |
2508 | | LogBuffer* log_buffer, FlushReason* reason, |
2509 | | bool* flush_rescheduled_to_retain_udt, |
2510 | | Env::Priority thread_pri); |
2511 | | |
2512 | | Compaction* CreateIntendedCompactionForwardedToBottomPriorityPool( |
2513 | | Compaction* c); |
2514 | | bool EnoughRoomForCompaction(ColumnFamilyData* cfd, |
2515 | | const std::vector<CompactionInputFiles>& inputs, |
2516 | | bool* sfm_bookkeeping, LogBuffer* log_buffer); |
2517 | | |
2518 | | // Request compaction tasks token from compaction thread limiter. |
2519 | | // It always succeeds if force = true or limiter is disable. |
2520 | | bool RequestCompactionToken(ColumnFamilyData* cfd, bool force, |
2521 | | std::unique_ptr<TaskLimiterToken>* token, |
2522 | | LogBuffer* log_buffer); |
2523 | | |
2524 | | // Return true if the `FlushRequest` can be rescheduled to retain the UDT. |
2525 | | // Only true if there are user-defined timestamps in the involved MemTables |
2526 | | // with newer than cutoff timestamp `full_history_ts_low` and not flushing |
2527 | | // immediately will not cause entering write stall mode. |
2528 | | bool ShouldRescheduleFlushRequestToRetainUDT(const FlushRequest& flush_req); |
2529 | | |
2530 | | // Schedule background tasks |
2531 | | Status StartPeriodicTaskScheduler(); |
2532 | | |
2533 | | // Compute the repeat period for the kTriggerCompaction task, which ensures |
2534 | | // compactions not dependent on writes (flushes) are eventually triggered when |
2535 | | // there are no writes (flushes). NOT thread safe; only called during DB open |
2536 | | // (StartPeriodicTaskScheduler). KNOWN LIMITATION: doesn't get updated with |
2537 | | // dynamic option updates. (Probably not worth the extra complexity.) |
2538 | | uint64_t ComputeTriggerCompactionPeriod(); |
2539 | | |
2540 | | // Cancel scheduled periodic tasks |
2541 | | Status CancelPeriodicTaskScheduler(); |
2542 | | |
2543 | | Status RegisterRecordSeqnoTimeWorker(); |
2544 | | |
2545 | | void PrintStatistics(); |
2546 | | |
2547 | | size_t EstimateInMemoryStatsHistorySize() const; |
2548 | | |
2549 | | // Return the minimum empty level that could hold the total data in the |
2550 | | // input level. Return the input level, if such level could not be found. |
2551 | | int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, |
2552 | | const MutableCFOptions& mutable_cf_options, |
2553 | | int level); |
2554 | | |
2555 | | // Move the files in the input level to the target level. |
2556 | | // If target_level < 0, automatically calculate the minimum level that could |
2557 | | // hold the data set. |
2558 | | Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1); |
2559 | | |
2560 | | // helper functions for adding and removing from flush & compaction queues |
2561 | | void AddToCompactionQueue(ColumnFamilyData* cfd); |
2562 | | ColumnFamilyData* PopFirstFromCompactionQueue(); |
2563 | | FlushRequest PopFirstFromFlushQueue(); |
2564 | | |
2565 | | // Pick the first unthrottled compaction with task token from queue. |
2566 | | ColumnFamilyData* PickCompactionFromQueue( |
2567 | | std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer); |
2568 | | |
2569 | | IOStatus SyncWalImpl(bool include_current_wal, |
2570 | | const WriteOptions& write_options, |
2571 | | JobContext* job_context, VersionEdit* synced_wals, |
2572 | | bool error_recovery_in_prog); |
2573 | | |
2574 | | // helper function to call after some of the logs_ were synced |
2575 | | void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit); |
2576 | | Status ApplyWALToManifest(const ReadOptions& read_options, |
2577 | | const WriteOptions& write_options, |
2578 | | VersionEdit* edit); |
2579 | | // WALs with log number up to up_to are not synced successfully. |
2580 | | void MarkLogsNotSynced(uint64_t up_to); |
2581 | | |
2582 | | SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary, |
2583 | | bool lock = true); |
2584 | | |
2585 | | // If snapshot_seq != kMaxSequenceNumber, then this function can only be |
2586 | | // called from the write thread that publishes sequence numbers to readers. |
2587 | | // For 1) write-committed, or 2) write-prepared + one-write-queue, this will |
2588 | | // be the write thread performing memtable writes. For write-prepared with |
2589 | | // two write queues, this will be the write thread writing commit marker to |
2590 | | // the WAL. |
2591 | | // If snapshot_seq == kMaxSequenceNumber, this function is called by a caller |
2592 | | // ensuring no writes to the database. |
2593 | | std::pair<Status, std::shared_ptr<const SnapshotImpl>> |
2594 | | CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, |
2595 | | bool lock = true); |
2596 | | |
2597 | | uint64_t GetMaxTotalWalSize() const; |
2598 | | |
2599 | | FSDirectory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const; |
2600 | | |
2601 | | Status MaybeReleaseTimestampedSnapshotsAndCheck(); |
2602 | | |
2603 | | Status CloseHelper(); |
2604 | | |
2605 | | void WaitForBackgroundWork(); |
2606 | | |
2607 | | // Background threads call this function, which is just a wrapper around |
2608 | | // the InstallSuperVersion() function. Background threads carry |
2609 | | // sv_context to allow allocation of SuperVersion object outside of holding |
2610 | | // the DB mutex. |
2611 | | // All ColumnFamily state changes go through this function. Here we analyze |
2612 | | // the new state and we schedule background work if we detect that the new |
2613 | | // state needs flush or compaction. |
2614 | | // See also InstallSuperVersionForConfigChange(). |
2615 | | void InstallSuperVersionAndScheduleWork( |
2616 | | ColumnFamilyData* cfd, SuperVersionContext* sv_context, |
2617 | | std::optional<std::shared_ptr<SeqnoToTimeMapping>> |
2618 | | new_seqno_to_time_mapping = {}); |
2619 | | |
2620 | | // A variant of InstallSuperVersionAndScheduleWork() that must be used for |
2621 | | // new CFs or for changes to mutable_cf_options. This is so that it can |
2622 | | // update seqno_to_time_mapping cached for the new SuperVersion as relevant. |
2623 | | void InstallSuperVersionForConfigChange(ColumnFamilyData* cfd, |
2624 | | SuperVersionContext* sv_context); |
2625 | | |
2626 | | bool GetIntPropertyInternal(ColumnFamilyData* cfd, |
2627 | | const DBPropertyInfo& property_info, |
2628 | | bool is_locked, uint64_t* value); |
2629 | | bool GetPropertyHandleOptionsStatistics(std::string* value); |
2630 | | |
2631 | | bool HasPendingManualCompaction(); |
2632 | | bool HasExclusiveManualCompaction(); |
2633 | | void AddManualCompaction(ManualCompactionState* m); |
2634 | | void RemoveManualCompaction(ManualCompactionState* m); |
2635 | | bool ShouldntRunManualCompaction(ManualCompactionState* m); |
2636 | | bool HaveManualCompaction(ColumnFamilyData* cfd); |
2637 | | bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1); |
2638 | | void UpdateFIFOCompactionStatus(const std::unique_ptr<Compaction>& c); |
2639 | | |
2640 | | // May open and read table files for table property. |
2641 | | // Should not be called while holding mutex_. |
2642 | | void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c, |
2643 | | const Status& st, |
2644 | | const CompactionJobStats& compaction_job_stats, |
2645 | | const int job_id, |
2646 | | CompactionJobInfo* compaction_job_info) const; |
2647 | | // Reserve the next 'num' file numbers for to-be-ingested external SST files, |
2648 | | // and return the current file_number in 'next_file_number'. |
2649 | | // Write a version edit to the MANIFEST. |
2650 | | Status ReserveFileNumbersBeforeIngestion( |
2651 | | ColumnFamilyData* cfd, uint64_t num, |
2652 | | std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem, |
2653 | | uint64_t* next_file_number); |
2654 | | |
2655 | | bool ShouldPurge(uint64_t file_number) const; |
2656 | | void MarkAsGrabbedForPurge(uint64_t file_number); |
2657 | | |
2658 | | size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; |
  // Write-life-time hint to use when creating WAL files. Unconditionally
  // WLTH_SHORT: WALs are presumed short-lived relative to SST files
  // (NOTE(review): presumably they become obsolete once their memtables are
  // flushed — confirm). This is only a placement hint for the file system
  // and does not affect correctness.
  Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; }
2660 | | |
2661 | | IOStatus CreateWAL(const WriteOptions& write_options, uint64_t log_file_num, |
2662 | | uint64_t recycle_log_number, size_t preallocate_block_size, |
2663 | | const PredecessorWALInfo& predecessor_wal_info, |
2664 | | log::Writer** new_log); |
2665 | | |
2666 | | // Validate self-consistency of DB options |
2667 | | static Status ValidateOptions(const DBOptions& db_options); |
2668 | | // Validate self-consistency of DB options and its consistency with cf options |
2669 | | static Status ValidateOptions( |
2670 | | const DBOptions& db_options, |
2671 | | const std::vector<ColumnFamilyDescriptor>& column_families); |
2672 | | |
2673 | | // Utility function to do some debug validation and sort the given vector |
2674 | | // of MultiGet keys |
2675 | | void PrepareMultiGetKeys( |
2676 | | const size_t num_keys, bool sorted, |
2677 | | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* key_ptrs); |
2678 | | |
2679 | | void MultiGetCommon(const ReadOptions& options, |
2680 | | ColumnFamilyHandle* column_family, const size_t num_keys, |
2681 | | const Slice* keys, PinnableSlice* values, |
2682 | | PinnableWideColumns* columns, std::string* timestamps, |
2683 | | Status* statuses, bool sorted_input); |
2684 | | |
2685 | | void MultiGetCommon(const ReadOptions& options, const size_t num_keys, |
2686 | | ColumnFamilyHandle** column_families, const Slice* keys, |
2687 | | PinnableSlice* values, PinnableWideColumns* columns, |
2688 | | std::string* timestamps, Status* statuses, |
2689 | | bool sorted_input); |
2690 | | |
2691 | | // A structure to hold the information required to process MultiGet of keys |
2692 | | // belonging to one column family. For a multi column family MultiGet, there |
2693 | | // will be a container of these objects. |
2694 | | struct MultiGetKeyRangePerCf { |
2695 | | // For the batched MultiGet which relies on sorted keys, start specifies |
2696 | | // the index of first key belonging to this column family in the sorted |
2697 | | // list. |
2698 | | size_t start; |
2699 | | |
2700 | | // For the batched MultiGet case, num_keys specifies the number of keys |
2701 | | // belonging to this column family in the sorted list |
2702 | | size_t num_keys; |
2703 | | |
2704 | 0 | MultiGetKeyRangePerCf() : start(0), num_keys(0) {} |
2705 | | |
2706 | | MultiGetKeyRangePerCf(size_t first, size_t count) |
2707 | 0 | : start(first), num_keys(count) {} |
2708 | | }; |
2709 | | |
2710 | | // A structure to contain ColumnFamilyData and the SuperVersion obtained for |
2711 | | // the consistent view of DB |
2712 | | struct ColumnFamilySuperVersionPair { |
2713 | | ColumnFamilyHandleImpl* cfh; |
2714 | | ColumnFamilyData* cfd; |
2715 | | |
2716 | | // SuperVersion for the column family obtained in a manner that ensures a |
2717 | | // consistent view across all column families in the DB |
2718 | | SuperVersion* super_version; |
2719 | | ColumnFamilySuperVersionPair(ColumnFamilyHandle* column_family, |
2720 | | SuperVersion* sv) |
2721 | 0 | : cfh(static_cast<ColumnFamilyHandleImpl*>(column_family)), |
2722 | 0 | cfd(cfh->cfd()), |
2723 | 0 | super_version(sv) {} |
2724 | | |
2725 | | ColumnFamilySuperVersionPair() = default; |
2726 | | }; |
2727 | | |
2728 | | // A common function to obtain a consistent snapshot, which can be implicit |
2729 | | // if the user doesn't specify a snapshot in read_options, across |
2730 | | // multiple column families. It will attempt to get an implicit |
2731 | | // snapshot without acquiring the db mutex, but will give up after a few
2732 | | // tries and acquire the mutex if a memtable flush happens. The template |
2733 | | // allows both the batched and non-batched MultiGet to call this with |
2734 | | // either an std::unordered_map or autovector of column families. |
2735 | | // |
2736 | | // If callback is non-null, the callback is refreshed with the snapshot |
2737 | | // sequence number |
2738 | | // |
2739 | | // `extra_sv_ref` is used to indicate whether thread-local SuperVersion |
2740 | | // should be obtained with an extra ref (by GetReferencedSuperVersion()) or |
2741 | | // not (by GetAndRefSuperVersion()). For instance, point lookup like MultiGet |
2742 | | // does not require SuperVersion to be re-acquired throughout the entire |
2743 | | // invocation (no need extra ref), while MultiCfIterators may need the |
2744 | | // SuperVersion to be updated during Refresh() (requires extra ref). |
2745 | | // |
2746 | | // `sv_from_thread_local` being set to false indicates that the SuperVersion
2747 | | // is obtained from the ColumnFamilyData, whereas true indicates it is thread
2748 | | // local.
2749 | | // |
2750 | | // A non-OK status will be returned if for a column family that enables |
2751 | | // user-defined timestamp feature, the specified `ReadOptions.timestamp` |
2752 | | // attempts to read collapsed history.
2753 | | template <class T, typename IterDerefFuncType> |
2754 | | Status MultiCFSnapshot(const ReadOptions& read_options, |
2755 | | ReadCallback* callback, |
2756 | | IterDerefFuncType iter_deref_func, T* cf_list, |
2757 | | bool extra_sv_ref, SequenceNumber* snapshot, |
2758 | | bool* sv_from_thread_local); |
2759 | | |
2760 | | // The actual implementation of the batching MultiGet. The caller is expected |
2761 | | // to have acquired the SuperVersion and pass in a snapshot sequence number |
2762 | | // in order to construct the LookupKeys. The start_key and num_keys specify |
2763 | | // the range of keys in the sorted_keys vector for a single column family. |
2764 | | Status MultiGetImpl( |
2765 | | const ReadOptions& read_options, size_t start_key, size_t num_keys, |
2766 | | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys, |
2767 | | SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback); |
2768 | | |
2769 | | void MultiGetWithCallbackImpl( |
2770 | | const ReadOptions& read_options, ColumnFamilyHandle* column_family, |
2771 | | ReadCallback* callback, |
2772 | | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys); |
2773 | | |
2774 | | Status DisableFileDeletionsWithLock(); |
2775 | | |
2776 | | Status IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd, |
2777 | | std::string ts_low); |
2778 | | |
2779 | | bool ShouldReferenceSuperVersion(const MergeContext& merge_context); |
2780 | | |
2781 | | template <typename IterType, typename ImplType, |
2782 | | typename ErrorIteratorFuncType> |
2783 | | std::unique_ptr<IterType> NewMultiCfIterator( |
2784 | | const ReadOptions& _read_options, |
2785 | | const std::vector<ColumnFamilyHandle*>& column_families, |
2786 | | ErrorIteratorFuncType error_iterator_func); |
2787 | | |
2788 | | bool ShouldPickCompaction(bool is_prepicked, |
2789 | | const PrepickedCompaction* prepicked_compaction); |
2790 | | |
2791 | | void ResetBottomPriCompactionIntent(ColumnFamilyData* cfd, |
2792 | | std::unique_ptr<Compaction>& c); |
2793 | | // Lock over the persistent DB state. Non-nullptr iff successfully acquired. |
2794 | | FileLock* db_lock_ = nullptr; |
2795 | | |
2796 | | // Guards changes to DB and CF options to ensure consistency between |
2797 | | // * In-memory options objects |
2798 | | // * Settings in effect |
2799 | | // * Options file contents |
2800 | | // while allowing the DB mutex to be released during slow operations like |
2801 | | // persisting options file or modifying global periodic task timer. |
2802 | | // Always acquired *before* DB mutex when this one is applicable. |
2803 | | InstrumentedMutex options_mutex_; |
2804 | | |
2805 | | // Guards reads and writes to in-memory stats_history_. |
2806 | | InstrumentedMutex stats_history_mutex_; |
2807 | | |
2808 | | // In addition to mutex_, wal_write_mutex_ protects writes to logs_ and |
2809 | | // cur_wal_number_. With two_write_queues it also protects alive_wal_files_, |
2810 | | // and wal_empty_. Refer to the definition of each variable below for more |
2811 | | // details. |
2812 | | // Note: to avoid deadlock, if needed to acquire both wal_write_mutex_ and |
2813 | | // mutex_, the order should be first mutex_ and then wal_write_mutex_. |
2814 | | InstrumentedMutex wal_write_mutex_; |
2815 | | |
2816 | | // If zero, manual compactions are allowed to proceed. If non-zero, manual |
2817 | | // compactions may still be running, but will quickly fail with |
2818 | | // `Status::Incomplete`. The value indicates how many threads have paused |
2819 | | // manual compactions. It is accessed in read mode outside the DB mutex in |
2820 | | // compaction code paths. |
2821 | | std::atomic<int> manual_compaction_paused_ = false; |
2822 | | |
2823 | | // If non-zero, all compaction jobs (background automatic compactions, |
2824 | | // manual compactions via CompactRange, and foreground CompactFiles calls) |
2825 | | // are being aborted. Compactions will be signaled to stop. Any new |
2826 | | // compaction job would fail immediately. The value indicates how many threads |
2827 | | // have called AbortAllCompactions(). It is accessed in read mode outside the |
2828 | | // DB mutex in compaction code paths. |
2829 | | std::atomic<int> compaction_aborted_ = 0; |
2830 | | |
2831 | | // This condition variable is signaled on these conditions: |
2832 | | // * whenever bg_compaction_scheduled_ goes down to 0 |
2833 | | // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't |
2834 | | // made any progress |
2835 | | // * whenever a compaction made any progress |
2836 | | // * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases |
2837 | | // (i.e. whenever a flush is done, even if it didn't make any progress) |
2838 | | // * whenever there is an error in background purge, flush or compaction |
2839 | | // * whenever num_running_ingest_file_ goes to 0. |
2840 | | // * whenever pending_purge_obsolete_files_ goes to 0. |
2841 | | // * whenever disable_delete_obsolete_files_ goes to 0. |
2842 | | // * whenever SetOptions successfully updates options. |
2843 | | // * whenever a column family is dropped. |
2844 | | InstrumentedCondVar bg_cv_; |
2845 | | |
2846 | | ColumnFamilyHandleImpl* persist_stats_cf_handle_ = nullptr; |
2847 | | |
2848 | | bool persistent_stats_cfd_exists_ = true; |
2849 | | |
2850 | | // Writes are protected by locking both mutex_ and wal_write_mutex_, and reads |
2851 | | // must be under either mutex_ or wal_write_mutex_. Since after ::Open, |
2852 | | // cur_wal_number_ is currently updated only in write_thread_, it can be read |
2853 | | // from the same write_thread_ without any locks. |
2854 | | uint64_t cur_wal_number_ = 0; |
2855 | | |
2856 | | // Log files that we can recycle. Must be protected by db mutex_. |
2857 | | std::deque<uint64_t> wal_recycle_files_; |
2858 | | |
2859 | | // The minimum log file number that can be recycled, if log recycling is
2860 | | // enabled. This is used to ensure that log files created by previous |
2861 | | // instances of the database are not recycled, as we cannot be sure they |
2862 | | // were created in the recyclable format. |
2863 | | uint64_t min_wal_number_to_recycle_ = 0; |
2864 | | |
2865 | | // Protected by wal_write_mutex_. |
2866 | | bool wal_dir_synced_ = false; |
2867 | | |
2868 | | // Without two_write_queues, read and writes to wal_empty_ are protected by |
2869 | | // mutex_. Since it is currently updated/read only in write_thread_, it can be |
2870 | | // accessed from the same write_thread_ without any locks. With |
2871 | | // two_write_queues writes, where it can be updated in different threads, |
2872 | | // read and writes are protected by wal_write_mutex_ instead. This is to avoid |
2873 | | // expensive mutex_ lock during WAL write, which update wal_empty_. |
2874 | | bool wal_empty_ = true; |
2875 | | |
2876 | | // The current WAL file and those that have not been found obsolete from |
2877 | | // memtable flushes. A WAL not on this list might still be pending writer |
2878 | | // flush and/or sync and close and might still be in logs_. alive_wal_files_ |
2879 | | // is protected by mutex_ and wal_write_mutex_ with details as follows: |
2880 | | // 1. read by FindObsoleteFiles() which can be called in either application |
2881 | | // thread or RocksDB bg threads, both mutex_ and wal_write_mutex_ are |
2882 | | // held. |
2883 | | // 2. pop_front() by FindObsoleteFiles(), both mutex_ and wal_write_mutex_ |
2884 | | // are held. |
2885 | | // 3. push_back() by DBImpl::Open() and DBImpl::RestoreAliveLogFiles() |
2886 | | // (actually called by Open()), only mutex_ is held because at this point, |
2887 | | // the DB::Open() call has not returned success to application, and the |
2888 | | // only other thread(s) that can conflict are bg threads calling |
2889 | | // FindObsoleteFiles() which ensure that both mutex_ and wal_write_mutex_ |
2890 | | // are held when accessing alive_wal_files_. |
2891 | | // 4. read by DBImpl::Open() is protected by mutex_. |
2892 | | // 5. push_back() by SwitchMemtable(). Both mutex_ and wal_write_mutex_ are |
2893 | | // held. This is done by the write group leader. Note that in the case of |
2894 | | // two-write-queues, another WAL-only write thread can be writing to the |
2895 | | // WAL concurrently. See 9. |
2896 | | // 6. read by SwitchWAL() with both mutex_ and wal_write_mutex_ held. This is |
2897 | | // done by write group leader. |
2898 | | // 7. read by ConcurrentWriteToWAL() by the write group leader in the case of |
2899 | | // two-write-queues. Only wal_write_mutex_ is held to protect concurrent |
2900 | | // pop_front() by FindObsoleteFiles(). |
2901 | | // 8. read by PreprocessWrite() by the write group leader. wal_write_mutex_ |
2902 | | // is held to protect the data structure from concurrent pop_front() by |
2903 | | // FindObsoleteFiles(). |
2904 | | // 9. read by ConcurrentWriteToWAL() by a WAL-only write thread in the case |
2905 | | // of two-write-queues. Only wal_write_mutex_ is held. This suffices to |
2906 | | // protect the data structure from concurrent push_back() by current |
2907 | | // write group leader as well as pop_front() by FindObsoleteFiles(). |
2908 | | std::deque<WalFileNumberSize> alive_wal_files_; |
2909 | | |
2910 | | // Total size of all "alive" WALs (for easy access without synchronization) |
2911 | | RelaxedAtomic<uint64_t> wals_total_size_{0}; |
2912 | | |
2913 | | // Log files that aren't fully synced, and the current log file. |
2914 | | // Synchronization: |
2915 | | // 1. read by FindObsoleteFiles() which can be called either in application |
2916 | | // thread or RocksDB bg threads. wal_write_mutex_ is always held, while |
2917 | | // some reads are performed without mutex_. |
2918 | | // 2. pop_front() by FindObsoleteFiles() with only wal_write_mutex_ held. |
2919 | | // 3. read by DBImpl::Open() with both mutex_ and wal_write_mutex_. |
2920 | | // 4. emplace_back() by DBImpl::Open() with both mutex_ and wal_write_mutex_.
2921 | | // Note that at this point, DB::Open() has not returned success to |
2922 | | // application, thus the only other thread(s) that can conflict are bg |
2923 | | // threads calling FindObsoleteFiles(). See 1. |
2924 | | // 5. iteration and clear() from CloseHelper() always hold wal_write_mutex_
2925 | | // and mutex_.
2926 | | // 6. back() called by APIs FlushWAL() and LockWAL() are protected by only |
2927 | | // wal_write_mutex_. These two can be called by application threads after |
2928 | | // DB::Open() returns success to applications. |
2929 | | // 7. read by SyncWAL(), another API, protected by only wal_write_mutex_. |
2930 | | // 8. read by MarkLogsNotSynced() and MarkLogsSynced() are protected by |
2931 | | // wal_write_mutex_. |
2932 | | // 9. erase() by MarkLogsSynced() protected by wal_write_mutex_. |
2933 | | // 10. read by SyncClosedWals() protected by only wal_write_mutex_. This can |
2934 | | // happen in bg flush threads after DB::Open() returns success to |
2935 | | // applications. |
2936 | | // 11. reads, e.g. front(), iteration, and back() called by PreprocessWrite() |
2937 | | // holds only the wal_write_mutex_. This is done by the write group |
2938 | | // leader. A bg thread calling FindObsoleteFiles() or MarkLogsSynced() |
2939 | | // can happen concurrently. This is fine because wal_write_mutex_ is used |
2940 | | // by all parties. See 2, 5, 9. |
2941 | | // 12. reads, empty(), back() called by SwitchMemtable() hold both mutex_ and |
2942 | | // wal_write_mutex_. This happens in the write group leader. |
2943 | | // 13. emplace_back() by SwitchMemtable() hold both mutex_ and |
2944 | | // wal_write_mutex_. This happens in the write group leader. Can conflict |
2945 | | // with bg threads calling FindObsoleteFiles(), MarkLogsSynced(), |
2946 | | // SyncClosedWals(), etc. as well as application threads calling |
2947 | | // FlushWAL(), SyncWAL(), LockWAL(). This is fine because all parties |
2948 | | // require at least wal_write_mutex_. |
2949 | | // 14. iteration called in WriteToWAL(write_group) protected by |
2950 | | // wal_write_mutex_. This is done by write group leader when |
2951 | | // two-write-queues is disabled and write needs to sync logs. |
2952 | | // 15. back() called in ConcurrentWriteToWAL() protected by wal_write_mutex_. |
2953 | | // This can be done by the write group leader if two-write-queues is |
2954 | | // enabled. It can also be done by another WAL-only write thread. |
2955 | | // |
2956 | | // Other observations: |
2957 | | // - back() and items with getting_synced=true are not popped, |
2958 | | // - The same thread that sets getting_synced=true will reset it. |
2959 | | // - it follows that the object referred by back() can be safely read from |
2960 | | // the write_thread_ without using mutex. Note that calling back() without |
2961 | | // mutex may be unsafe because different implementations of deque::back() may |
2962 | | // access other member variables of deque, causing undefined behaviors. |
2963 | | // Generally, do not access stl containers without proper synchronization. |
2964 | | // - it follows that the items with getting_synced=true can be safely read |
2965 | | // from the same thread that has set getting_synced=true |
2966 | | std::deque<LogWriterNumber> logs_; |
2967 | | |
2968 | | // Signaled when getting_synced becomes false for some of the logs_. |
2969 | | InstrumentedCondVar wal_sync_cv_; |
2970 | | // This is the app-level state that is written to the WAL but will be used |
2971 | | // only during recovery. Using this feature enables not writing the state to |
2972 | | // memtable on normal writes and hence improving the throughput. Each new |
2973 | | // write of the state will replace the previous state entirely even if the |
2974 | | // keys in the two consecutive states do not overlap. |
2975 | | // It is protected by wal_write_mutex_ when two_write_queues_ is enabled. |
2976 | | // Otherwise only the head of write_thread_ can access it.
2977 | | WriteBatch cached_recoverable_state_; |
2978 | | std::atomic<bool> cached_recoverable_state_empty_ = {true}; |
2979 | | |
2980 | | // If this is non-empty, we need to delete these log files in background |
2981 | | // threads. Protected by wal_write_mutex_. |
2982 | | autovector<log::Writer*> wals_to_free_; |
2983 | | |
2984 | | bool is_snapshot_supported_ = true; |
2985 | | |
2986 | | std::map<uint64_t, std::map<std::string, uint64_t>> stats_history_; |
2987 | | |
2988 | | std::map<std::string, uint64_t> stats_slice_; |
2989 | | |
2990 | | bool stats_slice_initialized_ = false; |
2991 | | |
2992 | | Directories directories_; |
2993 | | |
2994 | | WriteBufferManager* write_buffer_manager_; |
2995 | | |
2996 | | WriteThread write_thread_; |
2997 | | WriteBatch tmp_batch_; |
2998 | | // The write thread when the writers have no memtable write. This will be used |
2999 | | // in 2PC to batch the prepares separately from the serial commit. |
3000 | | WriteThread nonmem_write_thread_; |
3001 | | |
3002 | | WriteController write_controller_; |
3003 | | |
3004 | | // Size of the last batch group. In slowdown mode, next write needs to |
3005 | | // sleep if it uses up the quota. |
3006 | | // Note: This is to protect memtable and compaction. If the batch only writes |
3007 | | // to the WAL its size need not to be included in this. |
3008 | | uint64_t last_batch_group_size_ = 0; |
3009 | | |
3010 | | FlushScheduler flush_scheduler_; |
3011 | | |
3012 | | TrimHistoryScheduler trim_history_scheduler_; |
3013 | | |
3014 | | SnapshotList snapshots_; |
3015 | | |
3016 | | TimestampedSnapshotList timestamped_snapshots_; |
3017 | | |
3018 | | // For each background job, pending_outputs_ keeps the current file number at |
3019 | | // the time that background job started. |
3020 | | // FindObsoleteFiles()/PurgeObsoleteFiles() never deletes any file that has |
3021 | | // number bigger than any of the file number in pending_outputs_. Since file |
3022 | | // numbers grow monotonically, this also means that pending_outputs_ is always |
3023 | | // sorted. After a background job is done executing, its file number is |
3024 | | // deleted from pending_outputs_, which allows PurgeObsoleteFiles() to clean |
3025 | | // it up. |
3026 | | // State is protected with db mutex. |
3027 | | std::list<uint64_t> pending_outputs_; |
3028 | | |
3029 | | // Similar to pending_outputs_, FindObsoleteFiles()/PurgeObsoleteFiles() never |
3030 | | // deletes any OPTIONS file that has number bigger than any of the file number |
3031 | | // in min_options_file_numbers_. |
3032 | | std::list<uint64_t> min_options_file_numbers_; |
3033 | | |
3034 | | // flush_queue_ and compaction_queue_ hold column families that we need to |
3035 | | // flush and compact, respectively. |
3036 | | // A column family is inserted into flush_queue_ when it satisfies condition |
3037 | | // cfd->imm()->IsFlushPending() |
3038 | | // A column family is inserted into compaction_queue_ when it satisfied |
3039 | | // condition cfd->NeedsCompaction() |
3040 | | // Column families in this list are all Ref()-erenced |
3041 | | // TODO(icanadi) Provide some kind of ReferencedColumnFamily class that will |
3042 | | // do RAII on ColumnFamilyData |
3043 | | // Column families are in this queue when they need to be flushed or |
3044 | | // compacted. Consumers of these queues are flush and compaction threads. When |
3045 | | // column family is put on this queue, we increase unscheduled_flushes_ and |
3046 | | // unscheduled_compactions_. When these variables are bigger than zero, that |
3047 | | // means we need to schedule background threads for flush and compaction. |
3048 | | // Once the background threads are scheduled, we decrease unscheduled_flushes_ |
3049 | | // and unscheduled_compactions_. That way we keep track of number of |
3050 | | // compaction and flush threads we need to schedule. This scheduling is done |
3051 | | // in MaybeScheduleFlushOrCompaction() |
3052 | | // invariant(column family present in flush_queue_ <==> |
3053 | | // ColumnFamilyData::pending_flush_ == true) |
3054 | | std::deque<FlushRequest> flush_queue_; |
3055 | | // invariant(column family present in compaction_queue_ <==> |
3056 | | // ColumnFamilyData::pending_compaction_ == true) |
3057 | | std::deque<ColumnFamilyData*> compaction_queue_; |
3058 | | |
3059 | | // A map to store file numbers and filenames of the files to be purged |
3060 | | std::unordered_map<uint64_t, PurgeFileInfo> purge_files_; |
3061 | | |
3062 | | // A vector to store the file numbers that have been assigned to certain |
3063 | | // JobContext. Current implementation tracks table and blob files only. |
3064 | | std::unordered_set<uint64_t> files_grabbed_for_purge_; |
3065 | | |
3066 | | // A queue to store log writers to close. Protected by db mutex_. |
3067 | | std::deque<log::Writer*> wals_to_free_queue_; |
3068 | | |
3069 | | std::deque<SuperVersion*> superversions_to_free_queue_; |
3070 | | |
3071 | | int unscheduled_flushes_ = 0; |
3072 | | |
3073 | | int unscheduled_compactions_ = 0; |
3074 | | |
3075 | | // count how many background compactions are running or have been scheduled in |
3076 | | // the BOTTOM pool |
3077 | | int bg_bottom_compaction_scheduled_ = 0; |
3078 | | |
3079 | | // count how many background compactions are running or have been scheduled |
3080 | | int bg_compaction_scheduled_ = 0; |
3081 | | |
3082 | | // stores the number of compactions are currently running |
3083 | | int num_running_compactions_ = 0; |
3084 | | |
3085 | | // stores the number of BOTTOM-priority compactions currently running |
3086 | | int num_running_bottom_compactions_ = 0; |
3087 | | |
3088 | | // number of background memtable flush jobs, submitted to the HIGH pool |
3089 | | int bg_flush_scheduled_ = 0; |
3090 | | |
3091 | | // stores the number of flushes are currently running |
3092 | | int num_running_flushes_ = 0; |
3093 | | |
3094 | | // number of background obsolete file purge jobs, submitted to the HIGH pool |
3095 | | int bg_purge_scheduled_ = 0; |
3096 | | |
3097 | | // number of pressure callbacks currently in progress (for destructor safety) |
3098 | | int bg_pressure_callback_in_progress_ = 0; |
3099 | | |
  // Progression of background async file opening (presumably
  // kNotScheduled -> kScheduled -> kComplete, with kScheduled skipped when
  // opening is not needed — see the kComplete comment).
  enum class AsyncFileOpenState : uint8_t {
    kNotScheduled = 0,  // Async file opening has not been scheduled.
    kScheduled,  // Async file opening is in-flight in the HIGH pool.
    kComplete,  // Async file opening has finished (or was not needed).
  };
3105 | | |
3106 | | // Tracks whether background async file opening has been scheduled/completed. |
3107 | | AsyncFileOpenState bg_async_file_open_state_ = |
3108 | | AsyncFileOpenState::kNotScheduled; |
3109 | | |
3110 | | std::deque<ManualCompactionState*> manual_compaction_dequeue_; |
3111 | | |
3112 | | // shall we disable deletion of obsolete files |
3113 | | // if 0 the deletion is enabled. |
3114 | | // if non-zero, files will not be getting deleted |
3115 | | // This enables two different threads to call |
3116 | | // EnableFileDeletions() and DisableFileDeletions() |
3117 | | // without any synchronization |
3118 | | int disable_delete_obsolete_files_ = 0; |
3119 | | |
3120 | | // Number of times FindObsoleteFiles has found deletable files and the |
3121 | | // corresponding call to PurgeObsoleteFiles has not yet finished. |
3122 | | int pending_purge_obsolete_files_ = 0; |
3123 | | |
3124 | | // last time when DeleteObsoleteFiles with full scan was executed. Originally |
3125 | | // initialized with startup time. |
3126 | | uint64_t delete_obsolete_files_last_run_; |
3127 | | |
3128 | | // The thread that wants to switch memtable, can wait on this cv until the |
3129 | | // pending writes to memtable finishes. |
3130 | | std::condition_variable switch_cv_; |
3131 | | // The mutex used by switch_cv_. mutex_ should be acquired beforehand. |
3132 | | std::mutex switch_mutex_; |
3133 | | // Number of threads intending to write to memtable |
3134 | | std::atomic<size_t> pending_memtable_writes_{0}; |
3135 | | |
3136 | | // A flag indicating whether the current rocksdb database has any |
3137 | | // data that is not yet persisted into either WAL or SST file. |
3138 | | // Used when disableWAL is true. |
3139 | | std::atomic<bool> has_unpersisted_data_{false}; |
3140 | | |
3141 | | // if an attempt was made to flush all column families that |
3142 | | // the oldest log depends on but uncommitted data in the oldest |
3143 | | // log prevents the log from being released. |
3144 | | // We must attempt to free the dependent memtables again |
3145 | | // at a later time after the transaction in the oldest |
3146 | | // log is fully committed.
3147 | | bool unable_to_release_oldest_log_{false}; |
3148 | | |
3149 | | // Number of running IngestExternalFile() or CreateColumnFamilyWithImport() |
3150 | | // calls. |
3151 | | // REQUIRES: mutex held |
3152 | | int num_running_ingest_file_ = 0; |
3153 | | |
3154 | | WalManager wal_manager_; |
3155 | | |
3156 | | // A value of > 0 temporarily disables scheduling of background work |
3157 | | int bg_work_paused_ = 0; |
3158 | | |
3159 | | // A value of > 0 temporarily disables scheduling of background compaction |
3160 | | int bg_compaction_paused_ = 0; |
3161 | | |
3162 | | // Guard against multiple concurrent refitting |
3163 | | bool refitting_level_ = false; |
3164 | | |
3165 | | // The min threshold to trigger bottommost compaction for removing
3166 | | // garbage, among all column families.
3167 | | SequenceNumber bottommost_files_mark_threshold_ = kMaxSequenceNumber; |
3168 | | |
3169 | | // The min threshold to trigger compactions for standalone range deletion |
3170 | | // files that are marked for compaction. |
3171 | | SequenceNumber standalone_range_deletion_files_mark_threshold_ = |
3172 | | kMaxSequenceNumber; |
3173 | | |
3174 | | LogsWithPrepTracker logs_with_prep_tracker_; |
3175 | | |
3176 | | // Callback for compaction to check if a key is visible to a snapshot. |
3177 | | // REQUIRES: mutex held |
3178 | | std::unique_ptr<SnapshotChecker> snapshot_checker_; |
3179 | | |
3180 | | // Callback for when the cached_recoverable_state_ is written to memtable |
3181 | | // Only to be set during initialization |
3182 | | std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_; |
3183 | | |
3184 | | // Scheduler to run DumpStats(), PersistStats(), and FlushInfoLog(). |
3185 | | // Currently, internally it has a global timer instance for running the tasks. |
3186 | | PeriodicTaskScheduler periodic_task_scheduler_; |
3187 | | |
3188 | | // It contains the implementations for each periodic task. |
3189 | | std::map<PeriodicTaskType, const PeriodicTaskFunc> periodic_task_functions_; |
3190 | | |
3191 | | // When set, we use a separate queue for writes that don't write to memtable. |
3192 | | // In 2PC these are the writes at Prepare phase. |
3193 | | const bool two_write_queues_; |
3194 | | const bool manual_wal_flush_; |
3195 | | |
3196 | | // LastSequence also indicates last published sequence visible to the
3197 | | // readers. Otherwise LastPublishedSequence should be used. |
3198 | | const bool last_seq_same_as_publish_seq_; |
3199 | | // It indicates that a customized gc algorithm must be used for |
3200 | | // flush/compaction and if it is not provided via SnapshotChecker, we should
3201 | | // disable gc to be safe. |
3202 | | const bool use_custom_gc_; |
3203 | | // Flag to indicate that the DB instance shutdown has been initiated. This |
3204 | | // is different from shutting_down_ atomic in that it is set at the beginning
3205 | | // of shutdown sequence, specifically in order to prevent any background |
3206 | | // error recovery from going on in parallel. The latter, shutting_down_, |
3207 | | // is set a little later during the shutdown after scheduling memtable |
3208 | | // flushes |
3209 | | std::atomic<bool> shutdown_initiated_{false}; |
3210 | | // Flag to indicate whether sst_file_manager object was allocated in |
3211 | | // DB::Open() or passed to us |
3212 | | bool own_sfm_; |
3213 | | |
3214 | | // Flag to check whether Close() has been called on this DB |
3215 | | bool closed_ = false; |
3216 | | // save the closing status, for re-calling the close() |
3217 | | Status closing_status_; |
3218 | | // mutex for DB::Close() |
3219 | | InstrumentedMutex closing_mutex_; |
3220 | | |
3221 | | // Conditional variable to coordinate installation of atomic flush results. |
3222 | | // With atomic flush, each bg thread installs the result of flushing multiple |
3223 | | // column families, and different threads can flush different column |
3224 | | // families. It's difficult to rely on one thread to perform batch |
3225 | | // installation for all threads. This is different from the non-atomic flush |
3226 | | // case. |
3227 | | // atomic_flush_install_cv_ makes sure that threads install atomic flush |
3228 | | // results sequentially. Flush results of memtables with lower IDs get |
3229 | | // installed to MANIFEST first. |
3230 | | InstrumentedCondVar atomic_flush_install_cv_; |
3231 | | |
3232 | | bool wal_in_db_path_ = false; |
3233 | | std::atomic<uint64_t> max_total_wal_size_; |
3234 | | |
3235 | | BlobFileCompletionCallback blob_callback_; |
3236 | | |
3237 | | // Pointer to WriteBufferManager stalling interface. |
3238 | | std::unique_ptr<StallInterface> wbm_stall_; |
3239 | | |
3240 | | // seqno_to_time_mapping_ stores the sequence number to time mapping, it's not |
3241 | | // thread safe, both read and write need db mutex hold. |
3242 | | SeqnoToTimeMapping seqno_to_time_mapping_; |
3243 | | |
3244 | | // Stop write token that is acquired when first LockWAL() is called. |
3245 | | // Destroyed when last UnlockWAL() is called. Controlled by DB mutex. |
3246 | | // See lock_wal_count_ |
3247 | | std::unique_ptr<WriteControllerToken> lock_wal_write_token_; |
3248 | | |
3249 | | // The number of LockWAL calls without a matching UnlockWAL call.
3250 | | // See also lock_wal_write_token_ |
3251 | | uint32_t lock_wal_count_ = 0; |
3252 | | }; |
3253 | | |
3254 | | class GetWithTimestampReadCallback : public ReadCallback { |
3255 | | public: |
3256 | | explicit GetWithTimestampReadCallback(SequenceNumber seq) |
3257 | 16.6k | : ReadCallback(seq) {} |
3258 | 0 | bool IsVisibleFullCheck(SequenceNumber seq) override { |
3259 | 0 | return seq <= max_visible_seq_; |
3260 | 0 | } |
3261 | | }; |
3262 | | |
// Sanitize user-supplied options for opening a DB at path `db`. If creating
// the info logger fails, the failure status is reported via
// `logger_creation_s` (when non-null).
Options SanitizeOptions(const std::string& db, const Options& src,
                        bool read_only = false,
                        Status* logger_creation_s = nullptr);

// Overload that sanitizes only the DB-wide options.
DBOptions SanitizeOptions(const std::string& db, const DBOptions& src,
                          bool read_only = false,
                          Status* logger_creation_s = nullptr);

// Pick the compression type to use when flushing a memtable to an SST file,
// based on the column family's immutable and mutable options.
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions,
                                    const MutableCFOptions& mutable_cf_options);

// Return a VersionEdit for the DB's recovery when the `memtables` of the
// specified column family are obsolete. Specifically, the min log number to
// keep, and the WAL files that can be deleted.
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
    VersionSet* vset, const ColumnFamilyData& cfd,
    const autovector<VersionEdit*>& edit_list,
    const autovector<ReadOnlyMemTable*>& memtables,
    LogsWithPrepTracker* prep_tracker);

// Return the earliest log file to keep after the memtable flush is
// finalized.
// `cfd_to_flush` is the column family whose memtable (specified in
// `memtables_to_flush`) will be flushed and thus will not depend on any WAL
// file.
// The function is only applicable to 2pc mode.
uint64_t PrecomputeMinLogNumberToKeep2PC(
    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
    const autovector<VersionEdit*>& edit_list,
    const autovector<ReadOnlyMemTable*>& memtables_to_flush,
    LogsWithPrepTracker* prep_tracker);
// For atomic flush: same computation, over all column families being flushed.
uint64_t PrecomputeMinLogNumberToKeep2PC(
    VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
    const autovector<autovector<VersionEdit*>>& edit_lists,
    const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush,
    LogsWithPrepTracker* prep_tracker);

// In non-2PC mode, WALs with log number < the returned number can be
// deleted after the cfd_to_flush column family is flushed successfully.
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
    const autovector<VersionEdit*>& edit_list);
// For atomic flush: same computation, over all column families being flushed.
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
    VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
    const autovector<autovector<VersionEdit*>>& edit_lists);

// Return the min WAL number still referenced by "prepare" sections of
// memtables, excluding `memtables_to_flush` (which will be flushed and thus
// will no longer depend on any WAL file).
// The function is only applicable to 2pc mode.
uint64_t FindMinPrepLogReferencedByMemTable(
    VersionSet* vset, const autovector<ReadOnlyMemTable*>& memtables_to_flush);
// For atomic flush: excludes the memtables of every column family flushed.
uint64_t FindMinPrepLogReferencedByMemTable(
    VersionSet* vset,
    const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush);
3320 | | |
// Fix user-supplied options to be reasonable: clamp *ptr into the inclusive
// range [minvalue, maxvalue]. The upper bound is applied first, so if the
// bounds are inverted the lower bound wins (matching the original behavior).
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  if (static_cast<V>(*ptr) > maxvalue) {
    *ptr = maxvalue;
  }
  if (static_cast<V>(*ptr) < minvalue) {
    *ptr = minvalue;
  }
}
3327 | | |
3328 | | inline Status DBImpl::FailIfCfHasTs( |
3329 | 523k | const ColumnFamilyHandle* column_family) const { |
3330 | 523k | if (!column_family) { |
3331 | 0 | return Status::InvalidArgument("column family handle cannot be null"); |
3332 | 0 | } |
3333 | 523k | assert(column_family); |
3334 | 523k | const Comparator* const ucmp = column_family->GetComparator(); |
3335 | 523k | assert(ucmp); |
3336 | 523k | if (ucmp->timestamp_size() > 0) { |
3337 | 0 | std::ostringstream oss; |
3338 | 0 | oss << "cannot call this method on column family " |
3339 | 0 | << column_family->GetName() << " that enables timestamp"; |
3340 | 0 | return Status::InvalidArgument(oss.str()); |
3341 | 0 | } |
3342 | 523k | return Status::OK(); |
3343 | 523k | } |
3344 | | |
3345 | | inline Status DBImpl::FailIfTsMismatchCf(ColumnFamilyHandle* column_family, |
3346 | 0 | const Slice& ts) const { |
3347 | 0 | if (!column_family) { |
3348 | 0 | return Status::InvalidArgument("column family handle cannot be null"); |
3349 | 0 | } |
3350 | 0 | assert(column_family); |
3351 | 0 | const Comparator* const ucmp = column_family->GetComparator(); |
3352 | 0 | assert(ucmp); |
3353 | 0 | if (0 == ucmp->timestamp_size()) { |
3354 | 0 | std::stringstream oss; |
3355 | 0 | oss << "cannot call this method on column family " |
3356 | 0 | << column_family->GetName() << " that does not enable timestamp"; |
3357 | 0 | return Status::InvalidArgument(oss.str()); |
3358 | 0 | } |
3359 | 0 | const size_t ts_sz = ts.size(); |
3360 | 0 | if (ts_sz != ucmp->timestamp_size()) { |
3361 | 0 | std::stringstream oss; |
3362 | 0 | oss << "Timestamp sizes mismatch: expect " << ucmp->timestamp_size() << ", " |
3363 | 0 | << ts_sz << " given"; |
3364 | 0 | return Status::InvalidArgument(oss.str()); |
3365 | 0 | } |
3366 | 0 | return Status::OK(); |
3367 | 0 | } |
3368 | | |
3369 | | inline Status DBImpl::FailIfReadCollapsedHistory(const ColumnFamilyData* cfd, |
3370 | | const SuperVersion* sv, |
3371 | 0 | const Slice& ts) const { |
3372 | | // Reaching to this point means the timestamp size matching sanity check in |
3373 | | // `DBImpl::FailIfTsMismatchCf` already passed. So we skip that and assume |
3374 | | // column family has the same user-defined timestamp format as `ts`. |
3375 | 0 | const Comparator* const ucmp = cfd->user_comparator(); |
3376 | 0 | assert(ucmp); |
3377 | 0 | const std::string& full_history_ts_low = sv->full_history_ts_low; |
3378 | 0 | assert(full_history_ts_low.empty() || |
3379 | 0 | full_history_ts_low.size() == ts.size()); |
3380 | 0 | if (!full_history_ts_low.empty() && |
3381 | 0 | ucmp->CompareTimestamp(ts, full_history_ts_low) < 0) { |
3382 | 0 | std::stringstream oss; |
3383 | 0 | oss << "Read timestamp: " << ucmp->TimestampToString(ts) |
3384 | 0 | << " is smaller than full_history_ts_low: " |
3385 | 0 | << ucmp->TimestampToString(full_history_ts_low) << std::endl; |
3386 | 0 | return Status::InvalidArgument(oss.str()); |
3387 | 0 | } |
3388 | 0 | return Status::OK(); |
3389 | 0 | } |
3390 | | } // namespace ROCKSDB_NAMESPACE |