/src/rocksdb/db/db_impl/db_impl.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | #pragma once |
10 | | |
11 | | #include <atomic> |
12 | | #include <cstdint> |
13 | | #include <deque> |
14 | | #include <functional> |
15 | | #include <limits> |
16 | | #include <list> |
17 | | #include <map> |
18 | | #include <set> |
19 | | #include <string> |
20 | | #include <unordered_map> |
21 | | #include <utility> |
22 | | #include <vector> |
23 | | |
24 | | #include "db/column_family.h" |
25 | | #include "db/compaction/compaction_iterator.h" |
26 | | #include "db/compaction/compaction_job.h" |
27 | | #include "db/error_handler.h" |
28 | | #include "db/event_helpers.h" |
29 | | #include "db/external_sst_file_ingestion_job.h" |
30 | | #include "db/flush_job.h" |
31 | | #include "db/flush_scheduler.h" |
32 | | #include "db/import_column_family_job.h" |
33 | | #include "db/internal_stats.h" |
34 | | #include "db/log_writer.h" |
35 | | #include "db/logs_with_prep_tracker.h" |
36 | | #include "db/memtable_list.h" |
37 | | #include "db/periodic_task_scheduler.h" |
38 | | #include "db/post_memtable_callback.h" |
39 | | #include "db/pre_release_callback.h" |
40 | | #include "db/range_del_aggregator.h" |
41 | | #include "db/read_callback.h" |
42 | | #include "db/seqno_to_time_mapping.h" |
43 | | #include "db/snapshot_checker.h" |
44 | | #include "db/snapshot_impl.h" |
45 | | #include "db/trim_history_scheduler.h" |
46 | | #include "db/version_edit.h" |
47 | | #include "db/wal_manager.h" |
48 | | #include "db/write_controller.h" |
49 | | #include "db/write_thread.h" |
50 | | #include "logging/event_logger.h" |
51 | | #include "monitoring/instrumented_mutex.h" |
52 | | #include "options/db_options.h" |
53 | | #include "port/port.h" |
54 | | #include "rocksdb/attribute_groups.h" |
55 | | #include "rocksdb/db.h" |
56 | | #include "rocksdb/env.h" |
57 | | #include "rocksdb/memtablerep.h" |
58 | | #include "rocksdb/status.h" |
59 | | #include "rocksdb/trace_reader_writer.h" |
60 | | #include "rocksdb/transaction_log.h" |
61 | | #include "rocksdb/user_write_callback.h" |
62 | | #include "rocksdb/utilities/replayer.h" |
63 | | #include "rocksdb/write_buffer_manager.h" |
64 | | #include "table/merging_iterator.h" |
65 | | #include "util/autovector.h" |
66 | | #include "util/hash.h" |
67 | | #include "util/repeatable_thread.h" |
68 | | #include "util/stop_watch.h" |
69 | | #include "util/thread_local.h" |
70 | | |
71 | | namespace ROCKSDB_NAMESPACE { |
72 | | |
73 | | class Arena; |
74 | | class ArenaWrappedDBIter; |
75 | | class InMemoryStatsHistoryIterator; |
76 | | class MemTable; |
77 | | class PersistentStatsHistoryIterator; |
78 | | class TableCache; |
79 | | class TaskLimiterToken; |
80 | | class Version; |
81 | | class VersionEdit; |
82 | | class VersionSet; |
83 | | class WriteCallback; |
84 | | struct JobContext; |
85 | | struct ExternalSstFileInfo; |
86 | | struct MemTableInfo; |
87 | | |
88 | | // Class to maintain directories for all database paths other than main one. |
89 | | class Directories { |
90 | | public: |
91 | | IOStatus SetDirectories(FileSystem* fs, const std::string& dbname, |
92 | | const std::string& wal_dir, |
93 | | const std::vector<DbPath>& data_paths); |
94 | | |
95 | 0 | FSDirectory* GetDataDir(size_t path_id) const { |
96 | 0 | assert(path_id < data_dirs_.size()); |
97 | 0 | FSDirectory* ret_dir = data_dirs_[path_id].get(); |
98 | 0 | if (ret_dir == nullptr) { |
99 | | // Should use db_dir_ |
100 | 0 | return db_dir_.get(); |
101 | 0 | } |
102 | 0 | return ret_dir; |
103 | 0 | } |
104 | | |
105 | 0 | FSDirectory* GetWalDir() { |
106 | 0 | if (wal_dir_) { |
107 | 0 | return wal_dir_.get(); |
108 | 0 | } |
109 | 0 | return db_dir_.get(); |
110 | 0 | } |
111 | | |
112 | 28.5k | FSDirectory* GetDbDir() { return db_dir_.get(); } |
113 | | |
114 | 11.0k | IOStatus Close(const IOOptions& options, IODebugContext* dbg) { |
115 | | // close all directories for all database paths |
116 | 11.0k | IOStatus s = IOStatus::OK(); |
117 | | |
118 | | // The default implementation of Close() in the Directory/FSDirectory |
119 | | // classes returns a "NotSupported" status; the upper level interface |
120 | | // should be able to handle this error so that Close() does not fail after |
121 | | // upgrading when run on FileSystems that have not implemented |
122 | | // `Directory::Close()` or `FSDirectory::Close()` yet. |
123 | | |
124 | 11.0k | if (db_dir_) { |
125 | 11.0k | IOStatus temp_s = db_dir_->Close(options, dbg); |
126 | 11.0k | if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) { |
127 | 0 | s = std::move(temp_s); |
128 | 0 | } |
129 | 11.0k | } |
130 | | |
131 | | // Attempt to close everything even if one fails |
132 | 11.0k | s.PermitUncheckedError(); |
133 | | |
134 | 11.0k | if (wal_dir_) { |
135 | 0 | IOStatus temp_s = wal_dir_->Close(options, dbg); |
136 | 0 | if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) { |
137 | 0 | s = std::move(temp_s); |
138 | 0 | } |
139 | 0 | } |
140 | | |
141 | 11.0k | s.PermitUncheckedError(); |
142 | | |
143 | 11.0k | for (auto& data_dir_ptr : data_dirs_) { |
144 | 11.0k | if (data_dir_ptr) { |
145 | 0 | IOStatus temp_s = data_dir_ptr->Close(options, dbg); |
146 | 0 | if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) { |
147 | 0 | s = std::move(temp_s); |
148 | 0 | } |
149 | 0 | } |
150 | 11.0k | } |
151 | | |
152 | | // Ready for caller |
153 | 11.0k | s.MustCheck(); |
154 | 11.0k | return s; |
155 | 11.0k | } |
156 | | |
157 | | private: |
158 | | std::unique_ptr<FSDirectory> db_dir_; |
159 | | std::vector<std::unique_ptr<FSDirectory>> data_dirs_; |
160 | | std::unique_ptr<FSDirectory> wal_dir_; |
161 | | }; |
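A minimal sketch of the error-aggregation pattern Directories::Close() uses above: attempt to close every directory, tolerate NotSupported from FileSystems that predate FSDirectory::Close(), and keep only the first real error. CloseAllDirs is a hypothetical free function for illustration, not part of this header.

#include <utility>
#include <vector>

#include "rocksdb/file_system.h"

using ROCKSDB_NAMESPACE::FSDirectory;
using ROCKSDB_NAMESPACE::IODebugContext;
using ROCKSDB_NAMESPACE::IOOptions;
using ROCKSDB_NAMESPACE::IOStatus;

// Hypothetical helper mirroring the Directories::Close() pattern above.
IOStatus CloseAllDirs(const std::vector<FSDirectory*>& dirs,
                      const IOOptions& options, IODebugContext* dbg) {
  IOStatus s = IOStatus::OK();
  for (FSDirectory* dir : dirs) {
    if (dir == nullptr) {
      continue;  // unset paths fall back to the db dir elsewhere
    }
    IOStatus temp_s = dir->Close(options, dbg);
    // NotSupported is expected from FileSystems that have not implemented
    // FSDirectory::Close(); remember only the first genuine failure but
    // still attempt to close every directory.
    if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) {
      s = std::move(temp_s);
    }
  }
  s.MustCheck();  // hand back an unchecked status, as Directories::Close() does
  return s;
}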
162 | | |
163 | | // While DB is the public interface of RocksDB, DBImpl is the actual |
164 | | // class implementing it. It's the entry point of the core RocksDB engine. |
165 | | // All other DB implementations, e.g. TransactionDB, BlobDB, etc., wrap a |
166 | | // DBImpl internally. |
167 | | // Other than functions implementing the DB interface, some public |
168 | | // functions are there for other internal components to call. For |
169 | | // example, TransactionDB directly calls DBImpl::WriteImpl() and |
170 | | // BlobDB directly calls DBImpl::GetImpl(). Some other functions |
171 | | // are for sub-components to call. For example, ColumnFamilyHandleImpl |
172 | | // calls DBImpl::FindObsoleteFiles(). |
173 | | // |
174 | | // Since it's a very large class, the definitions of its functions are |
175 | | // divided among several db_impl_*.cc files, besides db_impl.cc. |
176 | | class DBImpl : public DB { |
177 | | public: |
178 | | DBImpl(const DBOptions& options, const std::string& dbname, |
179 | | const bool seq_per_batch = false, const bool batch_per_txn = true, |
180 | | bool read_only = false); |
181 | | // No copying allowed |
182 | | DBImpl(const DBImpl&) = delete; |
183 | | void operator=(const DBImpl&) = delete; |
184 | | |
185 | | virtual ~DBImpl(); |
186 | | |
187 | | // ---- Implementations of the DB interface ---- |
188 | | |
189 | | using DB::Resume; |
190 | | Status Resume() override; |
191 | | |
192 | | using DB::Put; |
193 | | Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, |
194 | | const Slice& key, const Slice& value) override; |
195 | | Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, |
196 | | const Slice& key, const Slice& ts, const Slice& value) override; |
197 | | |
198 | | using DB::PutEntity; |
199 | | Status PutEntity(const WriteOptions& options, |
200 | | ColumnFamilyHandle* column_family, const Slice& key, |
201 | | const WideColumns& columns) override; |
202 | | Status PutEntity(const WriteOptions& options, const Slice& key, |
203 | | const AttributeGroups& attribute_groups) override; |
204 | | |
205 | | using DB::Merge; |
206 | | Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, |
207 | | const Slice& key, const Slice& value) override; |
208 | | Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, |
209 | | const Slice& key, const Slice& ts, const Slice& value) override; |
210 | | |
211 | | using DB::Delete; |
212 | | Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, |
213 | | const Slice& key) override; |
214 | | Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family, |
215 | | const Slice& key, const Slice& ts) override; |
216 | | |
217 | | using DB::SingleDelete; |
218 | | Status SingleDelete(const WriteOptions& options, |
219 | | ColumnFamilyHandle* column_family, |
220 | | const Slice& key) override; |
221 | | Status SingleDelete(const WriteOptions& options, |
222 | | ColumnFamilyHandle* column_family, const Slice& key, |
223 | | const Slice& ts) override; |
224 | | |
225 | | using DB::DeleteRange; |
226 | | Status DeleteRange(const WriteOptions& options, |
227 | | ColumnFamilyHandle* column_family, const Slice& begin_key, |
228 | | const Slice& end_key) override; |
229 | | Status DeleteRange(const WriteOptions& options, |
230 | | ColumnFamilyHandle* column_family, const Slice& begin_key, |
231 | | const Slice& end_key, const Slice& ts) override; |
232 | | |
233 | | using DB::Write; |
234 | | Status Write(const WriteOptions& options, WriteBatch* updates) override; |
235 | | |
236 | | using DB::WriteWithCallback; |
237 | | Status WriteWithCallback(const WriteOptions& options, WriteBatch* updates, |
238 | | UserWriteCallback* user_write_cb) override; |
239 | | |
240 | | using DB::Get; |
241 | | Status Get(const ReadOptions& _read_options, |
242 | | ColumnFamilyHandle* column_family, const Slice& key, |
243 | | PinnableSlice* value, std::string* timestamp) override; |
244 | | |
245 | | using DB::GetEntity; |
246 | | Status GetEntity(const ReadOptions& options, |
247 | | ColumnFamilyHandle* column_family, const Slice& key, |
248 | | PinnableWideColumns* columns) override; |
249 | | Status GetEntity(const ReadOptions& options, const Slice& key, |
250 | | PinnableAttributeGroups* result) override; |
251 | | |
252 | | using DB::GetMergeOperands; |
253 | | Status GetMergeOperands(const ReadOptions& options, |
254 | | ColumnFamilyHandle* column_family, const Slice& key, |
255 | | PinnableSlice* merge_operands, |
256 | | GetMergeOperandsOptions* get_merge_operands_options, |
257 | 0 | int* number_of_operands) override { |
258 | 0 | GetImplOptions get_impl_options; |
259 | 0 | get_impl_options.column_family = column_family; |
260 | 0 | get_impl_options.merge_operands = merge_operands; |
261 | 0 | get_impl_options.get_merge_operands_options = get_merge_operands_options; |
262 | 0 | get_impl_options.number_of_operands = number_of_operands; |
263 | 0 | get_impl_options.get_value = false; |
264 | 0 | return GetImpl(options, key, get_impl_options); |
265 | 0 | } |
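For reference, a hedged sketch of how a caller might use the public DB::GetMergeOperands() entry point that this inline wrapper serves; the db/cf handles, the key, and the operand bound of 8 are illustrative assumptions.

#include <cstdio>
#include <vector>

#include "rocksdb/db.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical helper: dump the unmerged operands stored for `key`.
Status DumpMergeOperands(DB* db, ColumnFamilyHandle* cf, const Slice& key) {
  GetMergeOperandsOptions opts;
  opts.expected_max_number_of_operands = 8;  // assumed upper bound
  std::vector<PinnableSlice> operands(opts.expected_max_number_of_operands);
  int num_operands = 0;
  Status s = db->GetMergeOperands(ReadOptions(), cf, key, operands.data(),
                                  &opts, &num_operands);
  if (s.ok()) {
    for (int i = 0; i < num_operands; ++i) {
      // Each PinnableSlice holds one unmerged operand for `key`.
      std::fprintf(stderr, "operand[%d]=%s\n", i,
                   operands[i].ToString().c_str());
    }
  }
  return s;
}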
266 | | |
267 | | using DB::MultiGet; |
268 | | // This MultiGet is a batched version, which may be faster than calling Get |
269 | | // multiple times, especially if the keys have some spatial locality that |
270 | | // enables them to be queried in the same SST files/set of files. The larger |
271 | | // the batch size, the more scope for batching and performance improvement. |
272 | | // The values and statuses parameters are arrays with number of elements |
273 | | // equal to keys.size(). This allows the storage for those to be allocated |
274 | | // by the caller on the stack for small batches. |
275 | | void MultiGet(const ReadOptions& _read_options, const size_t num_keys, |
276 | | ColumnFamilyHandle** column_families, const Slice* keys, |
277 | | PinnableSlice* values, std::string* timestamps, |
278 | | Status* statuses, const bool sorted_input = false) override; |
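A hedged usage sketch of the batched lookup described above, using the single-column-family MultiGet() overload from the public DB interface; caller-allocated arrays for values and statuses let small batches live entirely on the stack. The db handle and the keys are assumptions.

#include <array>

#include "rocksdb/db.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical example of a small, stack-allocated MultiGet batch.
void BatchedLookup(DB* db, ColumnFamilyHandle* cf) {
  constexpr size_t kNumKeys = 3;
  std::array<Slice, kNumKeys> keys = {Slice("k1"), Slice("k2"), Slice("k3")};
  std::array<PinnableSlice, kNumKeys> values;
  std::array<Status, kNumKeys> statuses;
  // Single-CF overload of the batched MultiGet() in the public DB interface.
  db->MultiGet(ReadOptions(), cf, kNumKeys, keys.data(), values.data(),
               statuses.data(), /*sorted_input=*/false);
  for (size_t i = 0; i < kNumKeys; ++i) {
    if (statuses[i].ok()) {
      // values[i] pins the result; no extra copy is needed to read it.
    } else if (statuses[i].IsNotFound()) {
      // key absent
    }
  }
}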
279 | | |
280 | | void MultiGetWithCallback( |
281 | | const ReadOptions& _read_options, ColumnFamilyHandle* column_family, |
282 | | ReadCallback* callback, |
283 | | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys); |
284 | | |
285 | | using DB::MultiGetEntity; |
286 | | |
287 | | void MultiGetEntity(const ReadOptions& options, |
288 | | ColumnFamilyHandle* column_family, size_t num_keys, |
289 | | const Slice* keys, PinnableWideColumns* results, |
290 | | Status* statuses, bool sorted_input) override; |
291 | | |
292 | | void MultiGetEntity(const ReadOptions& options, size_t num_keys, |
293 | | ColumnFamilyHandle** column_families, const Slice* keys, |
294 | | PinnableWideColumns* results, Status* statuses, |
295 | | bool sorted_input) override; |
296 | | void MultiGetEntity(const ReadOptions& options, size_t num_keys, |
297 | | const Slice* keys, |
298 | | PinnableAttributeGroups* results) override; |
299 | | |
300 | | void MultiGetEntityWithCallback( |
301 | | const ReadOptions& read_options, ColumnFamilyHandle* column_family, |
302 | | ReadCallback* callback, |
303 | | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys); |
304 | | |
305 | | Status CreateColumnFamily(const ColumnFamilyOptions& cf_options, |
306 | | const std::string& column_family, |
307 | 368 | ColumnFamilyHandle** handle) override { |
308 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
309 | 368 | return CreateColumnFamily(ReadOptions(), WriteOptions(), cf_options, |
310 | 368 | column_family, handle); |
311 | 368 | } |
312 | | virtual Status CreateColumnFamily(const ReadOptions& read_options, |
313 | | const WriteOptions& write_options, |
314 | | const ColumnFamilyOptions& cf_options, |
315 | | const std::string& column_family, |
316 | | ColumnFamilyHandle** handle); |
317 | | Status CreateColumnFamilies( |
318 | | const ColumnFamilyOptions& cf_options, |
319 | | const std::vector<std::string>& column_family_names, |
320 | 0 | std::vector<ColumnFamilyHandle*>* handles) override { |
321 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
322 | 0 | return CreateColumnFamilies(ReadOptions(), WriteOptions(), cf_options, |
323 | 0 | column_family_names, handles); |
324 | 0 | } |
325 | | virtual Status CreateColumnFamilies( |
326 | | const ReadOptions& read_options, const WriteOptions& write_options, |
327 | | const ColumnFamilyOptions& cf_options, |
328 | | const std::vector<std::string>& column_family_names, |
329 | | std::vector<ColumnFamilyHandle*>* handles); |
330 | | |
331 | | Status CreateColumnFamilies( |
332 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
333 | 0 | std::vector<ColumnFamilyHandle*>* handles) override { |
334 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
335 | 0 | return CreateColumnFamilies(ReadOptions(), WriteOptions(), column_families, |
336 | 0 | handles); |
337 | 0 | } |
338 | | virtual Status CreateColumnFamilies( |
339 | | const ReadOptions& read_options, const WriteOptions& write_options, |
340 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
341 | | std::vector<ColumnFamilyHandle*>* handles); |
342 | | Status DropColumnFamily(ColumnFamilyHandle* column_family) override; |
343 | | Status DropColumnFamilies( |
344 | | const std::vector<ColumnFamilyHandle*>& column_families) override; |
345 | | |
346 | | // Returns false if the key definitely doesn't exist in the database, and |
347 | | // true if it may. If value_found is not passed in as null, then return the |
348 | | // value if it is found in memory. On return, if the value was found, |
349 | | // value_found will be set to true, otherwise false. |
350 | | using DB::KeyMayExist; |
351 | | bool KeyMayExist(const ReadOptions& options, |
352 | | ColumnFamilyHandle* column_family, const Slice& key, |
353 | | std::string* value, std::string* timestamp, |
354 | | bool* value_found = nullptr) override; |
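As a hedged illustration of the contract above: a false return means the key is definitely absent, true only means it may exist, and value_found reports whether the value itself was available from memory. The db handle and key are assumptions.

#include <string>

#include "rocksdb/db.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical probe that avoids a full lookup when the key is definitely absent.
bool ProbeKey(DB* db, ColumnFamilyHandle* cf, const Slice& key,
              std::string* value_out) {
  std::string value;
  bool value_found = false;
  if (!db->KeyMayExist(ReadOptions(), cf, key, &value, &value_found)) {
    return false;  // definitely not in the database
  }
  if (value_found) {
    *value_out = value;  // value was found in memory
    return true;
  }
  // The key may exist, but confirming it requires a regular Get().
  PinnableSlice pinned;
  Status s = db->Get(ReadOptions(), cf, key, &pinned);
  if (s.ok()) {
    value_out->assign(pinned.data(), pinned.size());
    return true;
  }
  return false;
}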
355 | | |
356 | | using DB::NewIterator; |
357 | | Iterator* NewIterator(const ReadOptions& _read_options, |
358 | | ColumnFamilyHandle* column_family) override; |
359 | | Status NewIterators(const ReadOptions& _read_options, |
360 | | const std::vector<ColumnFamilyHandle*>& column_families, |
361 | | std::vector<Iterator*>* iterators) override; |
362 | | |
363 | | const Snapshot* GetSnapshot() override; |
364 | | void ReleaseSnapshot(const Snapshot* snapshot) override; |
365 | | |
366 | | // EXPERIMENTAL |
367 | | std::unique_ptr<Iterator> NewCoalescingIterator( |
368 | | const ReadOptions& options, |
369 | | const std::vector<ColumnFamilyHandle*>& column_families) override; |
370 | | |
371 | | // EXPERIMENTAL |
372 | | std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator( |
373 | | const ReadOptions& options, |
374 | | const std::vector<ColumnFamilyHandle*>& column_families) override; |
375 | | |
376 | | // Create a timestamped snapshot. This snapshot can be shared by multiple |
377 | | // readers. If any of them uses it for write conflict checking, then |
378 | | // is_write_conflict_boundary is true. For simplicity, set it to true by |
379 | | // default. |
380 | | std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot( |
381 | | SequenceNumber snapshot_seq, uint64_t ts); |
382 | | std::shared_ptr<const SnapshotImpl> GetTimestampedSnapshot(uint64_t ts) const; |
383 | | void ReleaseTimestampedSnapshotsOlderThan( |
384 | | uint64_t ts, size_t* remaining_total_ss = nullptr); |
385 | | Status GetTimestampedSnapshots(uint64_t ts_lb, uint64_t ts_ub, |
386 | | std::vector<std::shared_ptr<const Snapshot>>& |
387 | | timestamped_snapshots) const; |
388 | | |
389 | | using DB::GetProperty; |
390 | | bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property, |
391 | | std::string* value) override; |
392 | | using DB::GetMapProperty; |
393 | | bool GetMapProperty(ColumnFamilyHandle* column_family, const Slice& property, |
394 | | std::map<std::string, std::string>* value) override; |
395 | | using DB::GetIntProperty; |
396 | | bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property, |
397 | | uint64_t* value) override; |
398 | | using DB::GetAggregatedIntProperty; |
399 | | bool GetAggregatedIntProperty(const Slice& property, |
400 | | uint64_t* aggregated_value) override; |
401 | | using DB::GetApproximateSizes; |
402 | | Status GetApproximateSizes(const SizeApproximationOptions& options, |
403 | | ColumnFamilyHandle* column_family, |
404 | | const Range* range, int n, |
405 | | uint64_t* sizes) override; |
406 | | using DB::GetApproximateMemTableStats; |
407 | | void GetApproximateMemTableStats(ColumnFamilyHandle* column_family, |
408 | | const Range& range, uint64_t* const count, |
409 | | uint64_t* const size) override; |
410 | | using DB::CompactRange; |
411 | | Status CompactRange(const CompactRangeOptions& options, |
412 | | ColumnFamilyHandle* column_family, const Slice* begin, |
413 | | const Slice* end) override; |
414 | | |
415 | | using DB::CompactFiles; |
416 | | Status CompactFiles( |
417 | | const CompactionOptions& compact_options, |
418 | | ColumnFamilyHandle* column_family, |
419 | | const std::vector<std::string>& input_file_names, const int output_level, |
420 | | const int output_path_id = -1, |
421 | | std::vector<std::string>* const output_file_names = nullptr, |
422 | | CompactionJobInfo* compaction_job_info = nullptr) override; |
423 | | |
424 | | Status PauseBackgroundWork() override; |
425 | | Status ContinueBackgroundWork() override; |
426 | | |
427 | | Status EnableAutoCompaction( |
428 | | const std::vector<ColumnFamilyHandle*>& column_family_handles) override; |
429 | | |
430 | | void EnableManualCompaction() override; |
431 | | void DisableManualCompaction() override; |
432 | | |
433 | | using DB::SetOptions; |
434 | | Status SetOptions( |
435 | | ColumnFamilyHandle* column_family, |
436 | | const std::unordered_map<std::string, std::string>& options_map) override; |
437 | | |
438 | | Status SetDBOptions( |
439 | | const std::unordered_map<std::string, std::string>& options_map) override; |
440 | | |
441 | | using DB::NumberLevels; |
442 | | int NumberLevels(ColumnFamilyHandle* column_family) override; |
443 | | using DB::MaxMemCompactionLevel; |
444 | | int MaxMemCompactionLevel(ColumnFamilyHandle* column_family) override; |
445 | | using DB::Level0StopWriteTrigger; |
446 | | int Level0StopWriteTrigger(ColumnFamilyHandle* column_family) override; |
447 | | const std::string& GetName() const override; |
448 | | Env* GetEnv() const override; |
449 | | FileSystem* GetFileSystem() const override; |
450 | | using DB::GetOptions; |
451 | | Options GetOptions(ColumnFamilyHandle* column_family) const override; |
452 | | using DB::GetDBOptions; |
453 | | DBOptions GetDBOptions() const override; |
454 | | using DB::Flush; |
455 | | Status Flush(const FlushOptions& options, |
456 | | ColumnFamilyHandle* column_family) override; |
457 | | Status Flush( |
458 | | const FlushOptions& options, |
459 | | const std::vector<ColumnFamilyHandle*>& column_families) override; |
460 | 0 | Status FlushWAL(bool sync) override { |
461 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
462 | 0 | return FlushWAL(WriteOptions(), sync); |
463 | 0 | } |
464 | | |
465 | | virtual Status FlushWAL(const WriteOptions& write_options, bool sync); |
466 | | bool WALBufferIsEmpty(); |
467 | | Status SyncWAL() override; |
468 | | Status LockWAL() override; |
469 | | Status UnlockWAL() override; |
470 | | |
471 | | SequenceNumber GetLatestSequenceNumber() const override; |
472 | | |
473 | | // IncreaseFullHistoryTsLow(ColumnFamilyHandle*, std::string) will acquire |
474 | | // and release db_mutex |
475 | | Status IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family, |
476 | | std::string ts_low) override; |
477 | | |
478 | | // GetFullHistoryTsLow(ColumnFamilyHandle*, std::string*) will acquire and |
479 | | // release db_mutex |
480 | | Status GetFullHistoryTsLow(ColumnFamilyHandle* column_family, |
481 | | std::string* ts_low) override; |
482 | | |
483 | | Status GetDbIdentity(std::string& identity) const override; |
484 | | |
485 | | virtual Status GetDbIdentityFromIdentityFile(std::string* identity) const; |
486 | | |
487 | | Status GetDbSessionId(std::string& session_id) const override; |
488 | | |
489 | | ColumnFamilyHandle* DefaultColumnFamily() const override; |
490 | | |
491 | | ColumnFamilyHandle* PersistentStatsColumnFamily() const; |
492 | | |
493 | | Status Close() override; |
494 | | |
495 | | Status DisableFileDeletions() override; |
496 | | |
497 | | Status EnableFileDeletions() override; |
498 | | |
499 | | virtual bool IsFileDeletionsEnabled() const; |
500 | | |
501 | | Status GetStatsHistory( |
502 | | uint64_t start_time, uint64_t end_time, |
503 | | std::unique_ptr<StatsHistoryIterator>* stats_iterator) override; |
504 | | |
505 | | using DB::ResetStats; |
506 | | Status ResetStats() override; |
507 | | // All the returned filenames start with "/" |
508 | | Status GetLiveFiles(std::vector<std::string>&, uint64_t* manifest_file_size, |
509 | | bool flush_memtable = true) override; |
510 | | Status GetSortedWalFiles(VectorWalPtr& files) override; |
511 | | Status GetSortedWalFilesImpl(VectorWalPtr& files, bool need_seqnos); |
512 | | |
513 | | // Get the known flushed sizes of WALs that might still be written to |
514 | | // or have pending sync. |
515 | | // NOTE: unlike alive_log_files_, this function includes WALs that might |
516 | | // be obsolete (but not obsolete to a pending Checkpoint) and not yet fully |
517 | | // synced. |
518 | | Status GetOpenWalSizes(std::map<uint64_t, uint64_t>& number_to_size); |
519 | | Status GetCurrentWalFile(std::unique_ptr<WalFile>* current_log_file) override; |
520 | | Status GetCreationTimeOfOldestFile(uint64_t* creation_time) override; |
521 | | |
522 | | Status GetUpdatesSince( |
523 | | SequenceNumber seq_number, std::unique_ptr<TransactionLogIterator>* iter, |
524 | | const TransactionLogIterator::ReadOptions& read_options = |
525 | | TransactionLogIterator::ReadOptions()) override; |
526 | | Status DeleteFile(std::string name) override; |
527 | | Status DeleteFilesInRanges(ColumnFamilyHandle* column_family, |
528 | | const RangePtr* ranges, size_t n, |
529 | | bool include_end = true); |
530 | | |
531 | | void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) override; |
532 | | |
533 | | Status GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) override; |
534 | | |
535 | | Status GetLiveFilesStorageInfo( |
536 | | const LiveFilesStorageInfoOptions& opts, |
537 | | std::vector<LiveFileStorageInfo>* files) override; |
538 | | |
539 | | // Obtains the meta data of the specified column family of the DB. |
540 | | // TODO(yhchiang): output parameters are placed at the end in this codebase. |
541 | | void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family, |
542 | | ColumnFamilyMetaData* metadata) override; |
543 | | |
544 | | void GetAllColumnFamilyMetaData( |
545 | | std::vector<ColumnFamilyMetaData>* metadata) override; |
546 | | |
547 | | Status SuggestCompactRange(ColumnFamilyHandle* column_family, |
548 | | const Slice* begin, const Slice* end) override; |
549 | | |
550 | | Status PromoteL0(ColumnFamilyHandle* column_family, |
551 | | int target_level) override; |
552 | | |
553 | | using DB::IngestExternalFile; |
554 | | Status IngestExternalFile( |
555 | | ColumnFamilyHandle* column_family, |
556 | | const std::vector<std::string>& external_files, |
557 | | const IngestExternalFileOptions& ingestion_options) override; |
558 | | |
559 | | using DB::IngestExternalFiles; |
560 | | Status IngestExternalFiles( |
561 | | const std::vector<IngestExternalFileArg>& args) override; |
562 | | |
563 | | using DB::CreateColumnFamilyWithImport; |
564 | | Status CreateColumnFamilyWithImport( |
565 | | const ColumnFamilyOptions& options, const std::string& column_family_name, |
566 | | const ImportColumnFamilyOptions& import_options, |
567 | | const std::vector<const ExportImportFilesMetaData*>& metadatas, |
568 | | ColumnFamilyHandle** handle) override; |
569 | | |
570 | | using DB::ClipColumnFamily; |
571 | | Status ClipColumnFamily(ColumnFamilyHandle* column_family, |
572 | | const Slice& begin_key, |
573 | | const Slice& end_key) override; |
574 | | |
575 | | using DB::VerifyFileChecksums; |
576 | | Status VerifyFileChecksums(const ReadOptions& read_options) override; |
577 | | |
578 | | using DB::VerifyChecksum; |
579 | | Status VerifyChecksum(const ReadOptions& /*read_options*/) override; |
580 | | // Verify the checksums of files in db. Currently only tables are checked. |
581 | | // |
582 | | // read_options: controls file I/O behavior, e.g. read ahead size while |
583 | | // reading all the live table files. |
584 | | // |
585 | | // use_file_checksum: if false, verify the block checksums of all live tables |
586 | | // in the db. Otherwise, obtain the file checksums and compare |
587 | | // with the MANIFEST. Currently, file checksums are |
588 | | // recomputed by reading all table files. |
589 | | // |
590 | | // Returns: OK if there is no file whose file or block checksum mismatches. |
591 | | Status VerifyChecksumInternal(const ReadOptions& read_options, |
592 | | bool use_file_checksum); |
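A hedged sketch of how a caller might exercise the two public wrappers over this internal verification: VerifyChecksum() re-reads and checks block checksums of live table files, while VerifyFileChecksums() compares whole-file checksums against the MANIFEST (only meaningful if a file checksum generator was configured). The db handle and readahead value are assumptions.

#include <cstdio>

#include "rocksdb/db.h"

using namespace ROCKSDB_NAMESPACE;

// Hypothetical verification pass over all live table files.
Status VerifyDb(DB* db) {
  ReadOptions ro;
  ro.readahead_size = 2 * 1024 * 1024;  // larger reads for the scan

  // Block-checksum verification of all live table files.
  Status s = db->VerifyChecksum(ro);
  if (!s.ok()) {
    std::fprintf(stderr, "block checksum mismatch: %s\n",
                 s.ToString().c_str());
    return s;
  }
  // Whole-file checksum verification against the MANIFEST.
  return db->VerifyFileChecksums(ro);
}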
593 | | |
594 | | Status VerifyFullFileChecksum(const std::string& file_checksum_expected, |
595 | | const std::string& func_name_expected, |
596 | | const std::string& fpath, |
597 | | const ReadOptions& read_options); |
598 | | |
599 | | using DB::StartTrace; |
600 | | Status StartTrace(const TraceOptions& options, |
601 | | std::unique_ptr<TraceWriter>&& trace_writer) override; |
602 | | |
603 | | using DB::EndTrace; |
604 | | Status EndTrace() override; |
605 | | |
606 | | using DB::NewDefaultReplayer; |
607 | | Status NewDefaultReplayer(const std::vector<ColumnFamilyHandle*>& handles, |
608 | | std::unique_ptr<TraceReader>&& reader, |
609 | | std::unique_ptr<Replayer>* replayer) override; |
610 | | |
611 | | using DB::StartBlockCacheTrace; |
612 | | Status StartBlockCacheTrace( |
613 | | const TraceOptions& trace_options, |
614 | | std::unique_ptr<TraceWriter>&& trace_writer) override; |
615 | | |
616 | | Status StartBlockCacheTrace( |
617 | | const BlockCacheTraceOptions& options, |
618 | | std::unique_ptr<BlockCacheTraceWriter>&& trace_writer) override; |
619 | | |
620 | | using DB::EndBlockCacheTrace; |
621 | | Status EndBlockCacheTrace() override; |
622 | | |
623 | | using DB::StartIOTrace; |
624 | | Status StartIOTrace(const TraceOptions& options, |
625 | | std::unique_ptr<TraceWriter>&& trace_writer) override; |
626 | | |
627 | | using DB::EndIOTrace; |
628 | | Status EndIOTrace() override; |
629 | | |
630 | | using DB::GetPropertiesOfAllTables; |
631 | | Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family, |
632 | | TablePropertiesCollection* props) override; |
633 | | Status GetPropertiesOfTablesInRange( |
634 | | ColumnFamilyHandle* column_family, const Range* range, std::size_t n, |
635 | | TablePropertiesCollection* props) override; |
636 | | |
637 | | // ---- End of implementations of the DB interface ---- |
638 | | SystemClock* GetSystemClock() const; |
639 | | |
640 | | struct GetImplOptions { |
641 | | ColumnFamilyHandle* column_family = nullptr; |
642 | | PinnableSlice* value = nullptr; |
643 | | PinnableWideColumns* columns = nullptr; |
644 | | std::string* timestamp = nullptr; |
645 | | bool* value_found = nullptr; |
646 | | ReadCallback* callback = nullptr; |
647 | | bool* is_blob_index = nullptr; |
648 | | // If true, return the value associated with key via the value pointer; |
649 | | // otherwise return all merge operands for key via the merge_operands pointer |
650 | | bool get_value = true; |
651 | | // Pointer to an array of size |
652 | | // get_merge_operands_options.expected_max_number_of_operands allocated by |
653 | | // user |
654 | | PinnableSlice* merge_operands = nullptr; |
655 | | GetMergeOperandsOptions* get_merge_operands_options = nullptr; |
656 | | int* number_of_operands = nullptr; |
657 | | }; |
658 | | |
659 | | Status GetImpl(const ReadOptions& read_options, |
660 | | ColumnFamilyHandle* column_family, const Slice& key, |
661 | | PinnableSlice* value); |
662 | | |
663 | | Status GetImpl(const ReadOptions& read_options, |
664 | | ColumnFamilyHandle* column_family, const Slice& key, |
665 | | PinnableSlice* value, std::string* timestamp); |
666 | | |
667 | | // Function that Get and KeyMayExist call with no_io true or false |
668 | | // Note: 'value_found' from KeyMayExist propagates here |
669 | | // This function is also called by GetMergeOperands |
670 | | // If get_impl_options.get_value = true get value associated with |
671 | | // get_impl_options.key via get_impl_options.value |
672 | | // If get_impl_options.get_value = false get merge operands associated with |
673 | | // get_impl_options.key via get_impl_options.merge_operands |
674 | | virtual Status GetImpl(const ReadOptions& options, const Slice& key, |
675 | | GetImplOptions& get_impl_options); |
676 | | |
677 | | // If `snapshot` == kMaxSequenceNumber, set a recent one inside the file. |
678 | | ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, |
679 | | ColumnFamilyHandleImpl* cfh, |
680 | | SuperVersion* sv, SequenceNumber snapshot, |
681 | | ReadCallback* read_callback, |
682 | | bool expose_blob_index = false, |
683 | | bool allow_refresh = true); |
684 | | |
685 | 1.47k | virtual SequenceNumber GetLastPublishedSequence() const { |
686 | 1.47k | if (last_seq_same_as_publish_seq_) { |
687 | 1.47k | return versions_->LastSequence(); |
688 | 1.47k | } else { |
689 | 0 | return versions_->LastPublishedSequence(); |
690 | 0 | } |
691 | 1.47k | } |
692 | | |
693 | | // REQUIRES: joined the main write queue if two_write_queues is disabled, and |
694 | | // the second write queue otherwise. |
695 | | virtual void SetLastPublishedSequence(SequenceNumber seq); |
696 | | // Returns LastSequence in last_seq_same_as_publish_seq_ |
697 | | // mode and LastAllocatedSequence otherwise. This is useful when visibility |
698 | | // depends also on data written to the WAL but not to the memtable. |
699 | | SequenceNumber TEST_GetLastVisibleSequence() const; |
700 | | |
701 | | // Similar to Write() but will call the callback once on the single write |
702 | | // thread to determine whether it is safe to perform the write. |
703 | | virtual Status WriteWithCallback(const WriteOptions& write_options, |
704 | | WriteBatch* my_batch, |
705 | | WriteCallback* callback, |
706 | | UserWriteCallback* user_write_cb = nullptr); |
707 | | |
708 | | // Returns the sequence number that is guaranteed to be smaller than or equal |
709 | | // to the sequence number of any key that could be inserted into the current |
710 | | // memtables. It can then be assumed that any write with a larger (or equal) |
711 | | // sequence number will be present in this memtable or a later memtable. |
712 | | // |
713 | | // If the earliest sequence number could not be determined, |
714 | | // kMaxSequenceNumber will be returned. |
715 | | // |
716 | | // If include_history=true, will also search Memtables in MemTableList |
717 | | // History. |
718 | | SequenceNumber GetEarliestMemTableSequenceNumber(SuperVersion* sv, |
719 | | bool include_history); |
720 | | |
721 | | // For a given key, check to see if there are any records for this key |
722 | | // in the memtables, including memtable history. If cache_only is false, |
723 | | // SST files will also be checked. |
724 | | // |
725 | | // `key` should NOT have user-defined timestamp appended to user key even if |
726 | | // timestamp is enabled. |
727 | | // |
728 | | // If a key is found, *found_record_for_key will be set to true and |
729 | | // *seq will be set to the stored sequence number for the latest |
730 | | // operation on this key or kMaxSequenceNumber if unknown. If user-defined |
731 | | // timestamp is enabled for this column family and timestamp is not nullptr, |
732 | | // then *timestamp will be set to the stored timestamp for the latest |
733 | | // operation on this key. |
734 | | // If no key is found, *found_record_for_key will be set to false. |
735 | | // |
736 | | // Note: If cache_only=false, it is possible for *seq to be set to 0 if |
737 | | // the sequence number has been cleared from the record. If the caller is |
738 | | // holding an active db snapshot, we know the missing sequence must be less |
739 | | // than the snapshot's sequence number (sequence numbers are only cleared |
740 | | // when there are no earlier active snapshots). |
741 | | // |
742 | | // If NotFound is returned and found_record_for_key is set to false, then no |
743 | | // record for this key was found. If the caller is holding an active db |
744 | | // snapshot, we know that no key could have existed after this snapshot |
745 | | // (since we do not compact keys that have an earlier snapshot). |
746 | | // |
747 | | // Only records newer than or at `lower_bound_seq` are guaranteed to be |
748 | | // returned. Memtables and files may not be checked if they only contain |
749 | | // data older than `lower_bound_seq`. |
750 | | // |
751 | | // Returns OK or NotFound on success, |
752 | | // other status on unexpected error. |
753 | | // TODO(andrewkr): this API need to be aware of range deletion operations |
754 | | Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key, |
755 | | bool cache_only, |
756 | | SequenceNumber lower_bound_seq, |
757 | | SequenceNumber* seq, std::string* timestamp, |
758 | | bool* found_record_for_key, |
759 | | bool* is_blob_index); |
760 | | |
761 | | Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key, |
762 | | const Slice& lower_bound, const Slice upper_bound); |
763 | | Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key, |
764 | | const Slice& lower_bound, |
765 | | const Slice upper_bound); |
766 | | |
767 | | // Similar to GetSnapshot(), but also lets the db know that this snapshot |
768 | | // will be used for transaction write-conflict checking. The DB can then |
769 | | // make sure not to compact any keys that would prevent a write-conflict from |
770 | | // being detected. |
771 | | const Snapshot* GetSnapshotForWriteConflictBoundary(); |
772 | | |
773 | | // checks if all live files exist on file system and that their file sizes |
774 | | // match to our in-memory records |
775 | | virtual Status CheckConsistency(); |
776 | | |
777 | | // max_file_num_to_ignore allows bottom level compaction to filter out newly |
778 | | // compacted SST files. Setting max_file_num_to_ignore to kMaxUint64 will |
779 | | // disable the filtering |
780 | | // If `final_output_level` is not nullptr, it is set to manual compaction's |
781 | | // output level if returned status is OK, and it may or may not be set to |
782 | | // manual compaction's output level if returned status is not OK. |
783 | | Status RunManualCompaction(ColumnFamilyData* cfd, int input_level, |
784 | | int output_level, |
785 | | const CompactRangeOptions& compact_range_options, |
786 | | const Slice* begin, const Slice* end, |
787 | | bool exclusive, bool disallow_trivial_move, |
788 | | uint64_t max_file_num_to_ignore, |
789 | | const std::string& trim_ts, |
790 | | int* final_output_level = nullptr); |
791 | | |
792 | | // Return an internal iterator over the current state of the database. |
793 | | // The keys of this iterator are internal keys (see format.h). |
794 | | // The returned iterator should be deleted when no longer needed. |
795 | | // If allow_unprepared_value is true, the returned iterator may defer reading |
796 | | // the value and so will require PrepareValue() to be called before value(); |
797 | | // allow_unprepared_value = false is convenient when this optimization is not |
798 | | // useful, e.g. when reading the whole column family. |
799 | | // |
800 | | // read_options.ignore_range_deletions determines whether range tombstones are |
801 | | // processed in the returned iterator internally, i.e., whether range |
802 | | // tombstone covered keys are in this iterator's output. |
803 | | // @param read_options Must outlive the returned iterator. |
804 | | InternalIterator* NewInternalIterator( |
805 | | const ReadOptions& read_options, Arena* arena, SequenceNumber sequence, |
806 | | ColumnFamilyHandle* column_family = nullptr, |
807 | | bool allow_unprepared_value = false); |
808 | | |
809 | | // Note: to support DB iterator refresh, memtable range tombstones in the |
810 | | // underlying merging iterator need to be refreshed. If db_iter is not |
811 | | // nullptr, db_iter->SetMemtableRangetombstoneIter() is called with the |
812 | | // memtable range tombstone iterator used by the underlying merging iterator. |
813 | | // This range tombstone iterator can be refreshed later by db_iter. |
814 | | // @param read_options Must outlive the returned iterator. |
815 | | InternalIterator* NewInternalIterator(const ReadOptions& read_options, |
816 | | ColumnFamilyData* cfd, |
817 | | SuperVersion* super_version, |
818 | | Arena* arena, SequenceNumber sequence, |
819 | | bool allow_unprepared_value, |
820 | | ArenaWrappedDBIter* db_iter = nullptr); |
821 | | |
822 | 0 | LogsWithPrepTracker* logs_with_prep_tracker() { |
823 | 0 | return &logs_with_prep_tracker_; |
824 | 0 | } |
825 | | |
826 | | struct BGJobLimits { |
827 | | int max_flushes; |
828 | | int max_compactions; |
829 | | }; |
830 | | // Returns maximum background flushes and compactions allowed to be scheduled |
831 | | BGJobLimits GetBGJobLimits() const; |
832 | | // Need a static version that can be called during SanitizeOptions(). |
833 | | static BGJobLimits GetBGJobLimits(int max_background_flushes, |
834 | | int max_background_compactions, |
835 | | int max_background_jobs, |
836 | | bool parallelize_compactions); |
837 | | |
838 | | // move logs pending closing from job_context to the DB queue and |
839 | | // schedule a purge |
840 | | void ScheduleBgLogWriterClose(JobContext* job_context); |
841 | | |
842 | | uint64_t MinLogNumberToKeep(); |
843 | | |
844 | | uint64_t MinLogNumberToRecycle(); |
845 | | |
846 | | // Returns the lower bound file number for SSTs that won't be deleted, even if |
847 | | // they're obsolete. This lower bound is used internally to prevent newly |
848 | | // created flush/compaction output files from being deleted before they're |
849 | | // installed. This technique avoids the need for tracking the exact numbers of |
850 | | // files pending creation, although it prevents more files than necessary from |
851 | | // being deleted. |
852 | | uint64_t MinObsoleteSstNumberToKeep(); |
853 | | |
854 | | uint64_t GetObsoleteSstFilesSize(); |
855 | | |
856 | | // Returns the list of live files in 'live' and the list |
857 | | // of all files in the filesystem in 'candidate_files'. |
858 | | // If force == false and the last call was less than |
859 | | // db_options_.delete_obsolete_files_period_micros microseconds ago, |
860 | | // it will not fill up the job_context |
861 | | void FindObsoleteFiles(JobContext* job_context, bool force, |
862 | | bool no_full_scan = false); |
863 | | |
864 | | // Diffs the files listed in filenames and those that do not |
865 | | // belong to live files are possibly removed. Also, removes all the |
866 | | // files in sst_delete_files and log_delete_files. |
867 | | // It is not necessary to hold the mutex when invoking this method. |
868 | | // If FindObsoleteFiles() was run, we need to also run |
869 | | // PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true |
870 | | void PurgeObsoleteFiles(JobContext& background_context, |
871 | | bool schedule_only = false); |
872 | | |
873 | | // Schedule a background job to actually delete obsolete files. |
874 | | void SchedulePurge(); |
875 | | |
876 | 0 | const SnapshotList& snapshots() const { return snapshots_; } |
877 | | |
878 | | // Load the list of snapshots that are no newer than `max_seq` into |
879 | | // `snap_vector`, in ascending order. |
880 | | // `oldest_write_conflict_snapshot` is filled with the oldest snapshot |
881 | | // which satisfies SnapshotImpl.is_write_conflict_boundary_ = true. |
882 | | void LoadSnapshots(std::vector<SequenceNumber>* snap_vector, |
883 | | SequenceNumber* oldest_write_conflict_snapshot, |
884 | 0 | const SequenceNumber& max_seq) const { |
885 | 0 | InstrumentedMutexLock l(mutex()); |
886 | 0 | snapshots().GetAll(snap_vector, oldest_write_conflict_snapshot, max_seq); |
887 | 0 | } |
888 | | |
889 | 0 | const ImmutableDBOptions& immutable_db_options() const { |
890 | 0 | return immutable_db_options_; |
891 | 0 | } |
892 | | |
893 | | // Cancel all background jobs, including flush, compaction, background |
894 | | // purging, stats dumping threads, etc. If `wait` = true, wait for the |
895 | | // running jobs to abort or finish before returning. Otherwise, only |
896 | | // sends the signals. |
897 | | void CancelAllBackgroundWork(bool wait); |
898 | | |
899 | | // Find Super version and reference it. Based on options, it might return |
900 | | // the thread local cached one. |
901 | | // Call ReturnAndCleanupSuperVersion() when it is no longer needed. |
902 | | SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd); |
903 | | |
904 | | // Similar to the previous function but looks up based on a column family id. |
905 | | // nullptr will be returned if this column family no longer exists. |
906 | | // REQUIRED: this function should only be called on the write thread or if the |
907 | | // mutex is held. |
908 | | SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id); |
909 | | |
910 | | // Un-reference the super version and clean it up if it is the last reference. |
911 | | void CleanupSuperVersion(SuperVersion* sv); |
912 | | |
913 | | // Un-reference the super version and return it to thread local cache if |
914 | | // needed. If it is the last reference of the super version, clean it up |
915 | | // after un-referencing it. |
916 | | void ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, SuperVersion* sv); |
917 | | |
918 | | // Similar to the previous function but looks up based on a column family id. |
919 | | // nullptr will be returned if this column family no longer exists. |
920 | | // REQUIRED: this function should only be called on the write thread. |
921 | | void ReturnAndCleanupSuperVersion(uint32_t column_family_id, SuperVersion* sv); |
922 | | |
923 | | // REQUIRED: this function should only be called on the write thread or if the |
924 | | // mutex is held. Return value only valid until next call to this function or |
925 | | // mutex is released. |
926 | | ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id); |
927 | | |
928 | | // Same as above; should be called without the mutex held and not on the write thread. |
929 | | std::unique_ptr<ColumnFamilyHandle> GetColumnFamilyHandleUnlocked( |
930 | | uint32_t column_family_id); |
931 | | |
932 | | // Returns the number of currently running flushes. |
933 | | // REQUIREMENT: mutex_ must be held when calling this function. |
934 | 0 | int num_running_flushes() { |
935 | 0 | mutex_.AssertHeld(); |
936 | 0 | return num_running_flushes_; |
937 | 0 | } |
938 | | |
939 | | // Returns the number of currently running compactions. |
940 | | // REQUIREMENT: mutex_ must be held when calling this function. |
941 | 0 | int num_running_compactions() { |
942 | 0 | mutex_.AssertHeld(); |
943 | 0 | return num_running_compactions_; |
944 | 0 | } |
945 | | |
946 | 0 | const WriteController& write_controller() { return write_controller_; } |
947 | | |
948 | | // Hollow transaction shells used for recovery. |
949 | | // These will then be passed to TransactionDB so that |
950 | | // locks can be reacquired before writing can resume. |
951 | | struct RecoveredTransaction { |
952 | | std::string name_; |
953 | | bool unprepared_; |
954 | | |
955 | | struct BatchInfo { |
956 | | uint64_t log_number_; |
957 | | // TODO(lth): For unprepared, the memory usage here can be big for |
958 | | // unprepared transactions. This is only useful for rollbacks, and we |
959 | | // can in theory just keep keyset for that. |
960 | | WriteBatch* batch_; |
961 | | // Number of sub-batches. A new sub-batch is created if txn attempts to |
962 | | // insert a duplicate key,seq to memtable. This is currently used in |
963 | | // WritePreparedTxn/WriteUnpreparedTxn. |
964 | | size_t batch_cnt_; |
965 | | }; |
966 | | |
967 | | // This maps the seq of the first key in the batch to BatchInfo, which |
968 | | // contains WriteBatch and other information relevant to the batch. |
969 | | // |
970 | | // For WriteUnprepared, batches_ can have size greater than 1, but for |
971 | | // other write policies, it must be of size 1. |
972 | | std::map<SequenceNumber, BatchInfo> batches_; |
973 | | |
974 | | explicit RecoveredTransaction(const uint64_t log, const std::string& name, |
975 | | WriteBatch* batch, SequenceNumber seq, |
976 | | size_t batch_cnt, bool unprepared) |
977 | 0 | : name_(name), unprepared_(unprepared) { |
978 | 0 | batches_[seq] = {log, batch, batch_cnt}; |
979 | 0 | } |
980 | | |
981 | 0 | ~RecoveredTransaction() { |
982 | 0 | for (auto& it : batches_) { |
983 | 0 | delete it.second.batch_; |
984 | 0 | } |
985 | 0 | } |
986 | | |
987 | | void AddBatch(SequenceNumber seq, uint64_t log_number, WriteBatch* batch, |
988 | 0 | size_t batch_cnt, bool unprepared) { |
989 | 0 | assert(batches_.count(seq) == 0); |
990 | 0 | batches_[seq] = {log_number, batch, batch_cnt}; |
991 | | // Prior state must be unprepared, since the prepare batch must be the |
992 | | // last batch. |
993 | 0 | assert(unprepared_); |
994 | 0 | unprepared_ = unprepared; |
995 | 0 | } |
996 | | }; |
997 | | |
998 | 11.8k | bool allow_2pc() const { return immutable_db_options_.allow_2pc; } |
999 | | |
1000 | | std::unordered_map<std::string, RecoveredTransaction*> |
1001 | 0 | recovered_transactions() { |
1002 | 0 | return recovered_transactions_; |
1003 | 0 | } |
1004 | | |
1005 | 0 | RecoveredTransaction* GetRecoveredTransaction(const std::string& name) { |
1006 | 0 | auto it = recovered_transactions_.find(name); |
1007 | 0 | if (it == recovered_transactions_.end()) { |
1008 | 0 | return nullptr; |
1009 | 0 | } else { |
1010 | 0 | return it->second; |
1011 | 0 | } |
1012 | 0 | } |
1013 | | |
1014 | | void InsertRecoveredTransaction(const uint64_t log, const std::string& name, |
1015 | | WriteBatch* batch, SequenceNumber seq, |
1016 | 0 | size_t batch_cnt, bool unprepared_batch) { |
1017 | | // For WriteUnpreparedTxn, InsertRecoveredTransaction is called multiple |
1018 | | // times for every unprepared batch encountered during recovery. |
1019 | | // |
1020 | | // If the transaction is prepared, then the last call to |
1021 | | // InsertRecoveredTransaction will have unprepared_batch = false. |
1022 | 0 | auto rtxn = recovered_transactions_.find(name); |
1023 | 0 | if (rtxn == recovered_transactions_.end()) { |
1024 | 0 | recovered_transactions_[name] = new RecoveredTransaction( |
1025 | 0 | log, name, batch, seq, batch_cnt, unprepared_batch); |
1026 | 0 | } else { |
1027 | 0 | rtxn->second->AddBatch(seq, log, batch, batch_cnt, unprepared_batch); |
1028 | 0 | } |
1029 | 0 | logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log); |
1030 | 0 | } |
1031 | | |
1032 | 0 | void DeleteRecoveredTransaction(const std::string& name) { |
1033 | 0 | auto it = recovered_transactions_.find(name); |
1034 | 0 | assert(it != recovered_transactions_.end()); |
1035 | 0 | auto* trx = it->second; |
1036 | 0 | recovered_transactions_.erase(it); |
1037 | 0 | for (const auto& info : trx->batches_) { |
1038 | 0 | logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed( |
1039 | 0 | info.second.log_number_); |
1040 | 0 | } |
1041 | 0 | delete trx; |
1042 | 0 | } |
1043 | | |
1044 | 0 | void DeleteAllRecoveredTransactions() { |
1045 | 0 | for (auto it = recovered_transactions_.begin(); |
1046 | 0 | it != recovered_transactions_.end(); ++it) { |
1047 | 0 | delete it->second; |
1048 | 0 | } |
1049 | 0 | recovered_transactions_.clear(); |
1050 | 0 | } |
1051 | | |
1052 | 0 | void AddToLogsToFreeQueue(log::Writer* log_writer) { |
1053 | 0 | mutex_.AssertHeld(); |
1054 | 0 | logs_to_free_queue_.push_back(log_writer); |
1055 | 0 | } |
1056 | | |
1057 | 0 | void AddSuperVersionsToFreeQueue(SuperVersion* sv) { |
1058 | 0 | superversions_to_free_queue_.push_back(sv); |
1059 | 0 | } |
1060 | | |
1061 | | void SetSnapshotChecker(SnapshotChecker* snapshot_checker); |
1062 | | |
1063 | | // Fill JobContext with snapshot information needed by flush and compaction. |
1064 | | void GetSnapshotContext(JobContext* job_context, |
1065 | | std::vector<SequenceNumber>* snapshot_seqs, |
1066 | | SequenceNumber* earliest_write_conflict_snapshot, |
1067 | | SnapshotChecker** snapshot_checker); |
1068 | | |
1069 | | // Not thread-safe. |
1070 | | void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback); |
1071 | | |
1072 | 12.1k | InstrumentedMutex* mutex() const { return &mutex_; } |
1073 | | |
1074 | | // Initialize a brand new DB. The DB directory is expected to be empty before |
1075 | | // calling it. Push new manifest file name into `new_filenames`. |
1076 | | Status NewDB(std::vector<std::string>* new_filenames); |
1077 | | |
1078 | | // This is to be used only by internal rocksdb classes. |
1079 | | static Status Open(const DBOptions& db_options, const std::string& name, |
1080 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
1081 | | std::vector<ColumnFamilyHandle*>* handles, DB** dbptr, |
1082 | | const bool seq_per_batch, const bool batch_per_txn, |
1083 | | const bool is_retry, bool* can_retry); |
1084 | | |
1085 | | static IOStatus CreateAndNewDirectory( |
1086 | | FileSystem* fs, const std::string& dirname, |
1087 | | std::unique_ptr<FSDirectory>* directory); |
1088 | | |
1089 | | // find stats map from stats_history_ with smallest timestamp in |
1090 | | // the range of [start_time, end_time) |
1091 | | bool FindStatsByTime(uint64_t start_time, uint64_t end_time, |
1092 | | uint64_t* new_time, |
1093 | | std::map<std::string, uint64_t>* stats_map); |
1094 | | |
1095 | | // Print information of all tombstones of all iterators to the std::string |
1096 | | // This is only used by ldb. The output might be capped. Tombstones |
1097 | | // printed out are not guaranteed to be in any order. |
1098 | | Status TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family, |
1099 | | int max_entries_to_print, |
1100 | | std::string* out_str); |
1101 | | |
1102 | 0 | VersionSet* GetVersionSet() const { return versions_.get(); } |
1103 | | |
1104 | | Status WaitForCompact( |
1105 | | const WaitForCompactOptions& wait_for_compact_options) override; |
1106 | | |
1107 | | #ifndef NDEBUG |
1108 | | // Compact any files in the named level that overlap [*begin, *end] |
1109 | | Status TEST_CompactRange(int level, const Slice* begin, const Slice* end, |
1110 | | ColumnFamilyHandle* column_family = nullptr, |
1111 | | bool disallow_trivial_move = false); |
1112 | | |
1113 | | Status TEST_SwitchWAL(); |
1114 | | |
1115 | | bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; } |
1116 | | |
1117 | | bool TEST_IsLogGettingFlushed() { |
1118 | | return alive_log_files_.begin()->getting_flushed; |
1119 | | } |
1120 | | |
1121 | | Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr); |
1122 | | |
1123 | | // Force current memtable contents to be flushed. |
1124 | | Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false, |
1125 | | ColumnFamilyHandle* cfh = nullptr); |
1126 | | |
1127 | | Status TEST_FlushMemTable(ColumnFamilyData* cfd, |
1128 | | const FlushOptions& flush_opts); |
1129 | | |
1130 | | // Flush (multiple) ColumnFamilyData without using ColumnFamilyHandle. This |
1131 | | // is because in certain cases, we can flush column families, wait for the |
1132 | | // flush to complete, but delete the column family handle before the wait |
1133 | | // finishes. For example in CompactRange. |
1134 | | Status TEST_AtomicFlushMemTables( |
1135 | | const autovector<ColumnFamilyData*>& provided_candidate_cfds, |
1136 | | const FlushOptions& flush_opts); |
1137 | | |
1138 | | // Wait for background threads to complete scheduled work. |
1139 | | Status TEST_WaitForBackgroundWork(); |
1140 | | |
1141 | | // Wait for memtable compaction |
1142 | | Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr); |
1143 | | |
1144 | | Status TEST_WaitForCompact(); |
1145 | | Status TEST_WaitForCompact( |
1146 | | const WaitForCompactOptions& wait_for_compact_options); |
1147 | | |
1148 | | // Wait for any background purge |
1149 | | Status TEST_WaitForPurge(); |
1150 | | |
1151 | | // Get the background error status |
1152 | | Status TEST_GetBGError(); |
1153 | | |
1154 | | // Return the maximum overlapping data (in bytes) at next level for any |
1155 | | // file at a level >= 1. |
1156 | | uint64_t TEST_MaxNextLevelOverlappingBytes( |
1157 | | ColumnFamilyHandle* column_family = nullptr); |
1158 | | |
1159 | | // Return the current manifest file no. |
1160 | | uint64_t TEST_Current_Manifest_FileNo(); |
1161 | | |
1162 | | // Returns the number that'll be assigned to the next file that's created. |
1163 | | uint64_t TEST_Current_Next_FileNo(); |
1164 | | |
1165 | | // get total level0 file size. Only for testing. |
1166 | | uint64_t TEST_GetLevel0TotalSize(); |
1167 | | |
1168 | | void TEST_GetFilesMetaData( |
1169 | | ColumnFamilyHandle* column_family, |
1170 | | std::vector<std::vector<FileMetaData>>* metadata, |
1171 | | std::vector<std::shared_ptr<BlobFileMetaData>>* blob_metadata = nullptr); |
1172 | | |
1173 | | void TEST_LockMutex(); |
1174 | | |
1175 | | void TEST_UnlockMutex(); |
1176 | | |
1177 | | InstrumentedMutex* TEST_Mutex() { return &mutex_; } |
1178 | | |
1179 | | void TEST_SignalAllBgCv(); |
1180 | | |
1181 | | // REQUIRES: mutex locked |
1182 | | void* TEST_BeginWrite(); |
1183 | | |
1184 | | // REQUIRES: mutex locked |
1185 | | // pass the pointer that you got from TEST_BeginWrite() |
1186 | | void TEST_EndWrite(void* w); |
1187 | | |
1188 | | uint64_t TEST_MaxTotalInMemoryState() const { |
1189 | | return max_total_in_memory_state_; |
1190 | | } |
1191 | | |
1192 | | size_t TEST_LogsToFreeSize(); |
1193 | | |
1194 | | uint64_t TEST_LogfileNumber(); |
1195 | | |
1196 | | uint64_t TEST_total_log_size() const { return total_log_size_; } |
1197 | | |
1198 | | // Returns column family name to ImmutableCFOptions map. |
1199 | | Status TEST_GetAllImmutableCFOptions( |
1200 | | std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map); |
1201 | | |
1202 | | // Return the latest MutableCFOptions of a column family
1203 | | Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family, |
1204 | | MutableCFOptions* mutable_cf_options); |
1205 | | |
1206 | | Cache* TEST_table_cache() { return table_cache_.get(); } |
1207 | | |
1208 | | WriteController& TEST_write_controler() { return write_controller_; } |
1209 | | |
1210 | | uint64_t TEST_FindMinLogContainingOutstandingPrep(); |
1211 | | uint64_t TEST_FindMinPrepLogReferencedByMemTable(); |
1212 | | size_t TEST_PreparedSectionCompletedSize(); |
1213 | | size_t TEST_LogsWithPrepSize(); |
1214 | | |
1215 | | int TEST_BGCompactionsAllowed() const; |
1216 | | int TEST_BGFlushesAllowed() const; |
1217 | | size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; |
1218 | | void TEST_WaitForPeriodicTaskRun(std::function<void()> callback) const; |
1219 | | SeqnoToTimeMapping TEST_GetSeqnoToTimeMapping() const; |
1220 | | const autovector<uint64_t>& TEST_GetFilesToQuarantine() const; |
1221 | | size_t TEST_EstimateInMemoryStatsHistorySize() const; |
1222 | | |
1223 | | uint64_t TEST_GetCurrentLogNumber() const { |
1224 | | InstrumentedMutexLock l(mutex()); |
1225 | | assert(!logs_.empty()); |
1226 | | return logs_.back().number; |
1227 | | } |
1228 | | |
1229 | | void TEST_DeleteObsoleteFiles(); |
1230 | | |
1231 | | const std::unordered_set<uint64_t>& TEST_GetFilesGrabbedForPurge() const { |
1232 | | return files_grabbed_for_purge_; |
1233 | | } |
1234 | | |
1235 | | const PeriodicTaskScheduler& TEST_GetPeriodicTaskScheduler() const; |
1236 | | |
1237 | | static Status TEST_ValidateOptions(const DBOptions& db_options) { |
1238 | | return ValidateOptions(db_options); |
1239 | | } |
1240 | | |
1241 | | #endif // NDEBUG |
1242 | | |
1243 | | // persist stats to column family "_persistent_stats" |
1244 | | void PersistStats(); |
1245 | | |
1246 | | // dump rocksdb.stats to LOG |
1247 | | void DumpStats(); |
1248 | | |
1249 | | // flush LOG out of application buffer |
1250 | | void FlushInfoLog(); |
1251 | | |
1252 | | // record current sequence number to time mapping. If |
1253 | | // populate_historical_seconds > 0 then pre-populate all the |
1254 | | // sequence numbers from [1, last] to map to [now minus |
1255 | | // populate_historical_seconds, now]. |
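 | | // Illustrative example (an assumption about the shape of the mapping, not
 | | // taken from the source): with populate_historical_seconds == 3600 and
 | | // last == 1000, seqno 1 maps to roughly (now - 3600 seconds) and seqno 1000
 | | // to roughly now, with the seqnos in between spread across that hour.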
1256 | | void RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds); |
1257 | | |
1258 | | // Every time the DB's seqno to time mapping changes (the caller already
1259 | | // holds the db mutex), we install a new SuperVersion in each column family
1260 | | // with a shared copy of the new mapping while holding the db mutex.
1261 | | // This is done for all column families, even those that have not
1262 | | // explicitly enabled the
1263 | | // `preclude_last_level_data_seconds` or `preserve_internal_time_seconds`
1264 | | // features.
1265 | | // This mapping allows iterators to fulfill the
1266 | | // "rocksdb.iterator.write-time" iterator property for entries in memtables.
1267 | | //
1268 | | // Since this new SuperVersion doesn't involve an LSM tree shape change, we
1269 | | // don't schedule work after installing it. The used `SuperVersionContext`
1270 | | // objects are returned (via `sv_contexts`) for cleanup after the mutex is released.
1271 | | void InstallSeqnoToTimeMappingInSV( |
1272 | | std::vector<SuperVersionContext>* sv_contexts); |
1273 | | |
1274 | | // Interface to block and signal the DB when writes are stalled by the
1275 | | // WriteBufferManager. Each DBImpl object contains a pointer to a
1276 | | // WBMStallInterface. When the DB needs to be blocked or signalled by the
1277 | | // WriteBufferManager, state_ is changed accordingly.
1278 | | class WBMStallInterface : public StallInterface { |
1279 | | public: |
1280 | | enum State { |
1281 | | BLOCKED = 0, |
1282 | | RUNNING, |
1283 | | }; |
1284 | | |
1285 | 11.0k | WBMStallInterface() : state_cv_(&state_mutex_) { |
1286 | 11.0k | MutexLock lock(&state_mutex_); |
1287 | 11.0k | state_ = State::RUNNING; |
1288 | 11.0k | } |
1289 | | |
1290 | 0 | void SetState(State state) { |
1291 | 0 | MutexLock lock(&state_mutex_); |
1292 | 0 | state_ = state; |
1293 | 0 | } |
1294 | | |
1295 | | // Change the state_ to State::BLOCKED and wait until its state is |
1296 | | // changed by WriteBufferManager. When stall is cleared, Signal() is |
1297 | | // called to change the state and unblock the DB. |
1298 | 0 | void Block() override { |
1299 | 0 | MutexLock lock(&state_mutex_); |
1300 | 0 | while (state_ == State::BLOCKED) { |
1301 | 0 | TEST_SYNC_POINT("WBMStallInterface::BlockDB"); |
1302 | 0 | state_cv_.Wait(); |
1303 | 0 | } |
1304 | 0 | } |
1305 | | |
1306 | | // Called from WriteBufferManager. This function changes the state_ |
1307 | | // to State::RUNNING indicating the stall is cleared and DB can proceed. |
1308 | 11.0k | void Signal() override { |
1309 | 11.0k | { |
1310 | 11.0k | MutexLock lock(&state_mutex_); |
1311 | 11.0k | state_ = State::RUNNING; |
1312 | 11.0k | } |
1313 | 11.0k | state_cv_.Signal(); |
1314 | 11.0k | } |
1315 | | |
1316 | | private: |
1317 | | // Condition variable and mutex to block and
1318 | | // signal the DB during the stalling process.
1319 | | port::Mutex state_mutex_; |
1320 | | port::CondVar state_cv_; |
1321 | | // State representing whether the DB is running or blocked because of a
1322 | | // stall by WriteBufferManager.
1323 | | State state_; |
1324 | | }; |
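 | | // Illustrative handshake (a sketch based on the class above, not a verbatim
 | | // trace): when the WriteBufferManager decides to stall this DB, the
 | | // interface's state is set to State::BLOCKED via SetState() and the DB's
 | | // write path calls Block(), which waits on state_cv_; once memory pressure
 | | // clears, the WriteBufferManager calls Signal(), which flips state_ back to
 | | // State::RUNNING and wakes the blocked thread.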
1325 | | |
1326 | | static void TEST_ResetDbSessionIdGen(); |
1327 | | static std::string GenerateDbSessionId(Env* env); |
1328 | | |
1329 | 0 | bool seq_per_batch() const { return seq_per_batch_; } |
1330 | | |
1331 | | protected: |
1332 | | const std::string dbname_; |
1333 | | // TODO(peterd): unify with VersionSet::db_id_ |
1334 | | std::string db_id_; |
1335 | | // db_session_id_ is an identifier that gets reset |
1336 | | // every time the DB is opened |
1337 | | std::string db_session_id_; |
1338 | | std::unique_ptr<VersionSet> versions_; |
1339 | | // Flag to check whether we allocated and own the info log file |
1340 | | bool own_info_log_; |
1341 | | Status init_logger_creation_s_; |
1342 | | const DBOptions initial_db_options_; |
1343 | | Env* const env_; |
1344 | | std::shared_ptr<IOTracer> io_tracer_; |
1345 | | const ImmutableDBOptions immutable_db_options_; |
1346 | | FileSystemPtr fs_; |
1347 | | MutableDBOptions mutable_db_options_; |
1348 | | Statistics* stats_; |
1349 | | std::unordered_map<std::string, RecoveredTransaction*> |
1350 | | recovered_transactions_; |
1351 | | std::unique_ptr<Tracer> tracer_; |
1352 | | InstrumentedMutex trace_mutex_; |
1353 | | BlockCacheTracer block_cache_tracer_; |
1354 | | |
1355 | | // constant false canceled flag, used when the compaction is not manual |
1356 | | const std::atomic<bool> kManualCompactionCanceledFalse_{false}; |
1357 | | |
1358 | | // State below is protected by mutex_ |
1359 | | // With two_write_queues enabled, some of the variables that are accessed during
1360 | | // WriteToWAL need different synchronization: log_empty_, alive_log_files_, |
1361 | | // logs_, logfile_number_. Refer to the definition of each variable below for |
1362 | | // more description. |
1363 | | // |
1364 | | // `mutex_` can be a hot lock in some workloads, so it deserves dedicated |
1365 | | // cachelines. |
1366 | | mutable CacheAlignedInstrumentedMutex mutex_; |
1367 | | |
1368 | | ColumnFamilyHandleImpl* default_cf_handle_; |
1369 | | InternalStats* default_cf_internal_stats_; |
1370 | | |
1371 | | // table_cache_ provides its own synchronization |
1372 | | std::shared_ptr<Cache> table_cache_; |
1373 | | |
1374 | | ErrorHandler error_handler_; |
1375 | | |
1376 | | // Unified interface for logging events |
1377 | | EventLogger event_logger_; |
1378 | | |
1379 | | // Only used for dynamically adjusting max_total_wal_size. It is a sum of
1380 | | // [write_buffer_size * max_write_buffer_number] over all column families.
1381 | | std::atomic<uint64_t> max_total_in_memory_state_; |
1382 | | |
1383 | | // The options to access storage files |
1384 | | const FileOptions file_options_; |
1385 | | |
1386 | | // Additional options for compaction and flush
1387 | | FileOptions file_options_for_compaction_; |
1388 | | |
1389 | | std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_; |
1390 | | |
1391 | | // Increase the sequence number after writing each batch, whether memtable is |
1392 | | // disabled for that or not. Otherwise the sequence number is increased after |
1393 | | // writing each key into memtable. This implies that when disable_memtable is |
1394 | | // set, the seq is not increased at all. |
1395 | | // |
1396 | | // Default: false |
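 | | //
 | | // Illustrative example (not from the source): a WriteBatch with three Puts
 | | // to distinct keys consumes one sequence number when seq_per_batch_ is true,
 | | // and three sequence numbers (one per key) when it is false.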
1397 | | const bool seq_per_batch_; |
1398 | | // This determines during recovery whether we expect one writebatch per |
1399 | | // recovered transaction, or potentially multiple writebatches per |
1400 | | // transaction. For WriteUnprepared, this is set to false, since multiple |
1401 | | // batches can exist per transaction. |
1402 | | // |
1403 | | // Default: true |
1404 | | const bool batch_per_txn_; |
1405 | | |
1406 | | // Each flush or compaction gets its own job id. This counter makes sure
1407 | | // they're unique.
1408 | | std::atomic<int> next_job_id_; |
1409 | | |
1410 | | std::atomic<bool> shutting_down_; |
1411 | | |
1412 | | // No new background jobs can be queued if true. This is used to prevent new
1413 | | // background jobs from being queued after WaitForCompact() finishes waiting
1414 | | // for all background jobs and then attempts to close when close_db_ is true.
1415 | | bool reject_new_background_jobs_; |
1416 | | |
1417 | | // RecoveryContext struct stores the context about version edits along |
1418 | | // with corresponding column_family_data and column_family_options. |
1419 | | class RecoveryContext { |
1420 | | public: |
1421 | 11.0k | ~RecoveryContext() { |
1422 | 11.4k | for (auto& edit_list : edit_lists_) { |
1423 | 23.1k | for (auto* edit : edit_list) { |
1424 | 23.1k | delete edit; |
1425 | 23.1k | } |
1426 | 11.4k | } |
1427 | 11.0k | } |
1428 | | |
1429 | 23.1k | void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) { |
1430 | 23.1k | assert(cfd != nullptr); |
1431 | 23.1k | if (map_.find(cfd->GetID()) == map_.end()) { |
1432 | 11.4k | uint32_t size = static_cast<uint32_t>(map_.size()); |
1433 | 11.4k | map_.emplace(cfd->GetID(), size); |
1434 | 11.4k | cfds_.emplace_back(cfd); |
1435 | 11.4k | mutable_cf_opts_.emplace_back(cfd->GetLatestMutableCFOptions()); |
1436 | 11.4k | edit_lists_.emplace_back(autovector<VersionEdit*>()); |
1437 | 11.4k | } |
1438 | 23.1k | uint32_t i = map_[cfd->GetID()]; |
1439 | 23.1k | edit_lists_[i].emplace_back(new VersionEdit(edit)); |
1440 | 23.1k | } |
1441 | | |
1442 | | std::unordered_map<uint32_t, uint32_t> map_; // cf_id to index; |
1443 | | autovector<ColumnFamilyData*> cfds_; |
1444 | | autovector<const MutableCFOptions*> mutable_cf_opts_; |
1445 | | autovector<autovector<VersionEdit*>> edit_lists_; |
1446 | | // All existing data files (SST files and Blob files) found during DB::Open. |
1447 | | std::vector<std::string> existing_data_files_; |
1448 | | bool is_new_db_ = false; |
1449 | | }; |
1450 | | |
1451 | | // Persist options to options file. Must be holding options_mutex_. |
1452 | | // Will lock DB mutex if !db_mutex_already_held. |
1453 | | Status WriteOptionsFile(const WriteOptions& write_options, |
1454 | | bool db_mutex_already_held); |
1455 | | |
1456 | | Status CompactRangeInternal(const CompactRangeOptions& options, |
1457 | | ColumnFamilyHandle* column_family, |
1458 | | const Slice* begin, const Slice* end, |
1459 | | const std::string& trim_ts); |
1460 | | |
1461 | | // The following two functions can only be called when: |
1462 | | // 1. WriteThread::Writer::EnterUnbatched() is used. |
1463 | | // 2. db_mutex is NOT held |
1464 | | Status RenameTempFileToOptionsFile(const std::string& file_name); |
1465 | | Status DeleteObsoleteOptionsFiles(); |
1466 | | |
1467 | | void NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds, |
1468 | | FlushReason flush_reason); |
1469 | | |
1470 | | void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, |
1471 | | const MutableCFOptions& mutable_cf_options, |
1472 | | int job_id, FlushReason flush_reason); |
1473 | | |
1474 | | void NotifyOnFlushCompleted( |
1475 | | ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, |
1476 | | std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info); |
1477 | | |
1478 | | void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c, |
1479 | | const Status& st, |
1480 | | const CompactionJobStats& job_stats, int job_id); |
1481 | | |
1482 | | void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, Compaction* c, |
1483 | | const Status& st, |
1484 | | const CompactionJobStats& job_stats, |
1485 | | int job_id); |
1486 | | void NotifyOnMemTableSealed(ColumnFamilyData* cfd, |
1487 | | const MemTableInfo& mem_table_info); |
1488 | | |
1489 | | void NotifyOnExternalFileIngested( |
1490 | | ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job); |
1491 | | |
1492 | | Status FlushAllColumnFamilies(const FlushOptions& flush_options, |
1493 | | FlushReason flush_reason); |
1494 | | |
1495 | | virtual Status FlushForGetLiveFiles(); |
1496 | | |
1497 | | void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const; |
1498 | | |
1499 | | void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const; |
1500 | | |
1501 | | void EraseThreadStatusDbInfo() const; |
1502 | | |
1503 | | // If disable_memtable is set, the application logic must guarantee that the
1504 | | // batch will still be skipped from the memtable during recovery. An exception
1505 | | // to this is seq_per_batch_ mode, in which, since each batch already takes one
1506 | | // seq, it is ok for the batch to write to the memtable during recovery as long
1507 | | // as it only takes one sequence number: i.e., no duplicate keys.
1508 | | // In WriteCommitted this is guaranteed since disable_memtable is used for the
1509 | | // prepare batch, which will be written to the memtable later during the
1510 | | // commit, and in WritePrepared it is guaranteed since it will be used only for
1511 | | // WAL markers, which will never be written to the memtable. If the commit
1512 | | // marker is accompanied by a CommitTimeWriteBatch that is not written to the
1513 | | // memtable, as long as it has no duplicate keys, it does not violate the
1514 | | // one-seq-per-batch policy.
1515 | | // batch_cnt is expected to be non-zero in seq_per_batch mode and
1516 | | // indicates the number of sub-batches. A sub-batch is a subset of the write
1517 | | // batch that does not have duplicate keys.
1518 | | Status WriteImpl(const WriteOptions& options, WriteBatch* updates, |
1519 | | WriteCallback* callback = nullptr, |
1520 | | UserWriteCallback* user_write_cb = nullptr, |
1521 | | uint64_t* log_used = nullptr, uint64_t log_ref = 0, |
1522 | | bool disable_memtable = false, uint64_t* seq_used = nullptr, |
1523 | | size_t batch_cnt = 0, |
1524 | | PreReleaseCallback* pre_release_callback = nullptr, |
1525 | | PostMemTableCallback* post_memtable_callback = nullptr); |
1526 | | |
1527 | | Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates, |
1528 | | WriteCallback* callback = nullptr, |
1529 | | UserWriteCallback* user_write_cb = nullptr, |
1530 | | uint64_t* log_used = nullptr, uint64_t log_ref = 0, |
1531 | | bool disable_memtable = false, |
1532 | | uint64_t* seq_used = nullptr); |
1533 | | |
1534 | | // Write only to memtables without joining any write queue |
1535 | | Status UnorderedWriteMemtable(const WriteOptions& write_options, |
1536 | | WriteBatch* my_batch, WriteCallback* callback, |
1537 | | uint64_t log_ref, SequenceNumber seq, |
1538 | | const size_t sub_batch_cnt); |
1539 | | |
1540 | | // Whether the batch needs to be assigned an order
1541 | | enum AssignOrder : bool { kDontAssignOrder, kDoAssignOrder }; |
1542 | | // Whether it requires publishing last sequence or not |
1543 | | enum PublishLastSeq : bool { kDontPublishLastSeq, kDoPublishLastSeq }; |
1544 | | |
1545 | | // Join the write_thread to write the batch only to the WAL. It is the
1546 | | // responsibility of the caller to also write the write batch to the memtable
1547 | | // if required.
1548 | | //
1549 | | // sub_batch_cnt is expected to be non-zero when assign_order = kDoAssignOrder,
1550 | | // indicating the number of sub-batches in my_batch. A sub-batch is a subset
1551 | | // of the write batch that does not have duplicate keys. When seq_per_batch is
1552 | | // not set, each key is a separate sub-batch. Otherwise each duplicate key
1553 | | // marks the start of a new sub-batch.
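 | | //
 | | // Illustrative example (not from the source): for a batch
 | | // {Put(a), Put(b), Put(a)}, the second Put(a) starts a new sub-batch, so
 | | // sub_batch_cnt == 2 when seq_per_batch is set; when it is not set, each key
 | | // is its own sub-batch and sub_batch_cnt == 3.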
1554 | | Status WriteImplWALOnly( |
1555 | | WriteThread* write_thread, const WriteOptions& options, |
1556 | | WriteBatch* updates, WriteCallback* callback, |
1557 | | UserWriteCallback* user_write_cb, uint64_t* log_used, |
1558 | | const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt, |
1559 | | PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, |
1560 | | const PublishLastSeq publish_last_seq, const bool disable_memtable); |
1561 | | |
1562 | | // write cached_recoverable_state_ to memtable if it is not empty |
1563 | | // The writer must be the leader in write_thread_ and holding mutex_ |
1564 | | Status WriteRecoverableState(); |
1565 | | |
1566 | | // Actual implementation of Close() |
1567 | | virtual Status CloseImpl(); |
1568 | | |
1569 | | // Recover the descriptor from persistent storage. May do a significant |
1570 | | // amount of work to recover recently logged updates. Any changes to |
1571 | | // be made to the descriptor are added to *edit. |
1572 | | // recovered_seq is set to less than kMaxSequenceNumber if the log's tail is |
1573 | | // skipped. |
1574 | | // recovery_ctx stores the context about version edits and all those |
1575 | | // edits are persisted to new Manifest after successfully syncing the new WAL. |
1576 | | virtual Status Recover( |
1577 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
1578 | | bool read_only = false, bool error_if_wal_file_exists = false, |
1579 | | bool error_if_data_exists_in_wals = false, bool is_retry = false, |
1580 | | uint64_t* recovered_seq = nullptr, |
1581 | | RecoveryContext* recovery_ctx = nullptr, bool* can_retry = nullptr); |
1582 | | |
1583 | 22.5k | virtual bool OwnTablesAndLogs() const { return true; } |
1584 | | |
1585 | | // Setup DB identity file, and write DB ID to manifest if necessary. |
1586 | | Status SetupDBId(const WriteOptions& write_options, bool read_only, |
1587 | | RecoveryContext* recovery_ctx); |
1588 | | // Assign db_id_ and write DB ID to manifest if necessary. |
1589 | | void SetDBId(std::string&& id, bool read_only, RecoveryContext* recovery_ctx); |
1590 | | |
1591 | | // Collect a deduplicated collection of paths used by this DB, including |
1592 | | // dbname_, DBOptions.db_paths, ColumnFamilyOptions.cf_paths. |
1593 | | std::set<std::string> CollectAllDBPaths(); |
1594 | | |
1595 | | // REQUIRES: db mutex held when calling this function, but the db mutex can |
1596 | | // be released and re-acquired. Db mutex will be held when the function |
1597 | | // returns. |
1598 | | // It stores all existing data files (SST and Blob) in RecoveryContext. In |
1599 | | // the meantime, we find out the largest file number present in the paths, and |
1600 | | // bump up the version set's next_file_number_ to be 1 + largest_file_number. |
1601 | | // recovery_ctx stores the context about version edits. All those edits are |
1602 | | // persisted to new Manifest after successfully syncing the new WAL. |
1603 | | Status MaybeUpdateNextFileNumber(RecoveryContext* recovery_ctx); |
1604 | | |
1605 | | // Track existing data files, including both referenced and unreferenced SST |
1606 | | // and Blob files in SstFileManager. This is only called during DB::Open and |
1607 | | // it's called before any file deletion start so that their deletion can be |
1608 | | // properly rate limited. |
1609 | | // Files may not be referenced in the MANIFEST because, e.g.:
1610 | | // 1. It's best effort recovery;
1611 | | // 2. The VersionEdits referencing the SST files were appended to
1612 | | // RecoveryContext, the DB crashed while syncing the MANIFEST, and the
1613 | | // VersionEdits were still not synced to the MANIFEST during recovery.
1614 | | // |
1615 | | // If the file is referenced in Manifest (typically that's the |
1616 | | // vast majority of all files), since it already has the file size |
1617 | | // on record, we don't need to query the file system. Otherwise, we query the |
1618 | | // file system for the size of an unreferenced file. |
1619 | | void TrackExistingDataFiles( |
1620 | | const std::vector<std::string>& existing_data_files); |
1621 | | |
1622 | | // SetDbSessionId() should be called in the constructor DBImpl()
1623 | | // to ensure that db_session_id_ gets updated every time the DB is opened.
1624 | | void SetDbSessionId(); |
1625 | | |
1626 | | Status FailIfCfHasTs(const ColumnFamilyHandle* column_family) const; |
1627 | | Status FailIfTsMismatchCf(ColumnFamilyHandle* column_family, |
1628 | | const Slice& ts) const; |
1629 | | |
1630 | | // Check that the read timestamp `ts` is at or above the `full_history_ts_low`
1631 | | // timestamp in a `SuperVersion`. It's necessary to do this check after
1632 | | // grabbing the SuperVersion. If the check passes, the SuperVersion referenced
1633 | | // by this read ensures the read won't be affected if `full_history_ts_low`
1634 | | // is increased concurrently, and this is achieved without explicit locking by
1635 | | // piggybacking on the SuperVersion.
1636 | | Status FailIfReadCollapsedHistory(const ColumnFamilyData* cfd, |
1637 | | const SuperVersion* sv, |
1638 | | const Slice& ts) const; |
1639 | | |
1640 | | // recovery_ctx stores the context about version edits, and
1641 | | // LogAndApplyForRecovery persists all those edits to a new Manifest after
1642 | | // successfully syncing the new WAL.
1643 | | // LogAndApplyForRecovery should be called only once during recovery, and it
1644 | | // should be called when RocksDB writes the first new MANIFEST for this
1645 | | // recovery.
1646 | | Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx); |
1647 | | |
1648 | | void InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap(); |
1649 | | |
1650 | | // Return true to proceed with current WAL record whose content is stored in |
1651 | | // `batch`. Return false to skip current WAL record. |
1652 | | bool InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number, |
1653 | | const std::string& wal_fname, |
1654 | | log::Reader::Reporter& reporter, |
1655 | | Status& status, bool& stop_replay, |
1656 | | WriteBatch& batch); |
1657 | | |
1658 | | private: |
1659 | | friend class DB; |
1660 | | friend class ErrorHandler; |
1661 | | friend class InternalStats; |
1662 | | friend class PessimisticTransaction; |
1663 | | friend class TransactionBaseImpl; |
1664 | | friend class WriteCommittedTxn; |
1665 | | friend class WritePreparedTxn; |
1666 | | friend class WritePreparedTxnDB; |
1667 | | friend class WriteBatchWithIndex; |
1668 | | friend class WriteUnpreparedTxnDB; |
1669 | | friend class WriteUnpreparedTxn; |
1670 | | |
1671 | | friend class ForwardIterator; |
1672 | | friend struct SuperVersion; |
1673 | | friend class CompactedDBImpl; |
1674 | | friend class DBImplFollower; |
1675 | | #ifndef NDEBUG |
1676 | | friend class DBTest_ConcurrentFlushWAL_Test; |
1677 | | friend class DBTest_MixedSlowdownOptionsStop_Test; |
1678 | | friend class DBCompactionTest_CompactBottomLevelFilesWithDeletions_Test; |
1679 | | friend class DBCompactionTest_CompactionDuringShutdown_Test; |
1680 | | friend class DBCompactionTest_DelayCompactBottomLevelFilesWithDeletions_Test; |
1681 | | friend class DBCompactionTest_DisableCompactBottomLevelFiles_Test; |
1682 | | friend class StatsHistoryTest_PersistentStatsCreateColumnFamilies_Test; |
1683 | | friend class DBTest2_ReadCallbackTest_Test; |
1684 | | friend class WriteCallbackPTest_WriteWithCallbackTest_Test; |
1685 | | friend class XFTransactionWriteHandler; |
1686 | | friend class DBBlobIndexTest; |
1687 | | friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; |
1688 | | #endif |
1689 | | |
1690 | | struct CompactionState; |
1691 | | struct PrepickedCompaction; |
1692 | | struct PurgeFileInfo; |
1693 | | |
1694 | | struct WriteContext { |
1695 | | SuperVersionContext superversion_context; |
1696 | | autovector<MemTable*> memtables_to_free_; |
1697 | | |
1698 | | explicit WriteContext(bool create_superversion = false) |
1699 | 1.16M | : superversion_context(create_superversion) {} |
1700 | | |
1701 | 1.16M | ~WriteContext() { |
1702 | 1.16M | superversion_context.Clean(); |
1703 | 1.16M | for (auto& m : memtables_to_free_) { |
1704 | 0 | delete m; |
1705 | 0 | } |
1706 | 1.16M | } |
1707 | | }; |
1708 | | |
1709 | | struct LogFileNumberSize { |
1710 | 17.0k | explicit LogFileNumberSize(uint64_t _number) : number(_number) {} |
1711 | 0 | LogFileNumberSize() {} |
1712 | 1.16M | void AddSize(uint64_t new_size) { size += new_size; } |
1713 | | uint64_t number; |
1714 | | uint64_t size = 0; |
1715 | | bool getting_flushed = false; |
1716 | | }; |
1717 | | |
1718 | | struct LogWriterNumber { |
1719 | | // pass ownership of _writer |
1720 | | LogWriterNumber(uint64_t _number, log::Writer* _writer) |
1721 | 11.2k | : number(_number), writer(_writer) {} |
1722 | | |
1723 | 133 | log::Writer* ReleaseWriter() { |
1724 | 133 | auto* w = writer; |
1725 | 133 | writer = nullptr; |
1726 | 133 | return w; |
1727 | 133 | } |
1728 | 11.0k | Status ClearWriter() { |
1729 | 11.0k | Status s; |
1730 | 11.0k | if (writer->file()) { |
1731 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
1732 | 11.0k | s = writer->WriteBuffer(WriteOptions()); |
1733 | 11.0k | } |
1734 | 11.0k | delete writer; |
1735 | 11.0k | writer = nullptr; |
1736 | 11.0k | return s; |
1737 | 11.0k | } |
1738 | | |
1739 | 133 | bool IsSyncing() { return getting_synced; } |
1740 | | |
1741 | 0 | uint64_t GetPreSyncSize() { |
1742 | 0 | assert(getting_synced); |
1743 | 0 | return pre_sync_size; |
1744 | 0 | } |
1745 | | |
1746 | 133 | void PrepareForSync() { |
1747 | 133 | assert(!getting_synced); |
1748 | | // Ensure the head of logs_ is marked as getting_synced if any is. |
1749 | 133 | getting_synced = true; |
1750 | | // If last sync failed on a later WAL, this could be a fully synced |
1751 | | // and closed WAL that just needs to be recorded as synced in the |
1752 | | // manifest. |
1753 | 133 | if (writer->file()) { |
1754 | | // Size is expected to be monotonically increasing. |
1755 | 133 | assert(writer->file()->GetFlushedSize() >= pre_sync_size); |
1756 | 133 | pre_sync_size = writer->file()->GetFlushedSize(); |
1757 | 133 | } |
1758 | 133 | } |
1759 | | |
1760 | 133 | void FinishSync() { |
1761 | 133 | assert(getting_synced); |
1762 | 133 | getting_synced = false; |
1763 | 133 | } |
1764 | | |
1765 | | uint64_t number; |
1766 | | // Visual Studio doesn't support a deque element being noncopyable, which
1767 | | // it would be with a std::unique_ptr member.
1768 | | log::Writer* writer; // own |
1769 | | |
1770 | | private: |
1771 | | // true for some prefix of logs_ |
1772 | | bool getting_synced = false; |
1773 | | // The size of the file before the sync happens. This amount is guaranteed |
1774 | | // to be persisted even if appends happen during sync so it can be used for |
1775 | | // tracking the synced size in MANIFEST. |
1776 | | uint64_t pre_sync_size = 0; |
1777 | | }; |
1778 | | |
1779 | | struct LogContext { |
1780 | | explicit LogContext(bool need_sync = false) |
1781 | 1.16M | : need_log_sync(need_sync), need_log_dir_sync(need_sync) {} |
1782 | | bool need_log_sync = false; |
1783 | | bool need_log_dir_sync = false; |
1784 | | log::Writer* writer = nullptr; |
1785 | | LogFileNumberSize* log_file_number_size = nullptr; |
1786 | | }; |
1787 | | |
1788 | | // PurgeFileInfo is a structure to hold information of files to be deleted in |
1789 | | // purge_files_ |
1790 | | struct PurgeFileInfo { |
1791 | | std::string fname; |
1792 | | std::string dir_to_sync; |
1793 | | FileType type; |
1794 | | uint64_t number; |
1795 | | int job_id; |
1796 | | PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num, |
1797 | | int jid) |
1798 | 0 | : fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {} |
1799 | | }; |
1800 | | |
1801 | | // Argument required by background flush thread. |
1802 | | struct BGFlushArg { |
1803 | | BGFlushArg() |
1804 | | : cfd_(nullptr), |
1805 | | max_memtable_id_(0), |
1806 | | superversion_context_(nullptr), |
1807 | 0 | flush_reason_(FlushReason::kOthers) {} |
1808 | | BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id, |
1809 | | SuperVersionContext* superversion_context, |
1810 | | FlushReason flush_reason) |
1811 | | : cfd_(cfd), |
1812 | | max_memtable_id_(max_memtable_id), |
1813 | | superversion_context_(superversion_context), |
1814 | 133 | flush_reason_(flush_reason) {} |
1815 | | |
1816 | | // Column family to flush. |
1817 | | ColumnFamilyData* cfd_; |
1818 | | // Maximum ID of memtable to flush. In this column family, memtables with |
1819 | | // IDs smaller than this value must be flushed before this flush completes. |
1820 | | uint64_t max_memtable_id_; |
1821 | | // Pointer to a SuperVersionContext object. After flush completes, RocksDB |
1822 | | // installs a new superversion for the column family. This operation |
1823 | | // requires a SuperVersionContext object (currently embedded in JobContext). |
1824 | | SuperVersionContext* superversion_context_; |
1825 | | FlushReason flush_reason_; |
1826 | | }; |
1827 | | |
1828 | | // Argument passed to flush thread. |
1829 | | struct FlushThreadArg { |
1830 | | DBImpl* db_; |
1831 | | |
1832 | | Env::Priority thread_pri_; |
1833 | | }; |
1834 | | |
1835 | | // Information for a manual compaction |
1836 | | struct ManualCompactionState { |
1837 | | ManualCompactionState(ColumnFamilyData* _cfd, int _input_level, |
1838 | | int _output_level, uint32_t _output_path_id, |
1839 | | bool _exclusive, bool _disallow_trivial_move, |
1840 | | std::atomic<bool>* _canceled) |
1841 | | : cfd(_cfd), |
1842 | | input_level(_input_level), |
1843 | | output_level(_output_level), |
1844 | | output_path_id(_output_path_id), |
1845 | | exclusive(_exclusive), |
1846 | | disallow_trivial_move(_disallow_trivial_move), |
1847 | 159 | canceled(_canceled ? *_canceled : canceled_internal_storage) {} |
1848 | | // When _canceled is not provided by the user, we assign the reference of
1849 | | // canceled_internal_storage to it to consolidate canceled and
1850 | | // manual_compaction_paused, since DisableManualCompaction() might be
1851 | | // called.
1852 | | |
1853 | | ColumnFamilyData* cfd; |
1854 | | int input_level; |
1855 | | int output_level; |
1856 | | uint32_t output_path_id; |
1857 | | Status status; |
1858 | | bool done = false; |
1859 | | bool in_progress = false; // compaction request being processed? |
1860 | | bool incomplete = false; // only part of requested range compacted |
1861 | | bool exclusive; // current behavior of only one manual |
1862 | | bool disallow_trivial_move; // Force actual compaction to run |
1863 | | const InternalKey* begin = nullptr; // nullptr means beginning of key range |
1864 | | const InternalKey* end = nullptr; // nullptr means end of key range |
1865 | | InternalKey* manual_end = nullptr; // how far we are compacting |
1866 | | InternalKey tmp_storage; // Used to keep track of compaction progress |
1867 | | InternalKey tmp_storage1; // Used to keep track of compaction progress |
1868 | | |
1869 | | // When the user provides a canceled pointer in CompactRangeOptions, the
1870 | | // above variable is the reference of the user-provided
1871 | | // `canceled`; otherwise, it is the reference of canceled_internal_storage.
1872 | | std::atomic<bool> canceled_internal_storage = false; |
1873 | | std::atomic<bool>& canceled; // Compaction canceled pointer reference |
1874 | | }; |
1875 | | struct PrepickedCompaction { |
1876 | | // background compaction takes ownership of `compaction`. |
1877 | | Compaction* compaction; |
1878 | | // caller retains ownership of `manual_compaction_state` as it is reused |
1879 | | // across background compactions. |
1880 | | ManualCompactionState* manual_compaction_state; // nullptr if non-manual |
1881 | | // task limiter token is requested during compaction picking. |
1882 | | std::unique_ptr<TaskLimiterToken> task_token; |
1883 | | }; |
1884 | | |
1885 | | struct CompactionArg { |
1886 | | // caller retains ownership of `db`. |
1887 | | DBImpl* db; |
1888 | | // background compaction takes ownership of `prepicked_compaction`. |
1889 | | PrepickedCompaction* prepicked_compaction; |
1890 | | Env::Priority compaction_pri_; |
1891 | | }; |
1892 | | |
1893 | 0 | static bool IsRecoveryFlush(FlushReason flush_reason) { |
1894 | 0 | return flush_reason == FlushReason::kErrorRecoveryRetryFlush || |
1895 | 0 | flush_reason == FlushReason::kErrorRecovery; |
1896 | 0 | } |
1897 | | // Initialize the built-in column family for persistent stats. Depending on |
1898 | | // whether on-disk persistent stats have been enabled before, it may either |
1899 | | // create a new column family and column family handle or just a column family |
1900 | | // handle. |
1901 | | // Required: DB mutex held |
1902 | | Status InitPersistStatsColumnFamily(); |
1903 | | |
1904 | | // The Persistent Stats column family has two format version keys which are
1905 | | // used for compatibility checks. Write the format version if it's created for
1906 | | // the first time; read the format version and check compatibility if recovering
1907 | | // from disk. This function requires DB mutex held at entrance but may |
1908 | | // release and re-acquire DB mutex in the process. |
1909 | | // Required: DB mutex held |
1910 | | Status PersistentStatsProcessFormatVersion(); |
1911 | | |
1912 | | Status ResumeImpl(DBRecoverContext context); |
1913 | | |
1914 | | void MaybeIgnoreError(Status* s) const; |
1915 | | |
1916 | | const Status CreateArchivalDirectory(); |
1917 | | |
1918 | | // Create a column family, without some of the follow-up work yet |
1919 | | Status CreateColumnFamilyImpl(const ReadOptions& read_options, |
1920 | | const WriteOptions& write_options, |
1921 | | const ColumnFamilyOptions& cf_options, |
1922 | | const std::string& cf_name, |
1923 | | ColumnFamilyHandle** handle); |
1924 | | |
1925 | | // Follow-up work after the user creates a column family (or families)
1926 | | Status WrapUpCreateColumnFamilies( |
1927 | | const ReadOptions& read_options, const WriteOptions& write_options, |
1928 | | const std::vector<const ColumnFamilyOptions*>& cf_options); |
1929 | | |
1930 | | Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family); |
1931 | | |
1932 | | // Delete any unneeded files and stale in-memory entries. |
1933 | | void DeleteObsoleteFiles(); |
1934 | | // Delete obsolete files and log status and information of file deletion |
1935 | | void DeleteObsoleteFileImpl(int job_id, const std::string& fname, |
1936 | | const std::string& path_to_sync, FileType type, |
1937 | | uint64_t number); |
1938 | | |
1939 | | // Background process needs to call |
1940 | | // auto x = CaptureCurrentFileNumberInPendingOutputs() |
1941 | | // auto file_num = versions_->NewFileNumber(); |
1942 | | // <do something> |
1943 | | // ReleaseFileNumberFromPendingOutputs(x) |
1944 | | // This will protect any file with number `file_num` or greater from being |
1945 | | // deleted while <do something> is running. |
1946 | | // ----------- |
1947 | | // This function will capture the current file number and append it to
1948 | | // pending_outputs_. This will prevent any background process from deleting
1949 | | // any file created after this point.
1950 | | std::list<uint64_t>::iterator CaptureCurrentFileNumberInPendingOutputs(); |
1951 | | // This function should be called with the result of |
1952 | | // CaptureCurrentFileNumberInPendingOutputs(). It then marks that any file |
1953 | | // created between the calls CaptureCurrentFileNumberInPendingOutputs() and |
1954 | | // ReleaseFileNumberFromPendingOutputs() can now be deleted (if it's not live |
1955 | | // and not blocked by any other pending_outputs_ calls).
1956 | | void ReleaseFileNumberFromPendingOutputs( |
1957 | | std::unique_ptr<std::list<uint64_t>::iterator>& v); |
1958 | | |
1959 | | IOStatus SyncClosedWals(const WriteOptions& write_options, |
1960 | | JobContext* job_context, VersionEdit* synced_wals, |
1961 | | bool error_recovery_in_prog); |
1962 | | |
1963 | | // Flush the in-memory write buffer to storage. Switches to a new |
1964 | | // log-file/memtable and writes a new descriptor iff successful. Then |
1965 | | // installs a new super version for the column family. |
1966 | | Status FlushMemTableToOutputFile( |
1967 | | ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, |
1968 | | bool* madeProgress, JobContext* job_context, FlushReason flush_reason, |
1969 | | SuperVersionContext* superversion_context, |
1970 | | std::vector<SequenceNumber>& snapshot_seqs, |
1971 | | SequenceNumber earliest_write_conflict_snapshot, |
1972 | | SnapshotChecker* snapshot_checker, LogBuffer* log_buffer, |
1973 | | Env::Priority thread_pri); |
1974 | | |
1975 | | // Flush the memtables of (multiple) column families to multiple files on |
1976 | | // persistent storage. |
1977 | | Status FlushMemTablesToOutputFiles( |
1978 | | const autovector<BGFlushArg>& bg_flush_args, bool* made_progress, |
1979 | | JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri); |
1980 | | |
1981 | | Status AtomicFlushMemTablesToOutputFiles( |
1982 | | const autovector<BGFlushArg>& bg_flush_args, bool* made_progress, |
1983 | | JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri); |
1984 | | |
1985 | | // REQUIRES: log_numbers are sorted in ascending order |
1986 | | // corrupted_log_found is set to true if we recover from a corrupted log file. |
1987 | | Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers, |
1988 | | SequenceNumber* next_sequence, bool read_only, |
1989 | | bool is_retry, bool* corrupted_log_found, |
1990 | | RecoveryContext* recovery_ctx); |
1991 | | |
1992 | | // The following two methods are used to flush a memtable to
1993 | | // storage. The first one is used at database recovery time (when the
1994 | | // database is opened) and is heavyweight because it holds the mutex
1995 | | // for the entire period. The second method, WriteLevel0Table, supports
1996 | | // flushing memtables to storage concurrently.
1997 | | Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, |
1998 | | MemTable* mem, VersionEdit* edit); |
1999 | | |
2000 | | // Get the size of a log file and, if truncate is true, truncate the |
2001 | | // log file to its actual size, thereby freeing preallocated space. |
2002 | | // Return success even if truncate fails |
2003 | | Status GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate, |
2004 | | LogFileNumberSize* log); |
2005 | | |
2006 | | // Restore alive_log_files_ and total_log_size_ after recovery. |
2007 | | // It needs to run only when there's no flush during recovery |
2008 | | // (e.g. avoid_flush_during_recovery=true). May also trigger flush |
2009 | | // in case total_log_size > max_total_wal_size. |
2010 | | Status RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers); |
2011 | | |
2012 | | // num_bytes: for slowdown case, delay time is calculated based on |
2013 | | // `num_bytes` going through. |
2014 | | Status DelayWrite(uint64_t num_bytes, WriteThread& write_thread, |
2015 | | const WriteOptions& write_options); |
2016 | | |
2017 | | // Begin stalling of writes when memory usage increases beyond a certain |
2018 | | // threshold. |
2019 | | void WriteBufferManagerStallWrites(); |
2020 | | |
2021 | | Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, |
2022 | | WriteBatch* my_batch); |
2023 | | |
2024 | | // REQUIRES: mutex locked and in write thread. |
2025 | | Status ScheduleFlushes(WriteContext* context); |
2026 | | |
2027 | | void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds); |
2028 | | |
2029 | | Status TrimMemtableHistory(WriteContext* context); |
2030 | | |
2031 | | Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); |
2032 | | |
2033 | | // Select and output column families qualified for atomic flush in |
2034 | | // `selected_cfds`. If `provided_candidate_cfds` is non-empty, it will be used |
2035 | | // as the candidate CFs to select qualified ones from. Otherwise, all column
2036 | | // families are used as candidates to select from.
2037 | | // |
2038 | | // REQUIRES: mutex held |
2039 | | void SelectColumnFamiliesForAtomicFlush( |
2040 | | autovector<ColumnFamilyData*>* selected_cfds, |
2041 | | const autovector<ColumnFamilyData*>& provided_candidate_cfds = {}, |
2042 | | FlushReason flush_reason = FlushReason::kOthers); |
2043 | | |
2044 | | // Force current memtable contents to be flushed. |
2045 | | Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options, |
2046 | | FlushReason flush_reason, |
2047 | | bool entered_write_thread = false); |
2048 | | |
2049 | | // Atomic-flush memtables from qualified CFs among `provided_candidate_cfds`
2050 | | // (if non-empty) or among all column families and atomically record the
2051 | | // result to the MANIFEST. |
2052 | | Status AtomicFlushMemTables( |
2053 | | const FlushOptions& options, FlushReason flush_reason, |
2054 | | const autovector<ColumnFamilyData*>& provided_candidate_cfds = {}, |
2055 | | bool entered_write_thread = false); |
2056 | | |
2057 | | Status RetryFlushesForErrorRecovery(FlushReason flush_reason, bool wait); |
2058 | | |
2059 | | // Wait until flushing this column family won't stall writes |
2060 | | Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd, |
2061 | | bool* flush_needed); |
2062 | | |
2063 | | // Wait for a memtable to be flushed.
2064 | | // If flush_memtable_id is non-null, wait until the memtable with that ID
2065 | | // gets flushed. Otherwise, wait until the column family doesn't have any
2066 | | // memtable pending flush.
2067 | | // resuming_from_bg_err indicates whether the caller is attempting to resume |
2068 | | // from background error. |
2069 | | Status WaitForFlushMemTable(ColumnFamilyData* cfd, |
2070 | | const uint64_t* flush_memtable_id = nullptr, |
2071 | 0 | bool resuming_from_bg_err = false) { |
2072 | 0 | return WaitForFlushMemTables({cfd}, {flush_memtable_id}, |
2073 | 0 | resuming_from_bg_err); |
2074 | 0 | } |
2075 | | // Wait for memtables to be flushed for multiple column families. |
2076 | | Status WaitForFlushMemTables( |
2077 | | const autovector<ColumnFamilyData*>& cfds, |
2078 | | const autovector<const uint64_t*>& flush_memtable_ids, |
2079 | | bool resuming_from_bg_err); |
2080 | | |
2081 | 133 | inline void WaitForPendingWrites() { |
2082 | 133 | mutex_.AssertHeld(); |
2083 | 133 | TEST_SYNC_POINT("DBImpl::WaitForPendingWrites:BeforeBlock"); |
2084 | | // In case pipelined write is enabled, wait for all pending memtable
2085 | | // writers. |
2086 | 133 | if (immutable_db_options_.enable_pipelined_write) { |
2087 | | // Memtable writers may call DB::Get in case max_successive_merges > 0, |
2088 | | // which may lock mutex. Unlocking mutex here to avoid deadlock. |
2089 | 0 | mutex_.Unlock(); |
2090 | 0 | write_thread_.WaitForMemTableWriters(); |
2091 | 0 | mutex_.Lock(); |
2092 | 0 | } |
2093 | | |
2094 | 133 | if (immutable_db_options_.unordered_write) { |
2095 | | // Wait for the ones who already wrote to the WAL to finish their |
2096 | | // memtable write. |
2097 | 0 | if (pending_memtable_writes_.load() != 0) { |
2098 | | // XXX: suspicious wait while holding DB mutex? |
2099 | 0 | std::unique_lock<std::mutex> guard(switch_mutex_); |
2100 | 0 | switch_cv_.wait(guard, |
2101 | 0 | [&] { return pending_memtable_writes_.load() == 0; }); |
2102 | 0 | } |
2103 | 133 | } else { |
2104 | | // (Writes are finished before the next write group starts.) |
2105 | 133 | } |
2106 | | |
2107 | | // Wait for any LockWAL to clear |
2108 | 133 | while (lock_wal_count_ > 0) { |
2109 | 0 | bg_cv_.Wait(); |
2110 | 0 | } |
2111 | 133 | } |
2112 | | |
2113 | | // TaskType is used to identify tasks in the thread pool; currently it only
2114 | | // differentiates manual compaction, which can be unscheduled from the
2115 | | // thread pool.
2116 | | enum class TaskType : uint8_t { |
2117 | | kDefault = 0, |
2118 | | kManualCompaction = 1, |
2119 | | kCount = 2, |
2120 | | }; |
2121 | | |
2122 | | // Task tag is used to identify tasks in the thread pool; it is the
2123 | | // DBImpl object address + type.
2124 | 146 | inline void* GetTaskTag(TaskType type) { |
2125 | 146 | return GetTaskTag(static_cast<uint8_t>(type)); |
2126 | 146 | } |
2127 | | |
2128 | 66.6k | inline void* GetTaskTag(uint8_t type) { |
2129 | 66.6k | return static_cast<uint8_t*>(static_cast<void*>(this)) + type; |
2130 | 66.6k | } |
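 | | // Illustrative use (a sketch, not a quote from the implementation): a queued
 | | // manual compaction could be removed from the thread pool with something like
 | | //   env_->UnSchedule(GetTaskTag(TaskType::kManualCompaction),
 | | //                    Env::Priority::LOW);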
2131 | | |
2132 | | // REQUIRES: mutex locked and in write thread. |
2133 | | void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds); |
2134 | | |
2135 | | // REQUIRES: mutex locked and in write thread. |
2136 | | Status SwitchWAL(WriteContext* write_context); |
2137 | | |
2138 | | // REQUIRES: mutex locked and in write thread. |
2139 | | Status HandleWriteBufferManagerFlush(WriteContext* write_context); |
2140 | | |
2141 | | // REQUIRES: mutex locked |
2142 | | Status PreprocessWrite(const WriteOptions& write_options, |
2143 | | LogContext* log_context, WriteContext* write_context); |
2144 | | |
2145 | | // Merge write batches in the write group into merged_batch. |
2146 | | // Returns OK if merge is successful. |
2147 | | // Returns Corruption if corruption in write batch is detected. |
2148 | | Status MergeBatch(const WriteThread::WriteGroup& write_group, |
2149 | | WriteBatch* tmp_batch, WriteBatch** merged_batch, |
2150 | | size_t* write_with_wal, WriteBatch** to_be_cached_state); |
2151 | | |
2152 | | IOStatus WriteToWAL(const WriteBatch& merged_batch, |
2153 | | const WriteOptions& write_options, |
2154 | | log::Writer* log_writer, uint64_t* log_used, |
2155 | | uint64_t* log_size, |
2156 | | LogFileNumberSize& log_file_number_size); |
2157 | | |
2158 | | IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group, |
2159 | | log::Writer* log_writer, uint64_t* log_used, |
2160 | | bool need_log_sync, bool need_log_dir_sync, |
2161 | | SequenceNumber sequence, |
2162 | | LogFileNumberSize& log_file_number_size); |
2163 | | |
2164 | | IOStatus ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group, |
2165 | | uint64_t* log_used, |
2166 | | SequenceNumber* last_sequence, size_t seq_inc); |
2167 | | |
2168 | | // Used by WriteImpl to update bg_error_ if paranoid check is enabled. |
2169 | | // Caller must hold mutex_. |
2170 | | void WriteStatusCheckOnLocked(const Status& status); |
2171 | | |
2172 | | // Used by WriteImpl to update bg_error_ if paranoid check is enabled. |
2173 | | void WriteStatusCheck(const Status& status); |
2174 | | |
2175 | | // Used by WriteImpl to update bg_error_ when IO error happens, e.g., write |
2176 | | // WAL, sync WAL fails, if paranoid check is enabled. |
2177 | | void IOStatusCheck(const IOStatus& status); |
2178 | | |
2179 | | // Used by WriteImpl to update bg_error_ in case of memtable insert error. |
2180 | | void MemTableInsertStatusCheck(const Status& memtable_insert_status); |
2181 | | |
2182 | | Status CompactFilesImpl(const CompactionOptions& compact_options, |
2183 | | ColumnFamilyData* cfd, Version* version, |
2184 | | const std::vector<std::string>& input_file_names, |
2185 | | std::vector<std::string>* const output_file_names, |
2186 | | const int output_level, int output_path_id, |
2187 | | JobContext* job_context, LogBuffer* log_buffer, |
2188 | | CompactionJobInfo* compaction_job_info); |
2189 | | |
2190 | | ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name); |
2191 | | |
2192 | | void MaybeScheduleFlushOrCompaction(); |
2193 | | |
2194 | | struct FlushRequest { |
2195 | | FlushReason flush_reason; |
2196 | | // A map from each column family to flush to the largest memtable id to
2197 | | // persist for that column family. Once all the memtables whose IDs are
2198 | | // smaller than or equal to this per-column-family value have been persisted,
2199 | | // this flush request is considered to have completed its work of flushing
2200 | | // this column family. After completing the work for all column families in
2201 | | // this request, this flush is considered complete.
2202 | | std::unordered_map<ColumnFamilyData*, uint64_t> |
2203 | | cfd_to_max_mem_id_to_persist; |
2204 | | |
2205 | | #ifndef NDEBUG |
2206 | | int reschedule_count = 1; |
2207 | | #endif /* !NDEBUG */ |
2208 | | }; |
2209 | | |
2210 | | // In case of atomic flush, generates a `FlushRequest` for the latest atomic |
2211 | | // cuts for these `cfds`. Atomic cuts are recorded in |
2212 | | // `AssignAtomicFlushSeq()`. For each entry in `cfds`, all CFDs sharing the |
2213 | | // same latest atomic cut must also be present. |
2214 | | // |
2215 | | // REQUIRES: mutex held |
2216 | | void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds, |
2217 | | FlushReason flush_reason, FlushRequest* req); |
2218 | | |
2219 | | // The functions below execute flushes and compactions in the background. A
2220 | | // deque is the communication channel between the threads that ask for work
2221 | | // to be done and the available threads in the thread pool that pick it up to
2222 | | // execute it. We use these terms to describe the state of the work
2223 | | // and its transitions:
2224 | | // 1) It becomes pending once it's successfully enqueued into the
2225 | | // corresponding deque; work in this state is also called unscheduled.
2226 | | // Counter `unscheduled_*_` counts work in this state.
2227 | | // 2) When `MaybeScheduleFlushOrCompaction` schedules a thread to run `BGWork*`
2228 | | // for the work, it becomes scheduled.
2229 | | // Counter `bg_*_scheduled_` counts work in this state.
2230 | | // 3) Once the thread starts to execute `BGWork*`, the work is popped from the
2231 | | // deque; it is now in the running state.
2232 | | // Counter `num_running_*_` counts work in this state.
2233 | | // 4) Eventually, the work is finished. We don't need to specifically track
2234 | | // finished work.
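 | | //
 | | // Illustrative flush lifecycle (a sketch of the transitions above, not an
 | | // exact call trace):
 | | //   EnqueuePendingFlush(req)           -> pending / unscheduled
 | | //   MaybeScheduleFlushOrCompaction()   -> scheduled (BGWorkFlush queued)
 | | //   BGWorkFlush() -> BackgroundCallFlush() pops the request -> running
 | | //   BackgroundFlush() completes        -> finished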
2235 | | |
2236 | | // Returns true if `req` is successfully enqueued. |
2237 | | bool EnqueuePendingFlush(const FlushRequest& req); |
2238 | | |
2239 | | void EnqueuePendingCompaction(ColumnFamilyData* cfd); |
2240 | | void SchedulePendingPurge(std::string fname, std::string dir_to_sync, |
2241 | | FileType type, uint64_t number, int job_id); |
2242 | | static void BGWorkCompaction(void* arg); |
2243 | | // Runs a pre-chosen universal compaction involving bottom level in a |
2244 | | // separate, bottom-pri thread pool. |
2245 | | static void BGWorkBottomCompaction(void* arg); |
2246 | | static void BGWorkFlush(void* arg); |
2247 | | static void BGWorkPurge(void* arg); |
2248 | | static void UnscheduleCompactionCallback(void* arg); |
2249 | | static void UnscheduleFlushCallback(void* arg); |
2250 | | void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction, |
2251 | | Env::Priority thread_pri); |
2252 | | void BackgroundCallFlush(Env::Priority thread_pri); |
2253 | | void BackgroundCallPurge(); |
2254 | | Status BackgroundCompaction(bool* madeProgress, JobContext* job_context, |
2255 | | LogBuffer* log_buffer, |
2256 | | PrepickedCompaction* prepicked_compaction, |
2257 | | Env::Priority thread_pri); |
2258 | | Status BackgroundFlush(bool* madeProgress, JobContext* job_context, |
2259 | | LogBuffer* log_buffer, FlushReason* reason, |
2260 | | bool* flush_rescheduled_to_retain_udt, |
2261 | | Env::Priority thread_pri); |
2262 | | |
2263 | | bool EnoughRoomForCompaction(ColumnFamilyData* cfd, |
2264 | | const std::vector<CompactionInputFiles>& inputs, |
2265 | | bool* sfm_bookkeeping, LogBuffer* log_buffer); |
2266 | | |
2267 | | // Request a compaction task token from the compaction thread limiter.
2268 | | // It always succeeds if force = true or the limiter is disabled.
2269 | | bool RequestCompactionToken(ColumnFamilyData* cfd, bool force, |
2270 | | std::unique_ptr<TaskLimiterToken>* token, |
2271 | | LogBuffer* log_buffer); |
2272 | | |
2273 | | // Return true if the `FlushRequest` can be rescheduled to retain the UDT.
2274 | | // Only true if the involved MemTables contain user-defined timestamps newer
2275 | | // than the cutoff timestamp `full_history_ts_low`, and not flushing
2276 | | // immediately will not cause the DB to enter write stall mode.
2277 | | bool ShouldRescheduleFlushRequestToRetainUDT(const FlushRequest& flush_req); |
2278 | | |
2279 | | // Schedule background tasks |
2280 | | Status StartPeriodicTaskScheduler(); |
2281 | | |
2282 | | // Cancel scheduled periodic tasks |
2283 | | Status CancelPeriodicTaskScheduler(); |
2284 | | |
2285 | | Status RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options, |
2286 | | const WriteOptions& write_options, |
2287 | | bool is_new_db); |
2288 | | |
2289 | | void PrintStatistics(); |
2290 | | |
2291 | | size_t EstimateInMemoryStatsHistorySize() const; |
2292 | | |
2293 | | // Return the minimum empty level that could hold the total data in the |
2294 | | // input level. Return the input level if such a level could not be found.
2295 | | int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd, |
2296 | | const MutableCFOptions& mutable_cf_options, |
2297 | | int level); |
2298 | | |
2299 | | // Move the files in the input level to the target level. |
2300 | | // If target_level < 0, automatically calculate the minimum level that could |
2301 | | // hold the data set. |
2302 | | Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1); |
2303 | | |
2304 | | // helper functions for adding and removing from flush & compaction queues |
2305 | | void AddToCompactionQueue(ColumnFamilyData* cfd); |
2306 | | ColumnFamilyData* PopFirstFromCompactionQueue(); |
2307 | | FlushRequest PopFirstFromFlushQueue(); |
2308 | | |
2309 | | // Pick the first unthrottled compaction with task token from queue. |
2310 | | ColumnFamilyData* PickCompactionFromQueue( |
2311 | | std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer); |
2312 | | |
2313 | | IOStatus SyncWalImpl(bool include_current_wal, |
2314 | | const WriteOptions& write_options, |
2315 | | JobContext* job_context, VersionEdit* synced_wals, |
2316 | | bool error_recovery_in_prog); |
2317 | | |
2318 | | // helper function to call after some of the logs_ were synced |
2319 | | void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit); |
2320 | | Status ApplyWALToManifest(const ReadOptions& read_options, |
2321 | | const WriteOptions& write_options, |
2322 | | VersionEdit* edit); |
2323 | | // WALs with log number up to up_to are not synced successfully. |
2324 | | void MarkLogsNotSynced(uint64_t up_to); |
2325 | | |
2326 | | SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary, |
2327 | | bool lock = true); |
2328 | | |
2329 | | // If snapshot_seq != kMaxSequenceNumber, then this function can only be |
2330 | | // called from the write thread that publishes sequence numbers to readers. |
2331 | | // For 1) write-committed, or 2) write-prepared + one-write-queue, this will |
2332 | | // be the write thread performing memtable writes. For write-prepared with |
2333 | | // two write queues, this will be the write thread writing commit marker to |
2334 | | // the WAL. |
2335 | | // If snapshot_seq == kMaxSequenceNumber, this function is called by a caller |
2336 | | // ensuring no writes to the database. |
2337 | | std::pair<Status, std::shared_ptr<const SnapshotImpl>> |
2338 | | CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts, |
2339 | | bool lock = true); |
2340 | | |
2341 | | uint64_t GetMaxTotalWalSize() const; |
2342 | | |
2343 | | FSDirectory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const; |
2344 | | |
2345 | | Status MaybeReleaseTimestampedSnapshotsAndCheck(); |
2346 | | |
2347 | | Status CloseHelper(); |
2348 | | |
2349 | | void WaitForBackgroundWork(); |
2350 | | |
2351 | | // Background threads call this function, which is just a wrapper around |
2352 | | // the InstallSuperVersion() function. Background threads carry |
2353 | | // sv_context which can have new_superversion already |
2354 | | // allocated. |
2355 | | // All ColumnFamily state changes go through this function. Here we analyze |
2356 | | // the new state and we schedule background work if we detect that the new |
2357 | | // state needs flush or compaction. |
2358 | | void InstallSuperVersionAndScheduleWork( |
2359 | | ColumnFamilyData* cfd, SuperVersionContext* sv_context, |
2360 | | const MutableCFOptions& mutable_cf_options); |
2361 | | |
2362 | | bool GetIntPropertyInternal(ColumnFamilyData* cfd, |
2363 | | const DBPropertyInfo& property_info, |
2364 | | bool is_locked, uint64_t* value); |
2365 | | bool GetPropertyHandleOptionsStatistics(std::string* value); |
2366 | | |
2367 | | bool HasPendingManualCompaction(); |
2368 | | bool HasExclusiveManualCompaction(); |
2369 | | void AddManualCompaction(ManualCompactionState* m); |
2370 | | void RemoveManualCompaction(ManualCompactionState* m); |
2371 | | bool ShouldntRunManualCompaction(ManualCompactionState* m); |
2372 | | bool HaveManualCompaction(ColumnFamilyData* cfd); |
2373 | | bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1); |
2374 | | void UpdateDeletionCompactionStats(const std::unique_ptr<Compaction>& c); |
2375 | | |
2376 | | // May open and read table files for table properties.
2377 | | // Should not be called while holding mutex_. |
2378 | | void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c, |
2379 | | const Status& st, |
2380 | | const CompactionJobStats& compaction_job_stats, |
2381 | | const int job_id, |
2382 | | CompactionJobInfo* compaction_job_info) const; |
2383 | | // Reserve the next 'num' file numbers for to-be-ingested external SST files, |
2384 | | // and return the current file_number in 'next_file_number'. |
2385 | | // Write a version edit to the MANIFEST. |
2386 | | Status ReserveFileNumbersBeforeIngestion( |
2387 | | ColumnFamilyData* cfd, uint64_t num, |
2388 | | std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem, |
2389 | | uint64_t* next_file_number); |
2390 | | |
2391 | | bool ShouldPurge(uint64_t file_number) const; |
2392 | | void MarkAsGrabbedForPurge(uint64_t file_number); |
2393 | | |
2394 | | size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; |
2395 | 11.2k | Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; } |
2396 | | |
2397 | | IOStatus CreateWAL(const WriteOptions& write_options, uint64_t log_file_num, |
2398 | | uint64_t recycle_log_number, size_t preallocate_block_size, |
2399 | | log::Writer** new_log); |
2400 | | |
2401 | | // Validate self-consistency of DB options |
2402 | | static Status ValidateOptions(const DBOptions& db_options); |
2403 | | // Validate self-consistency of DB options and their consistency with cf options
2404 | | static Status ValidateOptions( |
2405 | | const DBOptions& db_options, |
2406 | | const std::vector<ColumnFamilyDescriptor>& column_families); |
2407 | | |
2408 | | // Utility function to do some debug validation and sort the given vector |
2409 | | // of MultiGet keys |
2410 | | void PrepareMultiGetKeys( |
2411 | | const size_t num_keys, bool sorted, |
2412 | | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* key_ptrs); |
2413 | | |
2414 | | void MultiGetCommon(const ReadOptions& options, |
2415 | | ColumnFamilyHandle* column_family, const size_t num_keys, |
2416 | | const Slice* keys, PinnableSlice* values, |
2417 | | PinnableWideColumns* columns, std::string* timestamps, |
2418 | | Status* statuses, bool sorted_input); |
2419 | | |
2420 | | void MultiGetCommon(const ReadOptions& options, const size_t num_keys, |
2421 | | ColumnFamilyHandle** column_families, const Slice* keys, |
2422 | | PinnableSlice* values, PinnableWideColumns* columns, |
2423 | | std::string* timestamps, Status* statuses, |
2424 | | bool sorted_input); |
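| | // Usage sketch (illustrative, assuming a DB* db): these overloads back the
| | // public batched MultiGet(), typically called as
| | //   std::array<Slice, 2> keys{"k1", "k2"};
| | //   std::array<PinnableSlice, 2> values;
| | //   std::array<Status, 2> statuses;
| | //   db->MultiGet(ReadOptions(), db->DefaultColumnFamily(), keys.size(),
| | //                keys.data(), values.data(), statuses.data(),
| | //                /*sorted_input=*/false);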
2425 | | |
2426 | | // A structure to hold the information required to process MultiGet of keys |
2427 | | // belonging to one column family. For a multi column family MultiGet, there |
2428 | | // will be a container of these objects. |
2429 | | struct MultiGetKeyRangePerCf { |
2430 | | // For the batched MultiGet which relies on sorted keys, start specifies |
2431 | | // the index of the first key belonging to this column family in the sorted
2432 | | // list. |
2433 | | size_t start; |
2434 | | |
2435 | | // For the batched MultiGet case, num_keys specifies the number of keys |
2436 | | // belonging to this column family in the sorted list |
2437 | | size_t num_keys; |
2438 | | |
2439 | 0 | MultiGetKeyRangePerCf() : start(0), num_keys(0) {} |
2440 | | |
2441 | | MultiGetKeyRangePerCf(size_t first, size_t count) |
2442 | 0 | : start(first), num_keys(count) {} |
2443 | | }; |
2444 | | |
2445 | | // A structure to contain ColumnFamilyData and the SuperVersion obtained for |
2446 | | // the consistent view of DB |
2447 | | struct ColumnFamilySuperVersionPair { |
2448 | | ColumnFamilyHandleImpl* cfh; |
2449 | | ColumnFamilyData* cfd; |
2450 | | |
2451 | | // SuperVersion for the column family obtained in a manner that ensures a |
2452 | | // consistent view across all column families in the DB |
2453 | | SuperVersion* super_version; |
2454 | | ColumnFamilySuperVersionPair(ColumnFamilyHandle* column_family, |
2455 | | SuperVersion* sv) |
2456 | | : cfh(static_cast<ColumnFamilyHandleImpl*>(column_family)), |
2457 | | cfd(cfh->cfd()), |
2458 | 0 | super_version(sv) {} |
2459 | | |
2460 | | ColumnFamilySuperVersionPair() = default; |
2461 | | }; |
2462 | | |
2463 | | // A common function to obtain a consistent snapshot, which can be implicit |
2464 | | // if the user doesn't specify a snapshot in read_options, across |
2465 | | // multiple column families. It will attempt to get an implicit |
2466 | | // snapshot without acquiring the db_mutex, but will give up after a few
2467 | | // tries and acquire the mutex if a memtable flush happens. The template |
2468 | | // allows both the batched and non-batched MultiGet to call this with |
2469 | | // either an std::unordered_map or autovector of column families. |
2470 | | // |
2471 | | // If callback is non-null, the callback is refreshed with the snapshot |
2472 | | // sequence number |
2473 | | // |
2474 | | // `extra_sv_ref` is used to indicate whether thread-local SuperVersion |
2475 | | // should be obtained with an extra ref (by GetReferencedSuperVersion()) or |
2476 | | // not (by GetAndRefSuperVersion()). For instance, point lookup like MultiGet |
2477 | | // not (by GetAndRefSuperVersion()). For instance, a point lookup like MultiGet
2478 | | // does not require the SuperVersion to be re-acquired throughout the entire
2479 | | // invocation (no extra ref needed), while MultiCfIterators may need the
2480 | | // |
2481 | | // `sv_from_thread_local` being set to false indicates that the SuperVersion
2482 | | // was obtained from the ColumnFamilyData, whereas true indicates it is
2483 | | // thread local.
2484 | | // |
2485 | | // A non-OK status will be returned if for a column family that enables |
2486 | | // user-defined timestamp feature, the specified `ReadOptions.timestamp` |
2487 | | // attempts to read collapsed history.
2488 | | template <class T, typename IterDerefFuncType> |
2489 | | Status MultiCFSnapshot(const ReadOptions& read_options, |
2490 | | ReadCallback* callback, |
2491 | | IterDerefFuncType iter_deref_func, T* cf_list, |
2492 | | bool extra_sv_ref, SequenceNumber* snapshot, |
2493 | | bool* sv_from_thread_local); |
2494 | | |
2495 | | // The actual implementation of the batched MultiGet. The caller is expected
2496 | | // to have acquired the SuperVersion and pass in a snapshot sequence number |
2497 | | // in order to construct the LookupKeys. The start_key and num_keys specify |
2498 | | // the range of keys in the sorted_keys vector for a single column family. |
2499 | | Status MultiGetImpl( |
2500 | | const ReadOptions& read_options, size_t start_key, size_t num_keys, |
2501 | | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys, |
2502 | | SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback); |
2503 | | |
2504 | | void MultiGetWithCallbackImpl( |
2505 | | const ReadOptions& read_options, ColumnFamilyHandle* column_family, |
2506 | | ReadCallback* callback, |
2507 | | autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys); |
2508 | | |
2509 | | Status DisableFileDeletionsWithLock(); |
2510 | | |
2511 | | Status IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd, |
2512 | | std::string ts_low); |
2513 | | |
2514 | | bool ShouldReferenceSuperVersion(const MergeContext& merge_context); |
2515 | | |
2516 | | template <typename IterType, typename ImplType, |
2517 | | typename ErrorIteratorFuncType> |
2518 | | std::unique_ptr<IterType> NewMultiCfIterator( |
2519 | | const ReadOptions& _read_options, |
2520 | | const std::vector<ColumnFamilyHandle*>& column_families, |
2521 | | ErrorIteratorFuncType error_iterator_func); |
2522 | | |
2523 | | // Lock over the persistent DB state. Non-nullptr iff successfully acquired. |
2524 | | FileLock* db_lock_; |
2525 | | |
2526 | | // Guards changes to DB and CF options to ensure consistency between |
2527 | | // * In-memory options objects |
2528 | | // * Settings in effect |
2529 | | // * Options file contents |
2530 | | // while allowing the DB mutex to be released during slow operations like |
2531 | | // persisting options file or modifying global periodic task timer. |
2532 | | // Always acquired *before* DB mutex when this one is applicable. |
2533 | | InstrumentedMutex options_mutex_; |
2534 | | |
2535 | | // Guards reads and writes to in-memory stats_history_. |
2536 | | InstrumentedMutex stats_history_mutex_; |
2537 | | |
2538 | | // In addition to mutex_, log_write_mutex_ protects writes to logs_ and |
2539 | | // logfile_number_. With two_write_queues it also protects alive_log_files_, |
2540 | | // and log_empty_. Refer to the definition of each variable below for more |
2541 | | // details. |
2542 | | // Note: to avoid deadlock, if needed to acquire both log_write_mutex_ and |
2543 | | // mutex_, the order should be first mutex_ and then log_write_mutex_. |
2544 | | InstrumentedMutex log_write_mutex_; |
2545 | | |
2546 | | // If zero, manual compactions are allowed to proceed. If non-zero, manual |
2547 | | // compactions may still be running, but will quickly fail with |
2548 | | // `Status::Incomplete`. The value indicates how many threads have paused |
2549 | | // manual compactions. It is accessed in read mode outside the DB mutex in |
2550 | | // compaction code paths. |
2551 | | std::atomic<int> manual_compaction_paused_; |
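| | // Illustrative pairing (sketch, assuming a DB* db): the public
| | // DisableManualCompaction() increments this counter and
| | // EnableManualCompaction() decrements it, e.g.
| | //   db->DisableManualCompaction();  // in-flight manual compactions bail out
| | //   // ... work that must not race with manual compactions ...
| | //   db->EnableManualCompaction();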
2552 | | |
2553 | | // This condition variable is signaled on these conditions: |
2554 | | // * whenever bg_compaction_scheduled_ goes down to 0 |
2555 | | // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't |
2556 | | // made any progress |
2557 | | // * whenever a compaction made any progress |
2558 | | // * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases |
2559 | | // (i.e. whenever a flush is done, even if it didn't make any progress) |
2560 | | // * whenever there is an error in background purge, flush or compaction |
2561 | | // * whenever num_running_ingest_file_ goes to 0. |
2562 | | // * whenever pending_purge_obsolete_files_ goes to 0. |
2563 | | // * whenever disable_delete_obsolete_files_ goes to 0. |
2564 | | // * whenever SetOptions successfully updates options. |
2565 | | // * whenever a column family is dropped. |
2566 | | InstrumentedCondVar bg_cv_; |
2567 | | // Writes are protected by locking both mutex_ and log_write_mutex_, and reads |
2568 | | // must be under either mutex_ or log_write_mutex_. Since after ::Open, |
2569 | | // logfile_number_ is currently updated only in write_thread_, it can be read |
2570 | | // from the same write_thread_ without any locks. |
2571 | | uint64_t logfile_number_; |
2572 | | // Log files that we can recycle. Must be protected by db mutex_. |
2573 | | std::deque<uint64_t> log_recycle_files_; |
2574 | | // The minimum log file number that can be recycled, if log recycling is
2575 | | // enabled. This is used to ensure that log files created by previous |
2576 | | // instances of the database are not recycled, as we cannot be sure they |
2577 | | // were created in the recyclable format. |
2578 | | uint64_t min_log_number_to_recycle_; |
2579 | | // Protected by log_write_mutex_. |
2580 | | bool log_dir_synced_; |
2581 | | // Without two_write_queues, reads and writes to log_empty_ are protected by
2582 | | // mutex_. Since it is currently updated/read only in write_thread_, it can be
2583 | | // accessed from the same write_thread_ without any locks. With
2584 | | // two_write_queues writes, where it can be updated in different threads,
2585 | | // reads and writes are protected by log_write_mutex_ instead. This avoids
2586 | | // taking the expensive mutex_ lock during WAL writes, which update log_empty_.
2587 | | bool log_empty_; |
2588 | | |
2589 | | ColumnFamilyHandleImpl* persist_stats_cf_handle_; |
2590 | | |
2591 | | bool persistent_stats_cfd_exists_ = true; |
2592 | | |
2593 | | // The current WAL file and those that have not been found obsolete from |
2594 | | // memtable flushes. A WAL not on this list might still be pending writer |
2595 | | // flush and/or sync and close and might still be in logs_. alive_log_files_ |
2596 | | // is protected by mutex_ and log_write_mutex_ with details as follows: |
2597 | | // 1. read by FindObsoleteFiles() which can be called in either application |
2598 | | // thread or RocksDB bg threads, both mutex_ and log_write_mutex_ are |
2599 | | // held. |
2600 | | // 2. pop_front() by FindObsoleteFiles(), both mutex_ and log_write_mutex_ |
2601 | | // are held. |
2602 | | // 3. push_back() by DBImpl::Open() and DBImpl::RestoreAliveLogFiles() |
2603 | | // (actually called by Open()), only mutex_ is held because at this point, |
2604 | | // the DB::Open() call has not returned success to application, and the |
2605 | | // only other thread(s) that can conflict are bg threads calling |
2606 | | // FindObsoleteFiles() which ensure that both mutex_ and log_write_mutex_ |
2607 | | // are held when accessing alive_log_files_. |
2608 | | // 4. read by DBImpl::Open() is protected by mutex_. |
2609 | | // 5. push_back() by SwitchMemtable(). Both mutex_ and log_write_mutex_ are |
2610 | | // held. This is done by the write group leader. Note that in the case of |
2611 | | // two-write-queues, another WAL-only write thread can be writing to the |
2612 | | // WAL concurrently. See 9. |
2613 | | // 6. read by SwitchWAL() with both mutex_ and log_write_mutex_ held. This is |
2614 | | // done by write group leader. |
2615 | | // 7. read by ConcurrentWriteToWAL() by the write group leader in the case of |
2616 | | // two-write-queues. Only log_write_mutex_ is held to protect concurrent |
2617 | | // pop_front() by FindObsoleteFiles(). |
2618 | | // 8. read by PreprocessWrite() by the write group leader. log_write_mutex_ |
2619 | | // is held to protect the data structure from concurrent pop_front() by |
2620 | | // FindObsoleteFiles(). |
2621 | | // 9. read by ConcurrentWriteToWAL() by a WAL-only write thread in the case |
2622 | | // of two-write-queues. Only log_write_mutex_ is held. This suffices to |
2623 | | // protect the data structure from concurrent push_back() by current |
2624 | | // write group leader as well as pop_front() by FindObsoleteFiles(). |
2625 | | std::deque<LogFileNumberSize> alive_log_files_; |
2626 | | |
2627 | | // Log files that aren't fully synced, and the current log file. |
2628 | | // Synchronization: |
2629 | | // 1. read by FindObsoleteFiles() which can be called either in application |
2630 | | // thread or RocksDB bg threads. log_write_mutex_ is always held, while |
2631 | | // some reads are performed without mutex_. |
2632 | | // 2. pop_front() by FindObsoleteFiles() with only log_write_mutex_ held. |
2633 | | // 3. read by DBImpl::Open() with both mutex_ and log_write_mutex_. |
2634 | | // 4. emplace_back() by DBImpl::Open() with both mutex_ and log_write_mutex. |
2635 | | // Note that at this point, DB::Open() has not returned success to |
2636 | | // application, thus the only other thread(s) that can conflict are bg |
2637 | | // threads calling FindObsoleteFiles(). See 1. |
2638 | | // 5. iteration and clear() from CloseHelper() always hold log_write_mutex |
2639 | | // and mutex_. |
2640 | | // 6. back() called by APIs FlushWAL() and LockWAL() are protected by only |
2641 | | // log_write_mutex_. These two can be called by application threads after |
2642 | | // DB::Open() returns success to applications. |
2643 | | // 7. read by SyncWAL(), another API, protected by only log_write_mutex_. |
2644 | | // 8. read by MarkLogsNotSynced() and MarkLogsSynced() are protected by |
2645 | | // log_write_mutex_. |
2646 | | // 9. erase() by MarkLogsSynced() protected by log_write_mutex_. |
2647 | | // 10. read by SyncClosedWals() protected by only log_write_mutex_. This can |
2648 | | // happen in bg flush threads after DB::Open() returns success to |
2649 | | // applications. |
2650 | | // 11. reads, e.g. front(), iteration, and back() called by PreprocessWrite() |
2651 | | // holds only the log_write_mutex_. This is done by the write group |
2652 | | // leader. A bg thread calling FindObsoleteFiles() or MarkLogsSynced() |
2653 | | // can happen concurrently. This is fine because log_write_mutex_ is used |
2654 | | // by all parties. See 2, 5, 9. |
2655 | | // 12. reads, empty(), back() called by SwitchMemtable() hold both mutex_ and |
2656 | | // log_write_mutex_. This happens in the write group leader. |
2657 | | // 13. emplace_back() by SwitchMemtable() hold both mutex_ and |
2658 | | // log_write_mutex_. This happens in the write group leader. Can conflict |
2659 | | // with bg threads calling FindObsoleteFiles(), MarkLogsSynced(), |
2660 | | // SyncClosedWals(), etc. as well as application threads calling |
2661 | | // FlushWAL(), SyncWAL(), LockWAL(). This is fine because all parties |
2662 | | // require at least log_write_mutex_. |
2663 | | // 14. iteration called in WriteToWAL(write_group) protected by |
2664 | | // log_write_mutex_. This is done by write group leader when |
2665 | | // two-write-queues is disabled and write needs to sync logs. |
2666 | | // 15. back() called in ConcurrentWriteToWAL() protected by log_write_mutex_. |
2667 | | // This can be done by the write group leader if two-write-queues is |
2668 | | // enabled. It can also be done by another WAL-only write thread. |
2669 | | // |
2670 | | // Other observations: |
2671 | | // - back() and items with getting_synced=true are not popped, |
2672 | | // - The same thread that sets getting_synced=true will reset it. |
2673 | | // - it follows that the object referred to by back() can be safely read from
2674 | | // the write_thread_ without using mutex. Note that calling back() without |
2675 | | // mutex may be unsafe because different implementations of deque::back() may |
2676 | | // access other member variables of deque, causing undefined behaviors. |
2677 | | // Generally, do not access stl containers without proper synchronization. |
2678 | | // - it follows that the items with getting_synced=true can be safely read |
2679 | | // from the same thread that has set getting_synced=true |
2680 | | std::deque<LogWriterNumber> logs_; |
2681 | | |
2682 | | // Signaled when getting_synced becomes false for some of the logs_. |
2683 | | InstrumentedCondVar log_sync_cv_; |
2684 | | // This is the app-level state that is written to the WAL but will be used |
2685 | | // only during recovery. Using this feature avoids writing the state to the
2686 | | // memtable on normal writes and hence improves throughput. Each new
2687 | | // write of the state will replace the previous state entirely even if the |
2688 | | // keys in the two consecutive states do not overlap. |
2689 | | // It is protected by log_write_mutex_ when two_write_queues_ is enabled. |
2690 | | // Otherwise only the head of write_thread_ can access it.
2691 | | WriteBatch cached_recoverable_state_; |
2692 | | std::atomic<bool> cached_recoverable_state_empty_ = {true}; |
2693 | | std::atomic<uint64_t> total_log_size_; |
2694 | | |
2695 | | // If this is non-empty, we need to delete these log files in background |
2696 | | // threads. Protected by log_write_mutex_. |
2697 | | autovector<log::Writer*> logs_to_free_; |
2698 | | |
2699 | | bool is_snapshot_supported_; |
2700 | | |
2701 | | std::map<uint64_t, std::map<std::string, uint64_t>> stats_history_; |
2702 | | |
2703 | | std::map<std::string, uint64_t> stats_slice_; |
2704 | | |
2705 | | bool stats_slice_initialized_ = false; |
2706 | | |
2707 | | Directories directories_; |
2708 | | |
2709 | | WriteBufferManager* write_buffer_manager_; |
2710 | | |
2711 | | WriteThread write_thread_; |
2712 | | WriteBatch tmp_batch_; |
2713 | | // The write thread when the writers have no memtable write. This will be used |
2714 | | // in 2PC to batch the prepares separately from the serial commit. |
2715 | | WriteThread nonmem_write_thread_; |
2716 | | |
2717 | | WriteController write_controller_; |
2718 | | |
2719 | | // Size of the last batch group. In slowdown mode, the next write needs to
2720 | | // sleep if it uses up the quota. |
2721 | | // Note: This is to protect memtable and compaction. If the batch only writes |
2722 | | // to the WAL, its size need not be included in this.
2723 | | uint64_t last_batch_group_size_; |
2724 | | |
2725 | | FlushScheduler flush_scheduler_; |
2726 | | |
2727 | | TrimHistoryScheduler trim_history_scheduler_; |
2728 | | |
2729 | | SnapshotList snapshots_; |
2730 | | |
2731 | | TimestampedSnapshotList timestamped_snapshots_; |
2732 | | |
2733 | | // For each background job, pending_outputs_ keeps the current file number at |
2734 | | // the time that background job started. |
2735 | | // FindObsoleteFiles()/PurgeObsoleteFiles() never deletes any file that has |
2736 | | // number bigger than any of the file number in pending_outputs_. Since file |
2737 | | // numbers grow monotonically, this also means that pending_outputs_ is always |
2738 | | // sorted. After a background job is done executing, its file number is |
2739 | | // deleted from pending_outputs_, which allows PurgeObsoleteFiles() to clean |
2740 | | // it up. |
2741 | | // State is protected with db mutex. |
2742 | | std::list<uint64_t> pending_outputs_; |
2743 | | |
2744 | | // flush_queue_ and compaction_queue_ hold column families that we need to |
2745 | | // flush and compact, respectively. |
2746 | | // A column family is inserted into flush_queue_ when it satisfies condition |
2747 | | // cfd->imm()->IsFlushPending() |
2748 | | // A column family is inserted into compaction_queue_ when it satisfies
2749 | | // condition cfd->NeedsCompaction() |
2750 | | // Column families in this list are all Ref()-erenced |
2751 | | // TODO(icanadi) Provide some kind of ReferencedColumnFamily class that will |
2752 | | // do RAII on ColumnFamilyData |
2753 | | // Column families are in this queue when they need to be flushed or |
2754 | | // compacted. Consumers of these queues are flush and compaction threads. When
2755 | | // a column family is put on this queue, we increase unscheduled_flushes_ and
2756 | | // unscheduled_compactions_. When these variables are bigger than zero, that |
2757 | | // means we need to schedule background threads for flush and compaction. |
2758 | | // Once the background threads are scheduled, we decrease unscheduled_flushes_ |
2759 | | // and unscheduled_compactions_. That way we keep track of the number of
2760 | | // compaction and flush threads we need to schedule. This scheduling is done |
2761 | | // in MaybeScheduleFlushOrCompaction() |
2762 | | // invariant(column family present in flush_queue_ <==> |
2763 | | // ColumnFamilyData::pending_flush_ == true) |
2764 | | std::deque<FlushRequest> flush_queue_; |
2765 | | // invariant(column family present in compaction_queue_ <==> |
2766 | | // ColumnFamilyData::pending_compaction_ == true) |
2767 | | std::deque<ColumnFamilyData*> compaction_queue_; |
2768 | | |
2769 | | // A map to store file numbers and filenames of the files to be purged |
2770 | | std::unordered_map<uint64_t, PurgeFileInfo> purge_files_; |
2771 | | |
2772 | | // A set to store the file numbers that have been assigned to certain
2773 | | // JobContext. Current implementation tracks table and blob files only. |
2774 | | std::unordered_set<uint64_t> files_grabbed_for_purge_; |
2775 | | |
2776 | | // A queue to store log writers to close. Protected by db mutex_. |
2777 | | std::deque<log::Writer*> logs_to_free_queue_; |
2778 | | |
2779 | | std::deque<SuperVersion*> superversions_to_free_queue_; |
2780 | | |
2781 | | int unscheduled_flushes_; |
2782 | | |
2783 | | int unscheduled_compactions_; |
2784 | | |
2785 | | // count how many background compactions are running or have been scheduled in |
2786 | | // the BOTTOM pool |
2787 | | int bg_bottom_compaction_scheduled_; |
2788 | | |
2789 | | // count how many background compactions are running or have been scheduled |
2790 | | int bg_compaction_scheduled_; |
2791 | | |
2792 | | // stores the number of compactions that are currently running
2793 | | int num_running_compactions_; |
2794 | | |
2795 | | // number of background memtable flush jobs, submitted to the HIGH pool |
2796 | | int bg_flush_scheduled_; |
2797 | | |
2798 | | // stores the number of flushes that are currently running
2799 | | int num_running_flushes_; |
2800 | | |
2801 | | // number of background obsolete file purge jobs, submitted to the HIGH pool |
2802 | | int bg_purge_scheduled_; |
2803 | | |
2804 | | std::deque<ManualCompactionState*> manual_compaction_dequeue_; |
2805 | | |
2806 | | // Shall we disable deletion of obsolete files?
2807 | | // If 0, deletion is enabled.
2808 | | // If non-zero, files will not be deleted.
2809 | | // This enables two different threads to call
2810 | | // EnableFileDeletions() and DisableFileDeletions()
2811 | | // without any synchronization.
2812 | | int disable_delete_obsolete_files_; |
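| | // Illustrative pairing (sketch, assuming a DB* db): the public
| | // DisableFileDeletions() increments this counter and EnableFileDeletions()
| | // decrements it, e.g. around a backup that copies live files:
| | //   db->DisableFileDeletions();
| | //   // ... copy live SST/WAL files ...
| | //   db->EnableFileDeletions();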
2813 | | |
2814 | | // Number of times FindObsoleteFiles has found deletable files and the |
2815 | | // corresponding call to PurgeObsoleteFiles has not yet finished. |
2816 | | int pending_purge_obsolete_files_; |
2817 | | |
2818 | | // last time when DeleteObsoleteFiles with full scan was executed. Originally |
2819 | | // initialized with startup time. |
2820 | | uint64_t delete_obsolete_files_last_run_; |
2821 | | |
2822 | | // The thread that wants to switch memtables can wait on this cv until the
2823 | | // pending writes to the memtable finish.
2824 | | std::condition_variable switch_cv_; |
2825 | | // The mutex used by switch_cv_. mutex_ should be acquired beforehand. |
2826 | | std::mutex switch_mutex_; |
2827 | | // Number of threads intending to write to memtable |
2828 | | std::atomic<size_t> pending_memtable_writes_ = {}; |
2829 | | |
2830 | | // A flag indicating whether the current rocksdb database has any |
2831 | | // data that is not yet persisted into either WAL or SST file. |
2832 | | // Used when disableWAL is true. |
2833 | | std::atomic<bool> has_unpersisted_data_; |
2834 | | |
2835 | | // Set if an attempt was made to flush all column families that
2836 | | // the oldest log depends on, but uncommitted data in the oldest
2837 | | // log prevents the log from being released.
2838 | | // We must attempt to free the dependent memtables again
2839 | | // at a later time, after the transaction in the oldest
2840 | | // log is fully committed.
2841 | | bool unable_to_release_oldest_log_; |
2842 | | |
2843 | | // Number of running IngestExternalFile() or CreateColumnFamilyWithImport() |
2844 | | // calls. |
2845 | | // REQUIRES: mutex held |
2846 | | int num_running_ingest_file_; |
2847 | | |
2848 | | WalManager wal_manager_; |
2849 | | |
2850 | | // A value of > 0 temporarily disables scheduling of background work |
2851 | | int bg_work_paused_; |
2852 | | |
2853 | | // A value of > 0 temporarily disables scheduling of background compaction |
2854 | | int bg_compaction_paused_; |
2855 | | |
2856 | | // Guard against multiple concurrent refitting |
2857 | | bool refitting_level_; |
2858 | | |
2859 | | // Indicate DB was opened successfully |
2860 | | bool opened_successfully_; |
2861 | | |
2862 | | // The min threshold to trigger bottommost compaction for removing
2863 | | // garbage, among all column families.
2864 | | SequenceNumber bottommost_files_mark_threshold_ = kMaxSequenceNumber; |
2865 | | |
2866 | | LogsWithPrepTracker logs_with_prep_tracker_; |
2867 | | |
2868 | | // Callback for compaction to check if a key is visible to a snapshot. |
2869 | | // REQUIRES: mutex held |
2870 | | std::unique_ptr<SnapshotChecker> snapshot_checker_; |
2871 | | |
2872 | | // Callback for when the cached_recoverable_state_ is written to memtable |
2873 | | // Only to be set during initialization |
2874 | | std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_; |
2875 | | |
2876 | | // Scheduler to run DumpStats(), PersistStats(), and FlushInfoLog(). |
2877 | | // Currently, internally it has a global timer instance for running the tasks. |
2878 | | PeriodicTaskScheduler periodic_task_scheduler_; |
2879 | | |
2880 | | // It contains the implementations for each periodic task. |
2881 | | std::map<PeriodicTaskType, const PeriodicTaskFunc> periodic_task_functions_; |
2882 | | |
2883 | | // When set, we use a separate queue for writes that don't write to memtable. |
2884 | | // In 2PC these are the writes at Prepare phase. |
2885 | | const bool two_write_queues_; |
2886 | | const bool manual_wal_flush_; |
2887 | | |
2888 | | // When true, LastSequence also indicates the last published sequence visible
2889 | | // to readers. Otherwise LastPublishedSequence should be used.
2890 | | const bool last_seq_same_as_publish_seq_; |
2891 | | // It indicates that a customized gc algorithm must be used for |
2892 | | // flush/compaction and if it is not provided via SnapshotChecker, we should
2893 | | // disable gc to be safe. |
2894 | | const bool use_custom_gc_; |
2895 | | // Flag to indicate that the DB instance shutdown has been initiated. This is
2896 | | // different from the shutting_down_ atomic in that it is set at the beginning
2897 | | // of shutdown sequence, specifically in order to prevent any background |
2898 | | // error recovery from going on in parallel. The latter, shutting_down_, |
2899 | | // is set a little later during the shutdown after scheduling memtable |
2900 | | // flushes |
2901 | | std::atomic<bool> shutdown_initiated_; |
2902 | | // Flag to indicate whether sst_file_manager object was allocated in |
2903 | | // DB::Open() or passed to us |
2904 | | bool own_sfm_; |
2905 | | |
2906 | | // Flag to check whether Close() has been called on this DB |
2907 | | bool closed_; |
2908 | | // Save the closing status so that Close() can be called again.
2909 | | Status closing_status_; |
2910 | | // mutex for DB::Close() |
2911 | | InstrumentedMutex closing_mutex_; |
2912 | | |
2913 | | // Conditional variable to coordinate installation of atomic flush results. |
2914 | | // With atomic flush, each bg thread installs the result of flushing multiple |
2915 | | // column families, and different threads can flush different column |
2916 | | // families. It's difficult to rely on one thread to perform batch |
2917 | | // installation for all threads. This is different from the non-atomic flush |
2918 | | // case. |
2919 | | // atomic_flush_install_cv_ makes sure that threads install atomic flush |
2920 | | // results sequentially. Flush results of memtables with lower IDs get |
2921 | | // installed to MANIFEST first. |
2922 | | InstrumentedCondVar atomic_flush_install_cv_; |
2923 | | |
2924 | | bool wal_in_db_path_; |
2925 | | std::atomic<uint64_t> max_total_wal_size_; |
2926 | | |
2927 | | BlobFileCompletionCallback blob_callback_; |
2928 | | |
2929 | | // Pointer to WriteBufferManager stalling interface. |
2930 | | std::unique_ptr<StallInterface> wbm_stall_; |
2931 | | |
2932 | | // seqno_to_time_mapping_ stores the sequence number to time mapping. It is
2933 | | // not thread safe; both reads and writes need the db mutex to be held.
2934 | | SeqnoToTimeMapping seqno_to_time_mapping_; |
2935 | | |
2936 | | // Stop write token that is acquired when first LockWAL() is called. |
2937 | | // Destroyed when last UnlockWAL() is called. Controlled by DB mutex. |
2938 | | // See lock_wal_count_ |
2939 | | std::unique_ptr<WriteControllerToken> lock_wal_write_token_; |
2940 | | |
2941 | | // The number of LockWAL() calls without a matching UnlockWAL() call.
2942 | | // See also lock_wal_write_token_ |
2943 | | uint32_t lock_wal_count_; |
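| | // Illustrative pairing (sketch, assuming a DB* db): LockWAL()/UnlockWAL()
| | // adjust lock_wal_count_ and hold lock_wal_write_token_ while it is non-zero:
| | //   Status s = db->LockWAL();  // stops writes to the WAL
| | //   // ... e.g. copy the WAL files ...
| | //   s = db->UnlockWAL();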
2944 | | }; |
2945 | | |
2946 | | class GetWithTimestampReadCallback : public ReadCallback { |
2947 | | public: |
2948 | | explicit GetWithTimestampReadCallback(SequenceNumber seq) |
2949 | 584 | : ReadCallback(seq) {} |
2950 | 0 | bool IsVisibleFullCheck(SequenceNumber seq) override { |
2951 | 0 | return seq <= max_visible_seq_; |
2952 | 0 | } |
2953 | | }; |
2954 | | |
2955 | | Options SanitizeOptions(const std::string& db, const Options& src, |
2956 | | bool read_only = false, |
2957 | | Status* logger_creation_s = nullptr); |
2958 | | |
2959 | | DBOptions SanitizeOptions(const std::string& db, const DBOptions& src, |
2960 | | bool read_only = false, |
2961 | | Status* logger_creation_s = nullptr); |
2962 | | |
2963 | | CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions, |
2964 | | const MutableCFOptions& mutable_cf_options); |
2965 | | |
2966 | | // Return the earliest log file to keep after the memtable flush is |
2967 | | // finalized. |
2968 | | // `cfd_to_flush` is the column family whose memtable (specified in |
2969 | | // `memtables_to_flush`) will be flushed and thus will not depend on any WAL |
2970 | | // file. |
2971 | | // The function is only applicable to 2pc mode. |
2972 | | uint64_t PrecomputeMinLogNumberToKeep2PC( |
2973 | | VersionSet* vset, const ColumnFamilyData& cfd_to_flush, |
2974 | | const autovector<VersionEdit*>& edit_list, |
2975 | | const autovector<MemTable*>& memtables_to_flush, |
2976 | | LogsWithPrepTracker* prep_tracker); |
2977 | | // For atomic flush. |
2978 | | uint64_t PrecomputeMinLogNumberToKeep2PC( |
2979 | | VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush, |
2980 | | const autovector<autovector<VersionEdit*>>& edit_lists, |
2981 | | const autovector<const autovector<MemTable*>*>& memtables_to_flush, |
2982 | | LogsWithPrepTracker* prep_tracker); |
2983 | | |
2984 | | // In non-2PC mode, WALs with log number < the returned number can be |
2985 | | // deleted after the cfd_to_flush column family is flushed successfully. |
2986 | | uint64_t PrecomputeMinLogNumberToKeepNon2PC( |
2987 | | VersionSet* vset, const ColumnFamilyData& cfd_to_flush, |
2988 | | const autovector<VersionEdit*>& edit_list); |
2989 | | // For atomic flush. |
2990 | | uint64_t PrecomputeMinLogNumberToKeepNon2PC( |
2991 | | VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush, |
2992 | | const autovector<autovector<VersionEdit*>>& edit_lists); |
2993 | | |
2994 | | // `cfd_to_flush` is the column family whose memtable will be flushed and thus |
2995 | | // will not depend on any WAL file. nullptr means no memtable is being flushed. |
2996 | | // The function is only applicable to 2pc mode. |
2997 | | uint64_t FindMinPrepLogReferencedByMemTable( |
2998 | | VersionSet* vset, const autovector<MemTable*>& memtables_to_flush); |
2999 | | // For atomic flush. |
3000 | | uint64_t FindMinPrepLogReferencedByMemTable( |
3001 | | VersionSet* vset, |
3002 | | const autovector<const autovector<MemTable*>*>& memtables_to_flush); |
3003 | | |
3004 | | // Fix user-supplied options to be reasonable |
3005 | | template <class T, class V> |
3006 | 28.3k | static void ClipToRange(T* ptr, V minvalue, V maxvalue) { |
3007 | 28.3k | if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue; |
3008 | 28.3k | if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue; |
3009 | 28.3k | }
Unexecuted instantiation: db_impl_open.cc: void rocksdb::ClipToRange<int, int>(int*, int, int)
Instantiation (28.3k): column_family.cc: void rocksdb::ClipToRange<unsigned long, unsigned long>(unsigned long*, unsigned long, unsigned long)
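| | // Usage sketch (illustrative): clamp a user-supplied option into a sane
| | // range, mirroring how column_family.cc clips write_buffer_size:
| | //   size_t write_buffer_size = 1024;  // too small
| | //   ClipToRange(&write_buffer_size, size_t{64} << 10, size_t{64} << 30);
| | //   // write_buffer_size is now 64 KiB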
3010 | | |
3011 | | inline Status DBImpl::FailIfCfHasTs( |
3012 | 1.16M | const ColumnFamilyHandle* column_family) const { |
3013 | 1.16M | if (!column_family) { |
3014 | 0 | return Status::InvalidArgument("column family handle cannot be null"); |
3015 | 0 | } |
3016 | 1.16M | assert(column_family); |
3017 | 1.16M | const Comparator* const ucmp = column_family->GetComparator(); |
3018 | 1.16M | assert(ucmp); |
3019 | 1.16M | if (ucmp->timestamp_size() > 0) { |
3020 | 0 | std::ostringstream oss; |
3021 | 0 | oss << "cannot call this method on column family " |
3022 | 0 | << column_family->GetName() << " that enables timestamp"; |
3023 | 0 | return Status::InvalidArgument(oss.str()); |
3024 | 0 | } |
3025 | 1.16M | return Status::OK(); |
3026 | 1.16M | } |
3027 | | |
3028 | | inline Status DBImpl::FailIfTsMismatchCf(ColumnFamilyHandle* column_family, |
3029 | 0 | const Slice& ts) const { |
3030 | 0 | if (!column_family) { |
3031 | 0 | return Status::InvalidArgument("column family handle cannot be null"); |
3032 | 0 | } |
3033 | 0 | assert(column_family); |
3034 | 0 | const Comparator* const ucmp = column_family->GetComparator(); |
3035 | 0 | assert(ucmp); |
3036 | 0 | if (0 == ucmp->timestamp_size()) { |
3037 | 0 | std::stringstream oss; |
3038 | 0 | oss << "cannot call this method on column family " |
3039 | 0 | << column_family->GetName() << " that does not enable timestamp"; |
3040 | 0 | return Status::InvalidArgument(oss.str()); |
3041 | 0 | } |
3042 | 0 | const size_t ts_sz = ts.size(); |
3043 | 0 | if (ts_sz != ucmp->timestamp_size()) { |
3044 | 0 | std::stringstream oss; |
3045 | 0 | oss << "Timestamp sizes mismatch: expect " << ucmp->timestamp_size() << ", " |
3046 | 0 | << ts_sz << " given"; |
3047 | 0 | return Status::InvalidArgument(oss.str()); |
3048 | 0 | } |
3049 | 0 | return Status::OK(); |
3050 | 0 | } |
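| | // Context sketch (illustrative; `db` and `cf_with_udt` are hypothetical and
| | // the column family is assumed to use a timestamp-aware comparator): these
| | // checks guard reads that supply a user-defined timestamp, e.g.
| | //   ReadOptions ro;
| | //   Slice ts = ...;  // timestamp whose size matches the CF comparator
| | //   ro.timestamp = &ts;
| | //   std::string value;
| | //   Status s = db->Get(ro, cf_with_udt, "key", &value);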
3051 | | |
3052 | | inline Status DBImpl::FailIfReadCollapsedHistory(const ColumnFamilyData* cfd, |
3053 | | const SuperVersion* sv, |
3054 | 0 | const Slice& ts) const { |
3055 | | // Reaching to this point means the timestamp size matching sanity check in |
3056 | | // `DBImpl::FailIfTsMismatchCf` already passed. So we skip that and assume |
3057 | | // column family has the same user-defined timestamp format as `ts`. |
3058 | 0 | const Comparator* const ucmp = cfd->user_comparator(); |
3059 | 0 | assert(ucmp); |
3060 | 0 | const std::string& full_history_ts_low = sv->full_history_ts_low; |
3061 | 0 | assert(full_history_ts_low.empty() || |
3062 | 0 | full_history_ts_low.size() == ts.size()); |
3063 | 0 | if (!full_history_ts_low.empty() && |
3064 | 0 | ucmp->CompareTimestamp(ts, full_history_ts_low) < 0) { |
3065 | 0 | std::stringstream oss; |
3066 | 0 | oss << "Read timestamp: " << ucmp->TimestampToString(ts) |
3067 | 0 | << " is smaller than full_history_ts_low: " |
3068 | 0 | << ucmp->TimestampToString(full_history_ts_low) << std::endl; |
3069 | 0 | return Status::InvalidArgument(oss.str()); |
3070 | 0 | } |
3071 | 0 | return Status::OK(); |
3072 | 0 | } |
3073 | | } // namespace ROCKSDB_NAMESPACE |