Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/db_impl/db_impl.h
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#pragma once
10
11
#include <atomic>
12
#include <cstdint>
13
#include <deque>
14
#include <functional>
15
#include <limits>
16
#include <list>
17
#include <map>
18
#include <set>
19
#include <string>
20
#include <unordered_map>
21
#include <utility>
22
#include <vector>
23
24
#include "db/column_family.h"
25
#include "db/compaction/compaction_iterator.h"
26
#include "db/compaction/compaction_job.h"
27
#include "db/error_handler.h"
28
#include "db/event_helpers.h"
29
#include "db/external_sst_file_ingestion_job.h"
30
#include "db/flush_job.h"
31
#include "db/flush_scheduler.h"
32
#include "db/import_column_family_job.h"
33
#include "db/internal_stats.h"
34
#include "db/log_writer.h"
35
#include "db/logs_with_prep_tracker.h"
36
#include "db/memtable_list.h"
37
#include "db/periodic_task_scheduler.h"
38
#include "db/post_memtable_callback.h"
39
#include "db/pre_release_callback.h"
40
#include "db/range_del_aggregator.h"
41
#include "db/read_callback.h"
42
#include "db/seqno_to_time_mapping.h"
43
#include "db/snapshot_checker.h"
44
#include "db/snapshot_impl.h"
45
#include "db/trim_history_scheduler.h"
46
#include "db/version_edit.h"
47
#include "db/wal_manager.h"
48
#include "db/write_controller.h"
49
#include "db/write_thread.h"
50
#include "logging/event_logger.h"
51
#include "memtable/wbwi_memtable.h"
52
#include "monitoring/instrumented_mutex.h"
53
#include "options/db_options.h"
54
#include "port/port.h"
55
#include "rocksdb/attribute_groups.h"
56
#include "rocksdb/db.h"
57
#include "rocksdb/env.h"
58
#include "rocksdb/memtablerep.h"
59
#include "rocksdb/status.h"
60
#include "rocksdb/trace_reader_writer.h"
61
#include "rocksdb/transaction_log.h"
62
#include "rocksdb/user_write_callback.h"
63
#include "rocksdb/utilities/replayer.h"
64
#include "rocksdb/utilities/write_batch_with_index.h"
65
#include "rocksdb/write_buffer_manager.h"
66
#include "table/merging_iterator.h"
67
#include "util/autovector.h"
68
#include "util/hash.h"
69
#include "util/repeatable_thread.h"
70
#include "util/stop_watch.h"
71
#include "util/thread_local.h"
72
73
namespace ROCKSDB_NAMESPACE {
74
75
class Arena;
76
class ArenaWrappedDBIter;
77
class InMemoryStatsHistoryIterator;
78
class MemTable;
79
class PersistentStatsHistoryIterator;
80
class TableCache;
81
class TaskLimiterToken;
82
class Version;
83
class VersionEdit;
84
class VersionSet;
85
class WriteCallback;
86
struct JobContext;
87
struct ExternalSstFileInfo;
88
struct MemTableInfo;
89
90
// Class to maintain directories for all database paths other than main one.
91
class Directories {
92
 public:
93
  IOStatus SetDirectories(FileSystem* fs, const std::string& dbname,
94
                          const std::string& wal_dir,
95
                          const std::vector<DbPath>& data_paths);
96
97
0
  FSDirectory* GetDataDir(size_t path_id) const {
98
0
    assert(path_id < data_dirs_.size());
99
0
    FSDirectory* ret_dir = data_dirs_[path_id].get();
100
0
    if (ret_dir == nullptr) {
101
      // Should use db_dir_
102
0
      return db_dir_.get();
103
0
    }
104
0
    return ret_dir;
105
0
  }
106
107
0
  FSDirectory* GetWalDir() {
108
0
    if (wal_dir_) {
109
0
      return wal_dir_.get();
110
0
    }
111
0
    return db_dir_.get();
112
0
  }
113
114
124k
  FSDirectory* GetDbDir() { return db_dir_.get(); }
115
116
40.2k
  IOStatus Close(const IOOptions& options, IODebugContext* dbg) {
117
    // close all directories for all database paths
118
40.2k
    IOStatus s = IOStatus::OK();
119
120
    // The default implementation for Close() in Directory/FSDirectory class
121
    // "NotSupported" status, the upper level interface should be able to
122
    // handle this error so that Close() does not fail after upgrading when
123
    // run on FileSystems that have not implemented `Directory::Close()` or
124
    // `FSDirectory::Close()` yet
125
126
40.2k
    if (db_dir_) {
127
40.2k
      IOStatus temp_s = db_dir_->Close(options, dbg);
128
40.2k
      if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) {
129
0
        s = std::move(temp_s);
130
0
      }
131
40.2k
    }
132
133
    // Attempt to close everything even if one fails
134
40.2k
    s.PermitUncheckedError();
135
136
40.2k
    if (wal_dir_) {
137
0
      IOStatus temp_s = wal_dir_->Close(options, dbg);
138
0
      if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) {
139
0
        s = std::move(temp_s);
140
0
      }
141
0
    }
142
143
40.2k
    s.PermitUncheckedError();
144
145
40.2k
    for (auto& data_dir_ptr : data_dirs_) {
146
40.2k
      if (data_dir_ptr) {
147
0
        IOStatus temp_s = data_dir_ptr->Close(options, dbg);
148
0
        if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) {
149
0
          s = std::move(temp_s);
150
0
        }
151
0
      }
152
40.2k
    }
153
154
    // Ready for caller
155
40.2k
    s.MustCheck();
156
40.2k
    return s;
157
40.2k
  }
158
159
 private:
160
  std::unique_ptr<FSDirectory> db_dir_;
161
  std::vector<std::unique_ptr<FSDirectory>> data_dirs_;
162
  std::unique_ptr<FSDirectory> wal_dir_;
163
};
164
165
struct DBOpenLogRecordReadReporter : public log::Reader::Reporter {
166
  Env* env;
167
  Logger* info_log;
168
  const char* fname;
169
  Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
170
  bool* old_log_record;
171
  void Corruption(size_t bytes, const Status& s,
172
                  uint64_t log_number = kMaxSequenceNumber) override;
173
174
  void OldLogRecord(size_t bytes) override;
175
176
0
  uint64_t GetCorruptedLogNumber() const { return corrupted_wal_number_; }
177
178
 private:
179
  uint64_t corrupted_wal_number_ = kMaxSequenceNumber;
180
};
181
182
// While DB is the public interface of RocksDB, and DBImpl is the actual
183
// class implementing it. It's the entrance of the core RocksdB engine.
184
// All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a
185
// DBImpl internally.
186
// Other than functions implementing the DB interface, some public
187
// functions are there for other internal components to call. For
188
// example, TransactionDB directly calls DBImpl::WriteImpl() and
189
// BlobDB directly calls DBImpl::GetImpl(). Some other functions
190
// are for sub-components to call. For example, ColumnFamilyHandleImpl
191
// calls DBImpl::FindObsoleteFiles().
192
//
193
// Since it's a very large class, the definition of the functions is
194
// divided in several db_impl_*.cc files, besides db_impl.cc.
195
class DBImpl : public DB {
196
 public:
197
  DBImpl(const DBOptions& options, const std::string& dbname,
198
         const bool seq_per_batch = false, const bool batch_per_txn = true,
199
         bool read_only = false);
200
  // No copying allowed
201
  DBImpl(const DBImpl&) = delete;
202
  void operator=(const DBImpl&) = delete;
203
204
  virtual ~DBImpl();
205
206
  // ---- Implementations of the DB interface ----
207
208
  using DB::Resume;
209
  Status Resume() override;
210
211
  using DB::Put;
212
  Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
213
             const Slice& key, const Slice& value) override;
214
  Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
215
             const Slice& key, const Slice& ts, const Slice& value) override;
216
217
  using DB::PutEntity;
218
  Status PutEntity(const WriteOptions& options,
219
                   ColumnFamilyHandle* column_family, const Slice& key,
220
                   const WideColumns& columns) override;
221
  Status PutEntity(const WriteOptions& options, const Slice& key,
222
                   const AttributeGroups& attribute_groups) override;
223
224
  using DB::Merge;
225
  Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
226
               const Slice& key, const Slice& value) override;
227
  Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
228
               const Slice& key, const Slice& ts, const Slice& value) override;
229
230
  using DB::Delete;
231
  Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
232
                const Slice& key) override;
233
  Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
234
                const Slice& key, const Slice& ts) override;
235
236
  using DB::SingleDelete;
237
  Status SingleDelete(const WriteOptions& options,
238
                      ColumnFamilyHandle* column_family,
239
                      const Slice& key) override;
240
  Status SingleDelete(const WriteOptions& options,
241
                      ColumnFamilyHandle* column_family, const Slice& key,
242
                      const Slice& ts) override;
243
244
  using DB::DeleteRange;
245
  Status DeleteRange(const WriteOptions& options,
246
                     ColumnFamilyHandle* column_family, const Slice& begin_key,
247
                     const Slice& end_key) override;
248
  Status DeleteRange(const WriteOptions& options,
249
                     ColumnFamilyHandle* column_family, const Slice& begin_key,
250
                     const Slice& end_key, const Slice& ts) override;
251
252
  using DB::Write;
253
  Status Write(const WriteOptions& options, WriteBatch* updates) override;
254
255
  using DB::WriteWithCallback;
256
  Status WriteWithCallback(const WriteOptions& options, WriteBatch* updates,
257
                           UserWriteCallback* user_write_cb) override;
258
259
  Status IngestWriteBatchWithIndex(
260
      const WriteOptions& options,
261
      std::shared_ptr<WriteBatchWithIndex> wbwi) override;
262
263
  using DB::Get;
264
  Status Get(const ReadOptions& _read_options,
265
             ColumnFamilyHandle* column_family, const Slice& key,
266
             PinnableSlice* value, std::string* timestamp) override;
267
268
  using DB::GetEntity;
269
  Status GetEntity(const ReadOptions& options,
270
                   ColumnFamilyHandle* column_family, const Slice& key,
271
                   PinnableWideColumns* columns) override;
272
  Status GetEntity(const ReadOptions& options, const Slice& key,
273
                   PinnableAttributeGroups* result) override;
274
275
  using DB::GetMergeOperands;
276
  Status GetMergeOperands(const ReadOptions& options,
277
                          ColumnFamilyHandle* column_family, const Slice& key,
278
                          PinnableSlice* merge_operands,
279
                          GetMergeOperandsOptions* get_merge_operands_options,
280
0
                          int* number_of_operands) override {
281
0
    GetImplOptions get_impl_options;
282
0
    get_impl_options.column_family = column_family;
283
0
    get_impl_options.merge_operands = merge_operands;
284
0
    get_impl_options.get_merge_operands_options = get_merge_operands_options;
285
0
    get_impl_options.number_of_operands = number_of_operands;
286
0
    get_impl_options.get_value = false;
287
0
    return GetImpl(options, key, get_impl_options);
288
0
  }
289
290
  using DB::MultiGet;
291
  // This MultiGet is a batched version, which may be faster than calling Get
292
  // multiple times, especially if the keys have some spatial locality that
293
  // enables them to be queried in the same SST files/set of files. The larger
294
  // the batch size, the more scope for batching and performance improvement
295
  // The values and statuses parameters are arrays with number of elements
296
  // equal to keys.size(). This allows the storage for those to be alloacted
297
  // by the caller on the stack for small batches
298
  void MultiGet(const ReadOptions& _read_options, const size_t num_keys,
299
                ColumnFamilyHandle** column_families, const Slice* keys,
300
                PinnableSlice* values, std::string* timestamps,
301
                Status* statuses, const bool sorted_input = false) override;
302
303
  void MultiGetWithCallback(
304
      const ReadOptions& _read_options, ColumnFamilyHandle* column_family,
305
      ReadCallback* callback,
306
      autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);
307
308
  using DB::MultiGetEntity;
309
310
  void MultiGetEntity(const ReadOptions& options,
311
                      ColumnFamilyHandle* column_family, size_t num_keys,
312
                      const Slice* keys, PinnableWideColumns* results,
313
                      Status* statuses, bool sorted_input) override;
314
315
  void MultiGetEntity(const ReadOptions& options, size_t num_keys,
316
                      ColumnFamilyHandle** column_families, const Slice* keys,
317
                      PinnableWideColumns* results, Status* statuses,
318
                      bool sorted_input) override;
319
  void MultiGetEntity(const ReadOptions& options, size_t num_keys,
320
                      const Slice* keys,
321
                      PinnableAttributeGroups* results) override;
322
323
  void MultiGetEntityWithCallback(
324
      const ReadOptions& read_options, ColumnFamilyHandle* column_family,
325
      ReadCallback* callback,
326
      autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);
327
328
  Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
329
                            const std::string& column_family,
330
14.6k
                            ColumnFamilyHandle** handle) override {
331
    // TODO: plumb Env::IOActivity, Env::IOPriority
332
14.6k
    return CreateColumnFamily(ReadOptions(), WriteOptions(), cf_options,
333
14.6k
                              column_family, handle);
334
14.6k
  }
335
  virtual Status CreateColumnFamily(const ReadOptions& read_options,
336
                                    const WriteOptions& write_options,
337
                                    const ColumnFamilyOptions& cf_options,
338
                                    const std::string& column_family,
339
                                    ColumnFamilyHandle** handle);
340
  Status CreateColumnFamilies(
341
      const ColumnFamilyOptions& cf_options,
342
      const std::vector<std::string>& column_family_names,
343
0
      std::vector<ColumnFamilyHandle*>* handles) override {
344
    // TODO: plumb Env::IOActivity, Env::IOPriority
345
0
    return CreateColumnFamilies(ReadOptions(), WriteOptions(), cf_options,
346
0
                                column_family_names, handles);
347
0
  }
348
  virtual Status CreateColumnFamilies(
349
      const ReadOptions& read_options, const WriteOptions& write_options,
350
      const ColumnFamilyOptions& cf_options,
351
      const std::vector<std::string>& column_family_names,
352
      std::vector<ColumnFamilyHandle*>* handles);
353
354
  Status CreateColumnFamilies(
355
      const std::vector<ColumnFamilyDescriptor>& column_families,
356
0
      std::vector<ColumnFamilyHandle*>* handles) override {
357
    // TODO: plumb Env::IOActivity, Env::IOPriority
358
0
    return CreateColumnFamilies(ReadOptions(), WriteOptions(), column_families,
359
0
                                handles);
360
0
  }
361
  virtual Status CreateColumnFamilies(
362
      const ReadOptions& read_options, const WriteOptions& write_options,
363
      const std::vector<ColumnFamilyDescriptor>& column_families,
364
      std::vector<ColumnFamilyHandle*>* handles);
365
  Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
366
  Status DropColumnFamilies(
367
      const std::vector<ColumnFamilyHandle*>& column_families) override;
368
369
  // Returns false if key doesn't exist in the database and true if it may.
370
  // If value_found is not passed in as null, then return the value if found in
371
  // memory. On return, if value was found, then value_found will be set to true
372
  // , otherwise false.
373
  using DB::KeyMayExist;
374
  bool KeyMayExist(const ReadOptions& options,
375
                   ColumnFamilyHandle* column_family, const Slice& key,
376
                   std::string* value, std::string* timestamp,
377
                   bool* value_found = nullptr) override;
378
379
  using DB::NewIterator;
380
  Iterator* NewIterator(const ReadOptions& _read_options,
381
                        ColumnFamilyHandle* column_family) override;
382
  Status NewIterators(const ReadOptions& _read_options,
383
                      const std::vector<ColumnFamilyHandle*>& column_families,
384
                      std::vector<Iterator*>* iterators) override;
385
386
  using DB::NewMultiScan;
387
  std::unique_ptr<MultiScan> NewMultiScan(
388
      const ReadOptions& _read_options, ColumnFamilyHandle* column_family,
389
      const MultiScanArgs& scan_opts) override;
390
391
  const Snapshot* GetSnapshot() override;
392
  void ReleaseSnapshot(const Snapshot* snapshot) override;
393
394
  std::unique_ptr<Iterator> NewCoalescingIterator(
395
      const ReadOptions& options,
396
      const std::vector<ColumnFamilyHandle*>& column_families) override;
397
398
  std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(
399
      const ReadOptions& options,
400
      const std::vector<ColumnFamilyHandle*>& column_families) override;
401
402
  // Create a timestamped snapshot. This snapshot can be shared by multiple
403
  // readers. If any of them uses it for write conflict checking, then
404
  // is_write_conflict_boundary is true. For simplicity, set it to true by
405
  // default.
406
  std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
407
      SequenceNumber snapshot_seq, uint64_t ts);
408
  std::shared_ptr<const SnapshotImpl> GetTimestampedSnapshot(uint64_t ts) const;
409
  void ReleaseTimestampedSnapshotsOlderThan(
410
      uint64_t ts, size_t* remaining_total_ss = nullptr);
411
  Status GetTimestampedSnapshots(uint64_t ts_lb, uint64_t ts_ub,
412
                                 std::vector<std::shared_ptr<const Snapshot>>&
413
                                     timestamped_snapshots) const;
414
415
  using DB::GetProperty;
416
  bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property,
417
                   std::string* value) override;
418
  using DB::GetMapProperty;
419
  bool GetMapProperty(ColumnFamilyHandle* column_family, const Slice& property,
420
                      std::map<std::string, std::string>* value) override;
421
  using DB::GetIntProperty;
422
  bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property,
423
                      uint64_t* value) override;
424
  using DB::GetAggregatedIntProperty;
425
  bool GetAggregatedIntProperty(const Slice& property,
426
                                uint64_t* aggregated_value) override;
427
  using DB::GetApproximateSizes;
428
  Status GetApproximateSizes(const SizeApproximationOptions& options,
429
                             ColumnFamilyHandle* column_family,
430
                             const Range* range, int n,
431
                             uint64_t* sizes) override;
432
  using DB::GetApproximateMemTableStats;
433
  void GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
434
                                   const Range& range, uint64_t* const count,
435
                                   uint64_t* const size) override;
436
  using DB::CompactRange;
437
  Status CompactRange(const CompactRangeOptions& options,
438
                      ColumnFamilyHandle* column_family, const Slice* begin,
439
                      const Slice* end) override;
440
441
  using DB::CompactFiles;
442
  Status CompactFiles(
443
      const CompactionOptions& compact_options,
444
      ColumnFamilyHandle* column_family,
445
      const std::vector<std::string>& input_file_names, const int output_level,
446
      const int output_path_id = -1,
447
      std::vector<std::string>* const output_file_names = nullptr,
448
      CompactionJobInfo* compaction_job_info = nullptr) override;
449
450
  Status PauseBackgroundWork() override;
451
  Status ContinueBackgroundWork() override;
452
453
  Status EnableAutoCompaction(
454
      const std::vector<ColumnFamilyHandle*>& column_family_handles) override;
455
456
  void EnableManualCompaction() override;
457
  void DisableManualCompaction() override;
458
  void AbortAllCompactions() override;
459
  void ResumeAllCompactions() override;
460
461
  using DB::SetOptions;
462
  Status SetOptions(
463
      const std::unordered_map<ColumnFamilyHandle*,
464
                               std::unordered_map<std::string, std::string>>&
465
          column_families_opts_map) override;
466
467
  Status SetDBOptions(
468
      const std::unordered_map<std::string, std::string>& options_map) override;
469
470
  using DB::NumberLevels;
471
  int NumberLevels(ColumnFamilyHandle* column_family) override;
472
  using DB::Level0StopWriteTrigger;
473
  int Level0StopWriteTrigger(ColumnFamilyHandle* column_family) override;
474
  const std::string& GetName() const override;
475
  Env* GetEnv() const override;
476
  FileSystem* GetFileSystem() const override;
477
  using DB::GetOptions;
478
  Options GetOptions(ColumnFamilyHandle* column_family) const override;
479
  using DB::GetDBOptions;
480
  DBOptions GetDBOptions() const override;
481
  using DB::Flush;
482
  Status Flush(const FlushOptions& options,
483
               ColumnFamilyHandle* column_family) override;
484
  Status Flush(
485
      const FlushOptions& options,
486
      const std::vector<ColumnFamilyHandle*>& column_families) override;
487
0
  Status FlushWAL(bool sync) override {
488
0
    FlushWALOptions options;
489
0
    options.sync = sync;
490
0
    return FlushWAL(options);
491
0
  }
492
493
  Status FlushWAL(const FlushWALOptions& options) override;
494
495
  virtual Status FlushWAL(const WriteOptions& write_options, bool sync);
496
  bool WALBufferIsEmpty();
497
  Status SyncWAL() override;
498
  Status LockWAL() override;
499
  Status UnlockWAL() override;
500
501
  SequenceNumber GetLatestSequenceNumber() const override;
502
503
  // IncreaseFullHistoryTsLow(ColumnFamilyHandle*, std::string) will acquire
504
  // and release db_mutex
505
  Status IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family,
506
                                  std::string ts_low) override;
507
508
  // GetFullHistoryTsLow(ColumnFamilyHandle*, std::string*) will acquire and
509
  // release db_mutex
510
  Status GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
511
                             std::string* ts_low) override;
512
513
  Status GetNewestUserDefinedTimestamp(ColumnFamilyHandle* column_family,
514
                                       std::string* newest_timestamp) override;
515
516
  Status GetDbIdentity(std::string& identity) const override;
517
518
  virtual Status GetDbIdentityFromIdentityFile(const IOOptions& opts,
519
                                               std::string* identity) const;
520
521
  Status GetDbSessionId(std::string& session_id) const override;
522
523
  ColumnFamilyHandle* DefaultColumnFamily() const override;
524
525
  ColumnFamilyHandle* PersistentStatsColumnFamily() const;
526
527
  Status Close() override;
528
529
  Status DisableFileDeletions() override;
530
531
  Status EnableFileDeletions() override;
532
533
  virtual bool IsFileDeletionsEnabled() const;
534
535
  Status GetStatsHistory(
536
      uint64_t start_time, uint64_t end_time,
537
      std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
538
539
  using DB::ResetStats;
540
  Status ResetStats() override;
541
  // All the returned filenames start with "/"
542
  Status GetLiveFiles(std::vector<std::string>&, uint64_t* manifest_file_size,
543
                      bool flush_memtable = true) override;
544
  Status GetSortedWalFiles(VectorWalPtr& files) override;
545
  Status GetSortedWalFilesImpl(VectorWalPtr& files, bool need_seqnos);
546
547
  // Get the known flushed sizes of WALs that might still be written to
548
  // or have pending sync.
549
  // NOTE: unlike alive_wal_files_, this function includes WALs that might
550
  // be obsolete (but not obsolete to a pending Checkpoint) and not yet fully
551
  // synced.
552
  Status GetOpenWalSizes(std::map<uint64_t, uint64_t>& number_to_size);
553
  Status GetCurrentWalFile(std::unique_ptr<WalFile>* current_wal_file) override;
554
  Status GetCreationTimeOfOldestFile(uint64_t* creation_time) override;
555
556
  Status GetUpdatesSince(
557
      SequenceNumber seq_number, std::unique_ptr<TransactionLogIterator>* iter,
558
      const TransactionLogIterator::ReadOptions& read_options =
559
          TransactionLogIterator::ReadOptions()) override;
560
  Status DeleteFilesInRanges(ColumnFamilyHandle* column_family,
561
                             const RangeOpt* ranges, size_t n,
562
                             bool include_end = true);
563
564
  void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) override;
565
566
  Status GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) override;
567
568
  Status GetLiveFilesStorageInfo(
569
      const LiveFilesStorageInfoOptions& opts,
570
      std::vector<LiveFileStorageInfo>* files) override;
571
572
  // Obtains the meta data of the specified column family of the DB.
573
  // TODO(yhchiang): output parameter is placed in the end in this codebase.
574
  void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
575
                               ColumnFamilyMetaData* metadata) override;
576
577
  // Get column family metadata with filtering based on key range and level
578
  void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
579
                               const GetColumnFamilyMetaDataOptions& options,
580
                               ColumnFamilyMetaData* metadata) override;
581
582
  void GetAllColumnFamilyMetaData(
583
      std::vector<ColumnFamilyMetaData>* metadata) override;
584
585
  Status SuggestCompactRange(ColumnFamilyHandle* column_family,
586
                             const Slice* begin, const Slice* end) override;
587
588
  Status PromoteL0(ColumnFamilyHandle* column_family,
589
                   int target_level) override;
590
591
  using DB::IngestExternalFile;
592
  Status IngestExternalFile(
593
      ColumnFamilyHandle* column_family,
594
      const std::vector<std::string>& external_files,
595
      const IngestExternalFileOptions& ingestion_options) override;
596
597
  using DB::IngestExternalFiles;
598
  Status IngestExternalFiles(
599
      const std::vector<IngestExternalFileArg>& args) override;
600
601
  using DB::CreateColumnFamilyWithImport;
602
  Status CreateColumnFamilyWithImport(
603
      const ColumnFamilyOptions& options, const std::string& column_family_name,
604
      const ImportColumnFamilyOptions& import_options,
605
      const std::vector<const ExportImportFilesMetaData*>& metadatas,
606
      ColumnFamilyHandle** handle) override;
607
608
  using DB::ClipColumnFamily;
609
  Status ClipColumnFamily(ColumnFamilyHandle* column_family,
610
                          const Slice& begin_key,
611
                          const Slice& end_key) override;
612
613
  using DB::VerifyFileChecksums;
614
  Status VerifyFileChecksums(const ReadOptions& read_options) override;
615
616
  using DB::VerifyChecksum;
617
  Status VerifyChecksum(const ReadOptions& /*read_options*/) override;
618
  // Verify the checksums of files in db. Currently only tables are checked.
619
  //
620
  // read_options: controls file I/O behavior, e.g. read ahead size while
621
  //               reading all the live table files.
622
  //
623
  // use_file_checksum: if false, verify the block checksums of all live table
624
  //                    in db. Otherwise, obtain the file checksums and compare
625
  //                    with the MANIFEST. Currently, file checksums are
626
  //                    recomputed by reading all table files.
627
  //
628
  // Returns: OK if there is no file whose file or block checksum mismatches.
629
  Status VerifyChecksumInternal(const ReadOptions& read_options,
630
                                bool use_file_checksum);
631
632
  Status VerifyFullFileChecksum(const std::string& file_checksum_expected,
633
                                const std::string& func_name_expected,
634
                                const std::string& fpath,
635
                                const ReadOptions& read_options);
636
637
  using DB::StartTrace;
638
  Status StartTrace(const TraceOptions& options,
639
                    std::unique_ptr<TraceWriter>&& trace_writer) override;
640
641
  using DB::EndTrace;
642
  Status EndTrace() override;
643
644
  using DB::NewDefaultReplayer;
645
  Status NewDefaultReplayer(const std::vector<ColumnFamilyHandle*>& handles,
646
                            std::unique_ptr<TraceReader>&& reader,
647
                            std::unique_ptr<Replayer>* replayer) override;
648
649
  using DB::StartBlockCacheTrace;
650
  Status StartBlockCacheTrace(
651
      const TraceOptions& trace_options,
652
      std::unique_ptr<TraceWriter>&& trace_writer) override;
653
654
  Status StartBlockCacheTrace(
655
      const BlockCacheTraceOptions& options,
656
      std::unique_ptr<BlockCacheTraceWriter>&& trace_writer) override;
657
658
  using DB::EndBlockCacheTrace;
659
  Status EndBlockCacheTrace() override;
660
661
  using DB::StartIOTrace;
662
  Status StartIOTrace(const TraceOptions& options,
663
                      std::unique_ptr<TraceWriter>&& trace_writer) override;
664
665
  using DB::EndIOTrace;
666
  Status EndIOTrace() override;
667
668
  using DB::GetPropertiesOfAllTables;
669
  Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
670
                                  TablePropertiesCollection* props) override;
671
  Status GetPropertiesOfTablesInRange(
672
      ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
673
      TablePropertiesCollection* props) override;
674
675
  Status GetPropertiesOfTablesByLevel(
676
      ColumnFamilyHandle* column_family,
677
      std::vector<std::unique_ptr<TablePropertiesCollection>>* props_by_level)
678
      override;
679
680
  // ---- End of implementations of the DB interface ----
681
  SystemClock* GetSystemClock() const;
682
683
  struct GetImplOptions {
684
    ColumnFamilyHandle* column_family = nullptr;
685
    PinnableSlice* value = nullptr;
686
    PinnableWideColumns* columns = nullptr;
687
    std::string* timestamp = nullptr;
688
    bool* value_found = nullptr;
689
    ReadCallback* callback = nullptr;
690
    bool* is_blob_index = nullptr;
691
    // If true return value associated with key via value pointer else return
692
    // all merge operands for key via merge_operands pointer
693
    bool get_value = true;
694
    // Pointer to an array of size
695
    // get_merge_operands_options.expected_max_number_of_operands allocated by
696
    // user
697
    PinnableSlice* merge_operands = nullptr;
698
    GetMergeOperandsOptions* get_merge_operands_options = nullptr;
699
    int* number_of_operands = nullptr;
700
  };
701
702
  Status GetImpl(const ReadOptions& read_options,
703
                 ColumnFamilyHandle* column_family, const Slice& key,
704
                 PinnableSlice* value);
705
706
  Status GetImpl(const ReadOptions& read_options,
707
                 ColumnFamilyHandle* column_family, const Slice& key,
708
                 PinnableSlice* value, std::string* timestamp);
709
710
  // Function that Get and KeyMayExist call with no_io true or false
711
  // Note: 'value_found' from KeyMayExist propagates here
712
  // This function is also called by GetMergeOperands
713
  // If get_impl_options.get_value = true get value associated with
714
  // get_impl_options.key via get_impl_options.value
715
  // If get_impl_options.get_value = false get merge operands associated with
716
  // get_impl_options.key via get_impl_options.merge_operands
717
  virtual Status GetImpl(const ReadOptions& options, const Slice& key,
718
                         GetImplOptions& get_impl_options);
719
720
  // If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
721
  ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
722
                                      ColumnFamilyHandleImpl* cfh,
723
                                      SuperVersion* sv, SequenceNumber snapshot,
724
                                      ReadCallback* read_callback,
725
                                      bool expose_blob_index = false,
726
                                      bool allow_refresh = true);
727
728
22.1k
  virtual SequenceNumber GetLastPublishedSequence() const {
729
22.1k
    if (last_seq_same_as_publish_seq_) {
730
22.1k
      return versions_->LastSequence();
731
22.1k
    } else {
732
0
      return versions_->LastPublishedSequence();
733
0
    }
734
22.1k
  }
735
736
  // REQUIRES: joined the main write queue if two_write_queues is disabled, and
737
  // the second write queue otherwise.
738
  virtual void SetLastPublishedSequence(SequenceNumber seq);
739
  // Returns LastSequence in last_seq_same_as_publish_seq_
740
  // mode and LastAllocatedSequence otherwise. This is useful when visiblility
741
  // depends also on data written to the WAL but not to the memtable.
742
  SequenceNumber TEST_GetLastVisibleSequence() const;
743
744
  // Similar to Write() but will call the callback once on the single write
745
  // thread to determine whether it is safe to perform the write.
746
  virtual Status WriteWithCallback(const WriteOptions& write_options,
747
                                   WriteBatch* my_batch,
748
                                   WriteCallback* callback,
749
                                   UserWriteCallback* user_write_cb = nullptr);
750
751
  // Returns the sequence number that is guaranteed to be smaller than or equal
752
  // to the sequence number of any key that could be inserted into the current
753
  // memtables. It can then be assumed that any write with a larger(or equal)
754
  // sequence number will be present in this memtable or a later memtable.
755
  //
756
  // If the earliest sequence number could not be determined,
757
  // kMaxSequenceNumber will be returned.
758
  //
759
  // If include_history=true, will also search Memtables in MemTableList
760
  // History.
761
  SequenceNumber GetEarliestMemTableSequenceNumber(SuperVersion* sv,
762
                                                   bool include_history);
763
764
  // For a given key, check to see if there are any records for this key
765
  // in the memtables, including memtable history.  If cache_only is false,
766
  // SST files will also be checked.
767
  //
768
  // `key` should NOT have user-defined timestamp appended to user key even if
769
  // timestamp is enabled.
770
  //
771
  // If a key is found, *found_record_for_key will be set to true and
772
  // *seq will be set to the stored sequence number for the latest
773
  // operation on this key or kMaxSequenceNumber if unknown. If user-defined
774
  // timestamp is enabled for this column family and timestamp is not nullptr,
775
  // then *timestamp will be set to the stored timestamp for the latest
776
  // operation on this key.
777
  // If no key is found, *found_record_for_key will be set to false.
778
  //
779
  // Note: If cache_only=false, it is possible for *seq to be set to 0 if
780
  // the sequence number has been cleared from the record.  If the caller is
781
  // holding an active db snapshot, we know the missing sequence must be less
782
  // than the snapshot's sequence number (sequence numbers are only cleared
783
  // when there are no earlier active snapshots).
784
  //
785
  // If NotFound is returned and found_record_for_key is set to false, then no
786
  // record for this key was found.  If the caller is holding an active db
787
  // snapshot, we know that no key could have existing after this snapshot
788
  // (since we do not compact keys that have an earlier snapshot).
789
  //
790
  // Only records newer than or at `lower_bound_seq` are guaranteed to be
791
  // returned. Memtables and files may not be checked if it only contains data
792
  // older than `lower_bound_seq`.
793
  //
794
  // Returns OK or NotFound on success,
795
  // other status on unexpected error.
796
  // TODO(andrewkr): this API need to be aware of range deletion operations
797
  Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
798
                                 bool cache_only,
799
                                 SequenceNumber lower_bound_seq,
800
                                 SequenceNumber* seq, std::string* timestamp,
801
                                 bool* found_record_for_key,
802
                                 bool* is_blob_index);
803
804
  Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key,
805
                           const Slice& lower_bound, const Slice upper_bound);
806
  Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
807
                                  const Slice& lower_bound,
808
                                  const Slice upper_bound);
809
810
  // Similar to GetSnapshot(), but also lets the db know that this snapshot
811
  // will be used for transaction write-conflict checking.  The DB can then
812
  // make sure not to compact any keys that would prevent a write-conflict from
813
  // being detected.
814
  const Snapshot* GetSnapshotForWriteConflictBoundary();
815
816
  // max_file_num_to_ignore allows bottom level compaction to filter out newly
817
  // compacted SST files. Setting max_file_num_to_ignore to kMaxUint64 will
818
  // disable the filtering
819
  // If `final_output_level` is not nullptr, it is set to manual compaction's
820
  // output level if returned status is OK, and it may or may not be set to
821
  // manual compaction's output level if returned status is not OK.
822
  Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
823
                             int output_level,
824
                             const CompactRangeOptions& compact_range_options,
825
                             const Slice* begin, const Slice* end,
826
                             bool exclusive, bool disallow_trivial_move,
827
                             uint64_t max_file_num_to_ignore,
828
                             const std::string& trim_ts,
829
                             int* final_output_level = nullptr);
830
831
  // Return an internal iterator over the current state of the database.
832
  // The keys of this iterator are internal keys (see format.h).
833
  // The returned iterator should be deleted when no longer needed.
834
  // If allow_unprepared_value is true, the returned iterator may defer reading
835
  // the value and so will require PrepareValue() to be called before value();
836
  // allow_unprepared_value = false is convenient when this optimization is not
837
  // useful, e.g. when reading the whole column family.
838
  //
839
  // read_options.ignore_range_deletions determines whether range tombstones are
840
  // processed in the returned interator internally, i.e., whether range
841
  // tombstone covered keys are in this iterator's output.
842
  // @param read_options Must outlive the returned iterator.
843
  InternalIterator* NewInternalIterator(
844
      const ReadOptions& read_options, Arena* arena, SequenceNumber sequence,
845
      ColumnFamilyHandle* column_family = nullptr,
846
      bool allow_unprepared_value = false);
847
848
  // Note: to support DB iterator refresh, memtable range tombstones in the
849
  // underlying merging iterator needs to be refreshed. If db_iter is not
850
  // nullptr, db_iter->SetMemtableRangetombstoneIter() is called with the
851
  // memtable range tombstone iterator used by the underlying merging iterator.
852
  // This range tombstone iterator can be refreshed later by db_iter.
853
  // @param read_options Must outlive the returned iterator.
854
  InternalIterator* NewInternalIterator(const ReadOptions& read_options,
855
                                        ColumnFamilyData* cfd,
856
                                        SuperVersion* super_version,
857
                                        Arena* arena, SequenceNumber sequence,
858
                                        bool allow_unprepared_value,
859
                                        ArenaWrappedDBIter* db_iter = nullptr);
860
861
0
  LogsWithPrepTracker* logs_with_prep_tracker() {
862
0
    return &logs_with_prep_tracker_;
863
0
  }
864
865
  struct BGJobLimits {
866
    int max_flushes;
867
    int max_compactions;
868
  };
869
  // Returns maximum background flushes and compactions allowed to be scheduled
870
  BGJobLimits GetBGJobLimits() const;
871
  // Need a static version that can be called during SanitizeOptions().
872
  static BGJobLimits GetBGJobLimits(int max_background_flushes,
873
                                    int max_background_compactions,
874
                                    int max_background_jobs,
875
                                    bool parallelize_compactions);
876
877
  // move logs pending closing from job_context to the DB queue and
878
  // schedule a purge
879
  void ScheduleBgLogWriterClose(JobContext* job_context);
880
881
  uint64_t MinLogNumberToKeep();
882
883
  uint64_t MinLogNumberToRecycle();
884
885
  // Returns the lower bound file number for SSTs that won't be deleted, even if
886
  // they're obsolete. This lower bound is used internally to prevent newly
887
  // created flush/compaction output files from being deleted before they're
888
  // installed. This technique avoids the need for tracking the exact numbers of
889
  // files pending creation, although it prevents more files than necessary from
890
  // being deleted.
891
  uint64_t MinObsoleteSstNumberToKeep();
892
893
  uint64_t GetObsoleteSstFilesSize();
894
895
  uint64_t MinOptionsFileNumberToKeep();
896
897
  // Returns the list of live files in 'live' and the list
898
  // of all files in the filesystem in 'candidate_files'.
899
  // If force == false and the last call was less than
900
  // db_options_.delete_obsolete_files_period_micros microseconds ago,
901
  // it will not fill up the job_context
902
  void FindObsoleteFiles(JobContext* job_context, bool force,
903
                         bool no_full_scan = false);
904
905
  // Diffs the files listed in filenames and those that do not
906
  // belong to live files are possibly removed. Also, removes all the
907
  // files in sst_delete_files and log_delete_files.
908
  // It is not necessary to hold the mutex when invoking this method.
909
  // If FindObsoleteFiles() was run, we need to also run
910
  // PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
911
  void PurgeObsoleteFiles(JobContext& background_contet,
912
                          bool schedule_only = false);
913
914
  // Schedule a background job to actually delete obsolete files.
915
  void SchedulePurge();
916
917
0
  const SnapshotList& snapshots() const { return snapshots_; }
918
919
  // load list of snapshots to `snap_vector` that is no newer than `max_seq`
920
  // in ascending order.
921
  // `oldest_write_conflict_snapshot` is filled with the oldest snapshot
922
  // which satisfies SnapshotImpl.is_write_conflict_boundary_ = true.
923
  void LoadSnapshots(std::vector<SequenceNumber>* snap_vector,
924
                     SequenceNumber* oldest_write_conflict_snapshot,
925
0
                     const SequenceNumber& max_seq) const {
926
0
    InstrumentedMutexLock l(mutex());
927
0
    snapshots().GetAll(snap_vector, oldest_write_conflict_snapshot, max_seq);
928
0
  }
929
930
12.0k
  const ImmutableDBOptions& immutable_db_options() const {
931
12.0k
    return immutable_db_options_;
932
12.0k
  }
933
934
  // Cancel all background jobs, including flush, compaction, background
935
  // purging, stats dumping threads, etc. If `wait` = true, wait for the
936
  // running jobs to abort or finish before returning. Otherwise, only
937
  // sends the signals.
938
  void CancelAllBackgroundWork(bool wait);
939
940
  // Find Super version and reference it. Based on options, it might return
941
  // the thread local cached one.
942
  // Call ReturnAndCleanupSuperVersion() when it is no longer needed.
943
  SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd);
944
945
  // Similar to the previous function but looks up based on a column family id.
946
  // nullptr will be returned if this column family no longer exists.
947
  // REQUIRED: this function should only be called on the write thread or if the
948
  // mutex is held.
949
  SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id);
950
951
  // Un-reference the super version and clean it up if it is the last reference.
952
  void CleanupSuperVersion(SuperVersion* sv);
953
954
  // Un-reference the super version and return it to thread local cache if
955
  // needed. If it is the last reference of the super version. Clean it up
956
  // after un-referencing it.
957
  void ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, SuperVersion* sv);
958
959
  // Similar to the previous function but looks up based on a column family id.
960
  // nullptr will be returned if this column family no longer exists.
961
  // REQUIRED: this function should only be called on the write thread.
962
  void ReturnAndCleanupSuperVersion(uint32_t colun_family_id, SuperVersion* sv);
963
964
  // REQUIRED: this function should only be called on the write thread or if the
965
  // mutex is held.  Return value only valid until next call to this function or
966
  // mutex is released.
967
  ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id);
968
969
  // Same as above, should called without mutex held and not on write thread.
970
  std::unique_ptr<ColumnFamilyHandle> GetColumnFamilyHandleUnlocked(
971
      uint32_t column_family_id);
972
973
  // Returns the number of currently running flushes.
974
  // REQUIREMENT: mutex_ must be held when calling this function.
975
0
  int num_running_flushes() {
976
0
    mutex_.AssertHeld();
977
0
    return num_running_flushes_;
978
0
  }
979
980
0
  const WriteController& write_controller() { return write_controller_; }
981
982
  // hollow transactions shell used for recovery.
983
  // these will then be passed to TransactionDB so that
984
  // locks can be reacquired before writing can resume.
985
  struct RecoveredTransaction {
986
    std::string name_;
987
    bool unprepared_;
988
989
    struct BatchInfo {
990
      uint64_t log_number_;
991
      // TODO(lth): For unprepared, the memory usage here can be big for
992
      // unprepared transactions. This is only useful for rollbacks, and we
993
      // can in theory just keep keyset for that.
994
      WriteBatch* batch_;
995
      // Number of sub-batches. A new sub-batch is created if txn attempts to
996
      // insert a duplicate key,seq to memtable. This is currently used in
997
      // WritePreparedTxn/WriteUnpreparedTxn.
998
      size_t batch_cnt_;
999
    };
1000
1001
    // This maps the seq of the first key in the batch to BatchInfo, which
1002
    // contains WriteBatch and other information relevant to the batch.
1003
    //
1004
    // For WriteUnprepared, batches_ can have size greater than 1, but for
1005
    // other write policies, it must be of size 1.
1006
    std::map<SequenceNumber, BatchInfo> batches_;
1007
1008
    explicit RecoveredTransaction(const uint64_t log, const std::string& name,
1009
                                  WriteBatch* batch, SequenceNumber seq,
1010
                                  size_t batch_cnt, bool unprepared)
1011
0
        : name_(name), unprepared_(unprepared) {
1012
0
      batches_[seq] = {log, batch, batch_cnt};
1013
0
    }
1014
1015
0
    ~RecoveredTransaction() {
1016
0
      for (auto& it : batches_) {
1017
0
        delete it.second.batch_;
1018
0
      }
1019
0
    }
1020
1021
    void AddBatch(SequenceNumber seq, uint64_t log_number, WriteBatch* batch,
1022
0
                  size_t batch_cnt, bool unprepared) {
1023
0
      assert(batches_.count(seq) == 0);
1024
0
      batches_[seq] = {log_number, batch, batch_cnt};
1025
      // Prior state must be unprepared, since the prepare batch must be the
1026
      // last batch.
1027
0
      assert(unprepared_);
1028
0
      unprepared_ = unprepared;
1029
0
    }
1030
  };
1031
1032
67.4k
  bool allow_2pc() const { return immutable_db_options_.allow_2pc; }
1033
1034
  std::unordered_map<std::string, RecoveredTransaction*>
1035
0
  recovered_transactions() {
1036
0
    return recovered_transactions_;
1037
0
  }
1038
1039
0
  RecoveredTransaction* GetRecoveredTransaction(const std::string& name) {
1040
0
    auto it = recovered_transactions_.find(name);
1041
0
    if (it == recovered_transactions_.end()) {
1042
0
      return nullptr;
1043
0
    } else {
1044
0
      return it->second;
1045
0
    }
1046
0
  }
1047
1048
  void InsertRecoveredTransaction(const uint64_t log, const std::string& name,
1049
                                  WriteBatch* batch, SequenceNumber seq,
1050
0
                                  size_t batch_cnt, bool unprepared_batch) {
1051
    // For WriteUnpreparedTxn, InsertRecoveredTransaction is called multiple
1052
    // times for every unprepared batch encountered during recovery.
1053
    //
1054
    // If the transaction is prepared, then the last call to
1055
    // InsertRecoveredTransaction will have unprepared_batch = false.
1056
0
    auto rtxn = recovered_transactions_.find(name);
1057
0
    if (rtxn == recovered_transactions_.end()) {
1058
0
      recovered_transactions_[name] = new RecoveredTransaction(
1059
0
          log, name, batch, seq, batch_cnt, unprepared_batch);
1060
0
    } else {
1061
0
      rtxn->second->AddBatch(seq, log, batch, batch_cnt, unprepared_batch);
1062
0
    }
1063
0
    logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log);
1064
0
  }
1065
1066
0
  void DeleteRecoveredTransaction(const std::string& name) {
1067
0
    auto it = recovered_transactions_.find(name);
1068
0
    assert(it != recovered_transactions_.end());
1069
0
    auto* trx = it->second;
1070
0
    recovered_transactions_.erase(it);
1071
0
    for (const auto& info : trx->batches_) {
1072
0
      logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(
1073
0
          info.second.log_number_);
1074
0
    }
1075
0
    delete trx;
1076
0
  }
1077
1078
0
  void DeleteAllRecoveredTransactions() {
1079
0
    for (auto it = recovered_transactions_.begin();
1080
0
         it != recovered_transactions_.end(); ++it) {
1081
0
      delete it->second;
1082
0
    }
1083
0
    recovered_transactions_.clear();
1084
0
  }
1085
1086
0
  void AddToLogsToFreeQueue(log::Writer* log_writer) {
1087
0
    mutex_.AssertHeld();
1088
0
    wals_to_free_queue_.push_back(log_writer);
1089
0
  }
1090
1091
0
  void AddSuperVersionsToFreeQueue(SuperVersion* sv) {
1092
0
    superversions_to_free_queue_.push_back(sv);
1093
0
  }
1094
1095
  void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
1096
1097
  // Fill JobContext with snapshot information needed by flush and compaction.
1098
  void InitSnapshotContext(JobContext* job_context);
1099
1100
  // Not thread-safe.
1101
  void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback);
1102
1103
52.6k
  InstrumentedMutex* mutex() const { return &mutex_; }
1104
1105
  // Initialize a brand new DB. The DB directory is expected to be empty before
1106
  // calling it. Push new manifest file name into `new_filenames`.
1107
  Status NewDB(std::vector<std::string>* new_filenames);
1108
1109
  // This is to be used only by internal rocksdb classes.
1110
  static Status Open(const DBOptions& db_options, const std::string& name,
1111
                     const std::vector<ColumnFamilyDescriptor>& column_families,
1112
                     std::vector<ColumnFamilyHandle*>* handles,
1113
                     std::unique_ptr<DB>* dbptr, const bool seq_per_batch,
1114
                     const bool batch_per_txn, const bool is_retry,
1115
                     bool* can_retry);
1116
1117
  static IOStatus CreateAndNewDirectory(
1118
      FileSystem* fs, const std::string& dirname,
1119
      std::unique_ptr<FSDirectory>* directory);
1120
1121
  // find stats map from stats_history_ with smallest timestamp in
1122
  // the range of [start_time, end_time)
1123
  bool FindStatsByTime(uint64_t start_time, uint64_t end_time,
1124
                       uint64_t* new_time,
1125
                       std::map<std::string, uint64_t>* stats_map);
1126
1127
  // Print information of all tombstones of all iterators to the std::string
1128
  // This is only used by ldb. The output might be capped. Tombstones
1129
  // printed out are not guaranteed to be in any order.
1130
  Status TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
1131
                                     int max_entries_to_print,
1132
                                     std::string* out_str);
1133
1134
0
  VersionSet* GetVersionSet() const { return versions_.get(); }
1135
1136
  Status WaitForCompact(
1137
      const WaitForCompactOptions& wait_for_compact_options) override;
1138
1139
#ifndef NDEBUG
1140
  // Compact any files in the named level that overlap [*begin, *end]
1141
  Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
1142
                           ColumnFamilyHandle* column_family = nullptr,
1143
                           bool disallow_trivial_move = false);
1144
1145
  Status TEST_SwitchWAL();
1146
1147
  bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }
1148
1149
  bool TEST_IsLogGettingFlushed() {
1150
    return alive_wal_files_.begin()->getting_flushed;
1151
  }
1152
1153
  Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
1154
1155
  // Force current memtable contents to be flushed.
1156
  Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
1157
                            ColumnFamilyHandle* cfh = nullptr);
1158
1159
  Status TEST_FlushMemTable(ColumnFamilyData* cfd,
1160
                            const FlushOptions& flush_opts);
1161
1162
  // Flush (multiple) ColumnFamilyData without using ColumnFamilyHandle. This
1163
  // is because in certain cases, we can flush column families, wait for the
1164
  // flush to complete, but delete the column family handle before the wait
1165
  // finishes. For example in CompactRange.
1166
  Status TEST_AtomicFlushMemTables(
1167
      const autovector<ColumnFamilyData*>& provided_candidate_cfds,
1168
      const FlushOptions& flush_opts);
1169
1170
  // Wait for background threads to complete scheduled work.
1171
  Status TEST_WaitForBackgroundWork();
1172
1173
  // Wait for memtable compaction
1174
  Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
1175
1176
  Status TEST_WaitForCompact();
1177
  Status TEST_WaitForCompact(
1178
      const WaitForCompactOptions& wait_for_compact_options);
1179
1180
  // Wait for any background purge
1181
  Status TEST_WaitForPurge();
1182
1183
  // Get the background error status
1184
  Status TEST_GetBGError();
1185
1186
  bool TEST_IsRecoveryInProgress();
1187
1188
  // Return the maximum overlapping data (in bytes) at next level for any
1189
  // file at a level >= 1.
1190
  uint64_t TEST_MaxNextLevelOverlappingBytes(
1191
      ColumnFamilyHandle* column_family = nullptr);
1192
1193
  // Return the current manifest file no.
1194
  uint64_t TEST_Current_Manifest_FileNo();
1195
1196
  // Returns the number that'll be assigned to the next file that's created.
1197
  uint64_t TEST_Current_Next_FileNo();
1198
1199
  // get total level0 file size. Only for testing.
1200
  uint64_t TEST_GetLevel0TotalSize();
1201
1202
  void TEST_GetFilesMetaData(
1203
      ColumnFamilyHandle* column_family,
1204
      std::vector<std::vector<FileMetaData>>* metadata,
1205
      std::vector<std::shared_ptr<BlobFileMetaData>>* blob_metadata = nullptr);
1206
1207
  void TEST_LockMutex();
1208
1209
  void TEST_UnlockMutex();
1210
1211
  InstrumentedMutex* TEST_Mutex() { return &mutex_; }
1212
1213
  void TEST_SignalAllBgCv();
1214
1215
  // REQUIRES: mutex locked
1216
  void* TEST_BeginWrite();
1217
1218
  // REQUIRES: mutex locked
1219
  // pass the pointer that you got from TEST_BeginWrite()
1220
  void TEST_EndWrite(void* w);
1221
1222
  uint64_t TEST_MaxTotalInMemoryState() const {
1223
    return max_total_in_memory_state_;
1224
  }
1225
1226
  size_t TEST_LogsToFreeSize();
1227
1228
  uint64_t TEST_LogfileNumber();
1229
1230
  uint64_t TEST_wals_total_size() const {
1231
    return wals_total_size_.LoadRelaxed();
1232
  }
1233
1234
  void TEST_GetAllBlockCaches(std::unordered_set<const Cache*>* cache_set);
1235
1236
  // Return the lastest MutableCFOptions of a column family
1237
  Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
1238
                                        MutableCFOptions* mutable_cf_options);
1239
1240
  Cache* TEST_table_cache() { return table_cache_.get(); }
1241
1242
  WriteController& TEST_write_controler() { return write_controller_; }
1243
1244
  uint64_t TEST_FindMinLogContainingOutstandingPrep();
1245
  uint64_t TEST_FindMinPrepLogReferencedByMemTable();
1246
  size_t TEST_PreparedSectionCompletedSize();
1247
  size_t TEST_LogsWithPrepSize();
1248
1249
  int TEST_BGCompactionsAllowed() const;
1250
  int TEST_BGFlushesAllowed() const;
1251
  int TEST_NumRunningBottomCompactions() const;
1252
  size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
1253
  void TEST_WaitForPeriodicTaskRun(std::function<void()> callback) const;
1254
  SeqnoToTimeMapping TEST_GetSeqnoToTimeMapping() const;
1255
  const autovector<uint64_t>& TEST_GetFilesToQuarantine() const;
1256
  size_t TEST_EstimateInMemoryStatsHistorySize() const;
1257
1258
  uint64_t TEST_GetCurrentLogNumber() const {
1259
    InstrumentedMutexLock l(mutex());
1260
    assert(!logs_.empty());
1261
    return logs_.back().number;
1262
  }
1263
1264
  void TEST_DeleteObsoleteFiles();
1265
1266
  const std::unordered_set<uint64_t>& TEST_GetFilesGrabbedForPurge() const {
1267
    return files_grabbed_for_purge_;
1268
  }
1269
1270
  const PeriodicTaskScheduler& TEST_GetPeriodicTaskScheduler() const;
1271
1272
  static Status TEST_ValidateOptions(const DBOptions& db_options) {
1273
    return ValidateOptions(db_options);
1274
  }
1275
#endif  // NDEBUG
1276
1277
  // In certain configurations, verify that the table/blob file cache only
1278
  // contains entries for live files, to check for effective leaks of open
1279
  // files. This can only be called when purging of obsolete files has
1280
  // "settled," such as during parts of DB Close().
1281
  void TEST_VerifyNoObsoleteFilesCached(bool db_mutex_already_held) const;
1282
1283
  // persist stats to column family "_persistent_stats"
1284
  void PersistStats();
1285
1286
  // dump rocksdb.stats to LOG
1287
  void DumpStats();
1288
1289
  // flush LOG out of application buffer
1290
  void FlushInfoLog();
1291
1292
  // For the background timer job
1293
  void RecordSeqnoToTimeMapping();
1294
1295
  // Compactions rely on an event triggers like flush/compaction/SetOptions.
1296
  // We need to trigger periodic compactions even when there is no such trigger.
1297
  // This function checks and schedules available compactions and will run
1298
  // periodically.
1299
  void TriggerPeriodicCompaction();
1300
1301
  // REQUIRES: DB mutex held
1302
  std::pair<SequenceNumber, uint64_t> GetSeqnoToTimeSample() const;
1303
1304
  // REQUIRES: DB mutex held or during open
1305
  void EnsureSeqnoToTimeMapping(const MinAndMaxPreserveSeconds& preserve_secs);
1306
1307
  // Only called during open
1308
  void PrepopulateSeqnoToTimeMapping(
1309
      const MinAndMaxPreserveSeconds& preserve_secs);
1310
1311
  // Interface to block and signal the DB in case of stalling writes by
1312
  // WriteBufferManager. Each DBImpl object contains ptr to WBMStallInterface.
1313
  // When DB needs to be blocked or signalled by WriteBufferManager,
1314
  // state_ is changed accordingly.
1315
  class WBMStallInterface : public StallInterface {
1316
   public:
1317
    enum State {
1318
      BLOCKED = 0,
1319
      RUNNING,
1320
    };
1321
1322
40.2k
    WBMStallInterface() : state_cv_(&state_mutex_) {
1323
40.2k
      MutexLock lock(&state_mutex_);
1324
40.2k
      state_ = State::RUNNING;
1325
40.2k
    }
1326
1327
0
    void SetState(State state) {
1328
0
      MutexLock lock(&state_mutex_);
1329
0
      state_ = state;
1330
0
    }
1331
1332
    // Change the state_ to State::BLOCKED and wait until its state is
1333
    // changed by WriteBufferManager. When stall is cleared, Signal() is
1334
    // called to change the state and unblock the DB.
1335
0
    void Block() override {
1336
0
      MutexLock lock(&state_mutex_);
1337
0
      while (state_ == State::BLOCKED) {
1338
0
        TEST_SYNC_POINT("WBMStallInterface::BlockDB");
1339
0
        state_cv_.Wait();
1340
0
      }
1341
0
    }
1342
1343
    // Called from WriteBufferManager. This function changes the state_
1344
    // to State::RUNNING indicating the stall is cleared and DB can proceed.
1345
40.2k
    void Signal() override {
1346
40.2k
      {
1347
40.2k
        MutexLock lock(&state_mutex_);
1348
40.2k
        state_ = State::RUNNING;
1349
40.2k
      }
1350
40.2k
      state_cv_.Signal();
1351
40.2k
    }
1352
1353
   private:
1354
    // Conditional variable and mutex to block and
1355
    // signal the DB during stalling process.
1356
    port::Mutex state_mutex_;
1357
    port::CondVar state_cv_;
1358
    // state represting whether DB is running or blocked because of stall by
1359
    // WriteBufferManager.
1360
    State state_;
1361
  };
1362
1363
  static void TEST_ResetDbSessionIdGen();
1364
  static std::string GenerateDbSessionId(Env* env);
1365
1366
0
  bool seq_per_batch() const { return seq_per_batch_; }
1367
1368
 protected:
1369
  const std::string dbname_;
1370
  // TODO(peterd): unify with VersionSet::db_id_
1371
  std::string db_id_;
1372
  // db_session_id_ is an identifier that gets reset
1373
  // every time the DB is opened
1374
  std::string db_session_id_;
1375
  std::unique_ptr<VersionSet> versions_;
1376
  // Flag to check whether we allocated and own the info log file
1377
  bool own_info_log_;
1378
  Status init_logger_creation_s_;
1379
  const DBOptions initial_db_options_;
1380
  Env* const env_;
1381
  std::shared_ptr<IOTracer> io_tracer_;
1382
  const ImmutableDBOptions immutable_db_options_;
1383
  FileSystemPtr fs_;
1384
  MutableDBOptions mutable_db_options_;
1385
  Statistics* stats_;
1386
  std::unordered_map<std::string, RecoveredTransaction*>
1387
      recovered_transactions_;
1388
  std::unique_ptr<Tracer> tracer_;
1389
  InstrumentedMutex trace_mutex_;
1390
  BlockCacheTracer block_cache_tracer_;
1391
1392
  // constant false canceled flag, used when the compaction is not manual
1393
  const std::atomic<bool> kManualCompactionCanceledFalse_{false};
1394
1395
  // State below is protected by mutex_
1396
  // With two_write_queues enabled, some of the variables that accessed during
1397
  // WriteToWAL need different synchronization: wal_empty_, alive_wal_files_,
1398
  // logs_, cur_wal_number_. Refer to the definition of each variable below for
1399
  // more description.
1400
  //
1401
  // Protects access to most ColumnFamilyData methods, see more in comment for
1402
  // each method.
1403
  //
1404
  // `mutex_` can be a hot lock in some workloads, so it deserves dedicated
1405
  // cachelines.
1406
  mutable CacheAlignedInstrumentedMutex mutex_;
1407
1408
  ColumnFamilyHandleImpl* default_cf_handle_ = nullptr;
1409
  InternalStats* default_cf_internal_stats_ = nullptr;
1410
1411
  // table_cache_ provides its own synchronization
1412
  std::shared_ptr<Cache> table_cache_;
1413
1414
  ErrorHandler error_handler_;
1415
1416
  // Unified interface for logging events
1417
  EventLogger event_logger_;
1418
1419
  // only used for dynamically adjusting max_total_wal_size. it is a sum of
1420
  // [write_buffer_size * max_write_buffer_number] over all column families
1421
  std::atomic<uint64_t> max_total_in_memory_state_ = 0;
1422
1423
  // The options to access storage files
1424
  const FileOptions file_options_;
1425
1426
  // Additonal options for compaction and flush
1427
  FileOptions file_options_for_compaction_;
1428
1429
  std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
1430
1431
  // Increase the sequence number after writing each batch, whether memtable is
1432
  // disabled for that or not. Otherwise the sequence number is increased after
1433
  // writing each key into memtable. This implies that when disable_memtable is
1434
  // set, the seq is not increased at all.
1435
  //
1436
  // Default: false
1437
  const bool seq_per_batch_;
1438
  // This determines during recovery whether we expect one writebatch per
1439
  // recovered transaction, or potentially multiple writebatches per
1440
  // transaction. For WriteUnprepared, this is set to false, since multiple
1441
  // batches can exist per transaction.
1442
  //
1443
  // Default: true
1444
  const bool batch_per_txn_;
1445
1446
  // Each flush or compaction gets its own job id. this counter makes sure
1447
  // they're unique
1448
  std::atomic<int> next_job_id_ = 1;
1449
1450
  std::atomic<bool> shutting_down_ = false;
1451
1452
  // No new background jobs can be queued if true. This is used to prevent new
1453
  // background jobs from being queued after WaitForCompact() completes waiting
1454
  // all background jobs then attempts to close when close_db_ option is true.
1455
  bool reject_new_background_jobs_ = false;
1456
1457
  // RecoveryContext struct stores the context about version edits along
1458
  // with corresponding column_family_data and column_family_options.
1459
  class RecoveryContext {
1460
   public:
1461
40.2k
    ~RecoveryContext() {
1462
54.8k
      for (auto& edit_list : edit_lists_) {
1463
153k
        for (auto* edit : edit_list) {
1464
153k
          delete edit;
1465
153k
        }
1466
54.8k
      }
1467
40.2k
    }
1468
1469
153k
    void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) {
1470
153k
      assert(cfd != nullptr);
1471
153k
      if (map_.find(cfd->GetID()) == map_.end()) {
1472
54.8k
        uint32_t size = static_cast<uint32_t>(map_.size());
1473
54.8k
        map_.emplace(cfd->GetID(), size);
1474
54.8k
        cfds_.emplace_back(cfd);
1475
54.8k
        edit_lists_.emplace_back(autovector<VersionEdit*>());
1476
54.8k
      }
1477
153k
      uint32_t i = map_[cfd->GetID()];
1478
153k
      edit_lists_[i].emplace_back(new VersionEdit(edit));
1479
153k
    }
1480
1481
    std::unordered_map<uint32_t, uint32_t> map_;  // cf_id to index;
1482
    autovector<ColumnFamilyData*> cfds_;
1483
    autovector<autovector<VersionEdit*>> edit_lists_;
1484
    // All existing data files (SST files and Blob files) found during DB::Open.
1485
    std::vector<std::string> existing_data_files_;
1486
    bool is_new_db_ = false;
1487
  };
1488
1489
  // Persist options to options file. Must be holding options_mutex_.
1490
  // Will lock DB mutex if !db_mutex_already_held.
1491
  Status WriteOptionsFile(const WriteOptions& write_options,
1492
                          bool db_mutex_already_held);
1493
1494
  Status CompactRangeInternal(const CompactRangeOptions& options,
1495
                              ColumnFamilyHandle* column_family,
1496
                              const Slice* begin, const Slice* end,
1497
                              const std::string& trim_ts);
1498
1499
  // The following two functions can only be called when:
1500
  // 1. WriteThread::Writer::EnterUnbatched() is used.
1501
  // 2. db_mutex is NOT held
1502
  Status RenameTempFileToOptionsFile(const std::string& file_name,
1503
                                     bool is_remote_compaction_enabled);
1504
  Status DeleteObsoleteOptionsFiles();
1505
1506
  void NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds,
1507
                                    FlushReason flush_reason);
1508
1509
  void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
1510
                          const MutableCFOptions& mutable_cf_options,
1511
                          int job_id, FlushReason flush_reason);
1512
1513
  void NotifyOnFlushCompleted(
1514
      ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
1515
      std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info);
1516
1517
  void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
1518
                               const Status& st,
1519
                               const CompactionJobStats& job_stats, int job_id);
1520
1521
  void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, Compaction* c,
1522
                                   const Status& st,
1523
                                   const CompactionJobStats& job_stats,
1524
                                   int job_id);
1525
  void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
1526
                              const MemTableInfo& mem_table_info);
1527
1528
  void NotifyOnExternalFileIngested(
1529
      ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
1530
1531
  Status FlushAllColumnFamilies(const FlushOptions& flush_options,
1532
                                FlushReason flush_reason);
1533
1534
  virtual Status FlushForGetLiveFiles(bool force_atomic_flush = false);
1535
1536
  void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
1537
1538
  void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;
1539
1540
  void EraseThreadStatusDbInfo() const;
1541
1542
  // For CFs that has updates in `wbwi`, their memtable will be switched,
1543
  // and `wbwi` will be added as the latest immutable memtable.
1544
  //
1545
  // REQUIRES: this thread is currently at the front of the main writer queue.
1546
  // @param prep_log refers to the WAL that contains prepare record
1547
  // for the transaction based on wbwi.
1548
  // @param assigned_seqno Sequence numbers for the ingested memtable.
1549
  // @param last_seqno the value of versions_->LastSequence() after the write
1550
  // ingests `wbwi` is done.
1551
  // @param memtable_updated Whether the same write that ingests wbwi has
1552
  // updated memtable. This is useful for determining whether to set bg
1553
  // error when IngestWBWIAsMemtable fails.
1554
  Status IngestWBWIAsMemtable(std::shared_ptr<WriteBatchWithIndex> wbwi,
1555
                              const WBWIMemTable::SeqnoRange& assigned_seqno,
1556
                              uint64_t min_prep_log, SequenceNumber last_seqno,
1557
                              bool memtable_updated, bool ignore_missing_cf);
1558
1559
  // If disable_memtable is set the application logic must guarantee that the
1560
  // batch will still be skipped from memtable during the recovery. An excption
1561
  // to this is seq_per_batch_ mode, in which since each batch already takes one
1562
  // seq, it is ok for the batch to write to memtable during recovery as long as
1563
  // it only takes one sequence number: i.e., no duplicate keys.
1564
  // In WriteCommitted it is guarnateed since disable_memtable is used for
1565
  // prepare batch which will be written to memtable later during the commit,
1566
  // and in WritePrepared it is guaranteed since it will be used only for WAL
1567
  // markers which will never be written to memtable. If the commit marker is
1568
  // accompanied with CommitTimeWriteBatch that is not written to memtable as
1569
  // long as it has no duplicate keys, it does not violate the one-seq-per-batch
1570
  // policy.
1571
  // batch_cnt is expected to be non-zero in seq_per_batch mode and
1572
  // indicates the number of sub-patches. A sub-patch is a subset of the write
1573
  // batch that does not have duplicate keys.
1574
  // `callback` is called before WAL write.
1575
  // See more in comment above WriteCallback::Callback().
1576
  // pre_release_callback is called after WAL write and before memtable write.
1577
  // See more in comment above PreReleaseCallback::Callback().
1578
  // post_memtable_callback is called after memtable write but before publishing
1579
  // the sequence number to readers.
1580
  //
1581
  // The main write queue. This is the only write queue that updates
1582
  // LastSequence. When using one write queue, the same sequence also indicates
1583
  // the last published sequence.
1584
  Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
1585
                   WriteCallback* callback = nullptr,
1586
                   UserWriteCallback* user_write_cb = nullptr,
1587
                   uint64_t* wal_used = nullptr, uint64_t log_ref = 0,
1588
                   bool disable_memtable = false, uint64_t* seq_used = nullptr,
1589
                   size_t batch_cnt = 0,
1590
                   PreReleaseCallback* pre_release_callback = nullptr,
1591
                   PostMemTableCallback* post_memtable_callback = nullptr,
1592
                   std::shared_ptr<WriteBatchWithIndex> wbwi = nullptr);
1593
1594
  Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
1595
                            WriteCallback* callback = nullptr,
1596
                            UserWriteCallback* user_write_cb = nullptr,
1597
                            uint64_t* wal_used = nullptr, uint64_t log_ref = 0,
1598
                            bool disable_memtable = false,
1599
                            uint64_t* seq_used = nullptr);
1600
1601
  // Write only to memtables without joining any write queue
1602
  Status UnorderedWriteMemtable(const WriteOptions& write_options,
1603
                                WriteBatch* my_batch, WriteCallback* callback,
1604
                                uint64_t log_ref, SequenceNumber seq,
1605
                                const size_t sub_batch_cnt);
1606
1607
  // Whether the batch requires to be assigned with an order
1608
  enum AssignOrder : bool { kDontAssignOrder, kDoAssignOrder };
1609
  // Whether it requires publishing last sequence or not
1610
  enum PublishLastSeq : bool { kDontPublishLastSeq, kDoPublishLastSeq };
1611
1612
  // Join the write_thread to write the batch only to the WAL. It is the
1613
  // responsibility of the caller to also write the write batch to the memtable
1614
  // if it required.
1615
  //
1616
  // sub_batch_cnt is expected to be non-zero when assign_order = kDoAssignOrder
1617
  // indicating the number of sub-batches in my_batch. A sub-patch is a subset
1618
  // of the write batch that does not have duplicate keys. When seq_per_batch is
1619
  // not set, each key is a separate sub_batch. Otherwise each duplicate key
1620
  // marks start of a new sub-batch.
1621
  Status WriteImplWALOnly(
1622
      WriteThread* write_thread, const WriteOptions& options,
1623
      WriteBatch* updates, WriteCallback* callback,
1624
      UserWriteCallback* user_write_cb, uint64_t* wal_used,
1625
      const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
1626
      PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
1627
      const PublishLastSeq publish_last_seq, const bool disable_memtable);
1628
1629
  // write cached_recoverable_state_ to memtable if it is not empty
1630
  // The writer must be the leader in write_thread_ and holding mutex_
1631
  Status WriteRecoverableState();
1632
1633
  // Actual implementation of Close()
1634
  virtual Status CloseImpl();
1635
1636
  // Recover the descriptor from persistent storage.  May do a significant
1637
  // amount of work to recover recently logged updates.  Any changes to
1638
  // be made to the descriptor are added to *edit.
1639
  // recovered_seq is set to less than kMaxSequenceNumber if the log's tail is
1640
  // skipped.
1641
  // recovery_ctx stores the context about version edits and all those
1642
  // edits are persisted to new Manifest after successfully syncing the new WAL.
1643
  virtual Status Recover(
1644
      const std::vector<ColumnFamilyDescriptor>& column_families,
1645
      bool read_only = false, bool error_if_wal_file_exists = false,
1646
      bool error_if_data_exists_in_wals = false, bool is_retry = false,
1647
      uint64_t* recovered_seq = nullptr,
1648
      RecoveryContext* recovery_ctx = nullptr, bool* can_retry = nullptr);
1649
1650
86.8k
  virtual bool OwnTablesAndLogs() const { return true; }
1651
1652
  // Read/create DB identity file (as appropriate), and write DB ID to
1653
  // version_edit if provided.
1654
  Status SetupDBId(const WriteOptions& write_options, bool read_only,
1655
                   bool is_new_db, bool is_retry, VersionEdit* version_edit);
1656
  // Assign db_id_ and write DB ID to version_edit if provided.
1657
  void SetDBId(std::string&& id, bool read_only, VersionEdit* version_edit);
1658
1659
  // Collect a deduplicated collection of paths used by this DB, including
1660
  // dbname_, DBOptions.db_paths, ColumnFamilyOptions.cf_paths.
1661
  std::set<std::string> CollectAllDBPaths();
1662
1663
  // REQUIRES: db mutex held when calling this function, but the db mutex can
1664
  // be released and re-acquired. Db mutex will be held when the function
1665
  // returns.
1666
  // It stores all existing data files (SST and Blob) in RecoveryContext. In
1667
  // the meantime, we find out the largest file number present in the paths, and
1668
  // bump up the version set's next_file_number_ to be 1 + largest_file_number.
1669
  // recovery_ctx stores the context about version edits. All those edits are
1670
  // persisted to new Manifest after successfully syncing the new WAL.
1671
  Status MaybeUpdateNextFileNumber(RecoveryContext* recovery_ctx);
1672
1673
  // Track existing data files, including both referenced and unreferenced SST
1674
  // and Blob files in SstFileManager. This is only called during DB::Open and
1675
  // it's called before any file deletion start so that their deletion can be
1676
  // properly rate limited.
1677
  // Files may not be referenced in the MANIFEST because (e.g.
1678
  // 1. It's best effort recovery;
1679
  // 2. The VersionEdits referencing the SST files are appended to
1680
  // RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are
1681
  // still not synced to MANIFEST during recovery.)
1682
  //
1683
  // If the file is referenced in Manifest (typically that's the
1684
  // vast majority of all files), since it already has the file size
1685
  // on record, we don't need to query the file system. Otherwise, we query the
1686
  // file system for the size of an unreferenced file.
1687
  // REQUIRES: mutex unlocked
1688
  void TrackExistingDataFiles(
1689
      const std::vector<std::string>& existing_data_files);
1690
1691
  // Untrack data files in sst manager. This is only called during DB::Close on
1692
  // an unowned SstFileManager, to return it to a consistent state.
1693
  // REQUIRES: mutex unlocked
1694
  void UntrackDataFiles();
1695
1696
  // SetDbSessionId() should be called in the constuctor DBImpl()
1697
  // to ensure that db_session_id_ gets updated every time the DB is opened
1698
  void SetDbSessionId();
1699
1700
  Status FailIfCfHasTs(const ColumnFamilyHandle* column_family) const;
1701
  Status FailIfTsMismatchCf(ColumnFamilyHandle* column_family,
1702
                            const Slice& ts) const;
1703
1704
  // Check that the read timestamp `ts` is at or above the `full_history_ts_low`
1705
  // timestamp in a `SuperVersion`. It's necessary to do this check after
1706
  // grabbing the SuperVersion. If the check passed, the referenced SuperVersion
1707
  // this read holds on to can ensure the read won't be affected if
1708
  // `full_history_ts_low` is increased concurrently, and this achieves that
1709
  // without explicitly locking by piggybacking the SuperVersion.
1710
  Status FailIfReadCollapsedHistory(const ColumnFamilyData* cfd,
1711
                                    const SuperVersion* sv,
1712
                                    const Slice& ts) const;
1713
1714
  // recovery_ctx stores the context about version edits and
1715
  // LogAndApplyForRecovery persist all those edits to new Manifest after
1716
  // successfully syncing new WAL.
1717
  // LogAndApplyForRecovery should be called only once during recovery and it
1718
  // should be called when RocksDB writes to a first new MANIFEST since this
1719
  // recovery.
1720
  Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx);
1721
1722
  // Schedule background work to open and validate SST files asynchronously.
1723
  // Called when open_files_async is enabled.
1724
  void ScheduleAsyncFileOpening();
1725
1726
  // Mark async file opening as not needed. Used by subclasses that load
1727
  // table files through a different mechanism (e.g., ReactiveVersionSet).
1728
  void MarkAsyncFileOpenNotNeeded();
1729
1730
  // Background work function for async file opening.
1731
  static void BGWorkAsyncFileOpen(void* arg);
1732
1733
  void InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();
1734
1735
  // Return true to proceed with current WAL record whose content is stored in
1736
  // `batch`. Return false to skip current WAL record.
1737
  bool InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
1738
                                          const std::string& wal_fname,
1739
                                          log::Reader::Reporter& reporter,
1740
                                          Status& status, bool& stop_replay,
1741
                                          WriteBatch& batch);
1742
1743
  // Indicate DB was opened successfully
1744
  bool opened_successfully_ = false;
1745
1746
 private:
1747
  friend class DB;
1748
  friend class DBImplSecondary;
1749
  friend class ErrorHandler;
1750
  friend class InternalStats;
1751
  friend class PessimisticTransaction;
1752
  friend class TransactionBaseImpl;
1753
  friend class WriteCommittedTxn;
1754
  friend class WritePreparedTxn;
1755
  friend class WritePreparedTxnDB;
1756
  friend class WriteBatchWithIndex;
1757
  friend class WriteUnpreparedTxnDB;
1758
  friend class WriteUnpreparedTxn;
1759
1760
  friend class ForwardIterator;
1761
  friend struct SuperVersion;
1762
  friend class CompactedDBImpl;
1763
  friend class DBImplFollower;
1764
#ifndef NDEBUG
1765
  friend class DBTest_ConcurrentFlushWAL_Test;
1766
  friend class DBTest_MixedSlowdownOptionsStop_Test;
1767
  friend class DBCompactionTest_CompactBottomLevelFilesWithDeletions_Test;
1768
  friend class DBCompactionTest_CompactionDuringShutdown_Test;
1769
  friend class DBCompactionTest_DelayCompactBottomLevelFilesWithDeletions_Test;
1770
  friend class DBCompactionTest_DisableCompactBottomLevelFiles_Test;
1771
  friend class StatsHistoryTest_PersistentStatsCreateColumnFamilies_Test;
1772
  friend class DBTest2_ReadCallbackTest_Test;
1773
  friend class WriteCallbackPTest_WriteWithCallbackTest_Test;
1774
  friend class XFTransactionWriteHandler;
1775
  friend class DBBlobIndexTest;
1776
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
1777
  friend class CompactionServiceTest_PreservedOptionsLocalCompaction_Test;
1778
  friend class CompactionServiceTest_PreservedOptionsRemoteCompaction_Test;
1779
#endif
1780
1781
  struct CompactionState;
1782
  struct PrepickedCompaction;
1783
  struct PurgeFileInfo;
1784
1785
  struct WriteContext {
1786
    SuperVersionContext superversion_context;
1787
    autovector<ReadOnlyMemTable*> memtables_to_free_;
1788
1789
    explicit WriteContext(bool create_superversion = false)
1790
497k
        : superversion_context(create_superversion) {}
1791
1792
497k
    ~WriteContext() {
1793
497k
      superversion_context.Clean();
1794
497k
      for (auto& m : memtables_to_free_) {
1795
0
        delete m;
1796
0
      }
1797
497k
    }
1798
  };
1799
1800
  struct WalFileNumberSize {
1801
74.8k
    explicit WalFileNumberSize(uint64_t _number) : number(_number) {}
1802
0
    WalFileNumberSize() {}
1803
495k
    void AddSize(uint64_t new_size) { size += new_size; }
1804
    uint64_t number;
1805
    uint64_t size = 0;
1806
    bool getting_flushed = false;
1807
  };
1808
1809
  struct LogWriterNumber {
1810
    // pass ownership of _writer
1811
    LogWriterNumber(uint64_t _number, log::Writer* _writer)
1812
41.9k
        : number(_number), writer(_writer) {}
1813
1814
1.72k
    log::Writer* ReleaseWriter() {
1815
1.72k
      auto* w = writer;
1816
1.72k
      writer = nullptr;
1817
1.72k
      return w;
1818
1.72k
    }
1819
40.2k
    Status ClearWriter() {
1820
40.2k
      Status s;
1821
40.2k
      if (writer->file()) {
1822
        // TODO: plumb Env::IOActivity, Env::IOPriority
1823
40.2k
        s = writer->WriteBuffer(WriteOptions());
1824
40.2k
        if (attempt_truncate_size < SIZE_MAX &&
1825
0
            attempt_truncate_size < writer->file()->GetFileSize()) {
1826
0
          Status s2 = writer->file()->writable_file()->Truncate(
1827
0
              attempt_truncate_size, IOOptions{}, nullptr);
1828
          // This is just a best effort attempt
1829
0
          s2.PermitUncheckedError();
1830
0
        }
1831
40.2k
      }
1832
40.2k
      delete writer;
1833
40.2k
      writer = nullptr;
1834
40.2k
      return s;
1835
40.2k
    }
1836
1837
1.72k
    bool IsSyncing() { return getting_synced; }
1838
1839
0
    uint64_t GetPreSyncSize() {
1840
0
      assert(getting_synced);
1841
0
      return pre_sync_size;
1842
0
    }
1843
1844
1.72k
    void PrepareForSync() {
1845
1.72k
      assert(!getting_synced);
1846
      // Ensure the head of logs_ is marked as getting_synced if any is.
1847
1.72k
      getting_synced = true;
1848
      // If last sync failed on a later WAL, this could be a fully synced
1849
      // and closed WAL that just needs to be recorded as synced in the
1850
      // manifest.
1851
1.72k
      if (writer->file()) {
1852
        // Size is expected to be monotonically increasing.
1853
1.72k
        assert(writer->file()->GetFlushedSize() >= pre_sync_size);
1854
1.72k
        pre_sync_size = writer->file()->GetFlushedSize();
1855
1.72k
      }
1856
1.72k
    }
1857
1858
1.72k
    void FinishSync() {
1859
1.72k
      assert(getting_synced);
1860
1.72k
      getting_synced = false;
1861
1.72k
    }
1862
1863
0
    void SetAttemptTruncateSize(uint64_t size) {
1864
0
      assert(attempt_truncate_size == SIZE_MAX);
1865
0
      attempt_truncate_size = size;
1866
0
    }
1867
1868
    uint64_t number;
1869
    // Visual Studio doesn't support deque's member to be noncopyable because
1870
    // of a std::unique_ptr as a member.
1871
    log::Writer* writer;  // own
1872
1873
   private:
1874
    // true for some prefix of logs_
1875
    bool getting_synced = false;
1876
    // The size of the file before the sync happens. This amount is guaranteed
1877
    // to be persisted even if appends happen during sync so it can be used for
1878
    // tracking the synced size in MANIFEST.
1879
    uint64_t pre_sync_size = 0;
1880
    // When < SIZE_MAX, attempt to truncate the WAL to this size on close,
1881
    // because a bad entry was written to it beyond that point and it likely
1882
    // won't be recoverable with the bad entry.
1883
    uint64_t attempt_truncate_size = SIZE_MAX;
1884
  };
1885
1886
  struct WalContext {
1887
    explicit WalContext(bool need_sync = false)
1888
495k
        : need_wal_sync(need_sync), need_wal_dir_sync(need_sync) {}
1889
    bool need_wal_sync = false;
1890
    bool need_wal_dir_sync = false;
1891
    log::Writer* writer = nullptr;
1892
    WalFileNumberSize* wal_file_number_size = nullptr;
1893
    uint64_t prev_size = SIZE_MAX;
1894
  };
1895
1896
  // PurgeFileInfo is a structure to hold information of files to be deleted in
1897
  // purge_files_
1898
  struct PurgeFileInfo {
1899
    std::string fname;
1900
    std::string dir_to_sync;
1901
    FileType type;
1902
    uint64_t number;
1903
    int job_id;
1904
    PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num,
1905
                  int jid)
1906
0
        : fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {}
1907
  };
1908
1909
  // Argument required by background flush thread.
1910
  struct BGFlushArg {
1911
    BGFlushArg()
1912
        : cfd_(nullptr),
1913
          max_memtable_id_(0),
1914
          superversion_context_(nullptr),
1915
0
          flush_reason_(FlushReason::kOthers) {}
1916
    BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id,
1917
               SuperVersionContext* superversion_context,
1918
               FlushReason flush_reason, bool atomic_flush)
1919
1.72k
        : cfd_(cfd),
1920
1.72k
          max_memtable_id_(max_memtable_id),
1921
1.72k
          superversion_context_(superversion_context),
1922
1.72k
          flush_reason_(flush_reason),
1923
1.72k
          atomic_flush_(atomic_flush) {}
1924
1925
    // Column family to flush.
1926
    ColumnFamilyData* cfd_;
1927
    // Maximum ID of memtable to flush. In this column family, memtables with
1928
    // IDs smaller than this value must be flushed before this flush completes.
1929
    uint64_t max_memtable_id_;
1930
    // Pointer to a SuperVersionContext object. After flush completes, RocksDB
1931
    // installs a new superversion for the column family. This operation
1932
    // requires a SuperVersionContext object (currently embedded in JobContext).
1933
    SuperVersionContext* superversion_context_;
1934
    FlushReason flush_reason_;
1935
    // Whether this flush should use atomic flush code path.
1936
    bool atomic_flush_ = false;
1937
  };
1938
1939
  // Argument passed to flush thread.
1940
  struct FlushThreadArg {
1941
    DBImpl* db_;
1942
1943
    Env::Priority thread_pri_;
1944
  };
1945
1946
  // Information for a manual compaction
1947
  struct ManualCompactionState {
1948
    ManualCompactionState(ColumnFamilyData* _cfd, int _input_level,
1949
                          int _output_level, uint32_t _output_path_id,
1950
                          bool _exclusive, bool _disallow_trivial_move,
1951
                          std::atomic<bool>* _canceled)
1952
2.14k
        : cfd(_cfd),
1953
2.14k
          input_level(_input_level),
1954
2.14k
          output_level(_output_level),
1955
2.14k
          output_path_id(_output_path_id),
1956
2.14k
          exclusive(_exclusive),
1957
2.14k
          disallow_trivial_move(_disallow_trivial_move),
1958
2.14k
          canceled(_canceled ? *_canceled : canceled_internal_storage) {}
1959
    // When _canceled is not provided by ther user, we assign the reference of
1960
    // canceled_internal_storage to it to consolidate canceled and
1961
    // manual_compaction_paused since DisableManualCompaction() might be
1962
    // called
1963
1964
    ColumnFamilyData* cfd;
1965
    int input_level;
1966
    int output_level;
1967
    uint32_t output_path_id;
1968
    Status status;
1969
    bool done = false;
1970
    bool in_progress = false;    // compaction request being processed?
1971
    bool incomplete = false;     // only part of requested range compacted
1972
    bool exclusive;              // current behavior of only one manual
1973
    bool disallow_trivial_move;  // Force actual compaction to run
1974
    const InternalKey* begin = nullptr;  // nullptr means beginning of key range
1975
    const InternalKey* end = nullptr;    // nullptr means end of key range
1976
    InternalKey* manual_end = nullptr;   // how far we are compacting
1977
    InternalKey tmp_storage;   // Used to keep track of compaction progress
1978
    InternalKey tmp_storage1;  // Used to keep track of compaction progress
1979
1980
    // When the user provides a canceled pointer in CompactRangeOptions, the
1981
    // above varaibe is the reference of the user-provided
1982
    // `canceled`, otherwise, it is the reference of canceled_internal_storage
1983
    std::atomic<bool> canceled_internal_storage = false;
1984
    std::atomic<bool>& canceled;  // Compaction canceled pointer reference
1985
  };
1986
  struct PrepickedCompaction {
1987
    // background compaction takes ownership of `compaction`.
1988
    // TODO(hx235): consider using std::shared_ptr for easier ownership
1989
    // management
1990
    Compaction* compaction;
1991
    // caller retains ownership of `manual_compaction_state` as it is reused
1992
    // across background compactions.
1993
    ManualCompactionState* manual_compaction_state;  // nullptr if non-manual
1994
    // task limiter token is requested during compaction picking.
1995
    std::unique_ptr<TaskLimiterToken> task_token;
1996
    // If true, `compaction` is picked temporarily to express compaction intent
1997
    // and will be released before re-picking a real compaction based on the
1998
    // updated LSM shape when thread associated with `compaction` is ready to
1999
    // run
2000
    bool need_repick;
2001
  };
2002
2003
  struct CompactionArg {
2004
    // caller retains ownership of `db`.
2005
    DBImpl* db;
2006
    // background compaction takes ownership of `prepicked_compaction`.
2007
    PrepickedCompaction* prepicked_compaction;
2008
    Env::Priority compaction_pri_;
2009
  };
2010
2011
0
  static bool IsRecoveryFlush(FlushReason flush_reason) {
2012
0
    return flush_reason == FlushReason::kErrorRecoveryRetryFlush ||
2013
0
           flush_reason == FlushReason::kErrorRecovery;
2014
0
  }
2015
  // Initialize the built-in column family for persistent stats. Depending on
2016
  // whether on-disk persistent stats have been enabled before, it may either
2017
  // create a new column family and column family handle or just a column family
2018
  // handle.
2019
  // Required: DB mutex held
2020
  Status InitPersistStatsColumnFamily();
2021
2022
  // Persistent Stats column family has two format version key which are used
2023
  // for compatibility check. Write format version if it's created for the
2024
  // first time, read format version and check compatibility if recovering
2025
  // from disk. This function requires DB mutex held at entrance but may
2026
  // release and re-acquire DB mutex in the process.
2027
  // Required: DB mutex held
2028
  Status PersistentStatsProcessFormatVersion();
2029
2030
  Status ResumeImpl(DBRecoverContext context);
2031
2032
  void MaybeIgnoreError(Status* s) const;
2033
2034
  const Status CreateArchivalDirectory();
2035
2036
  // Create a column family, without some of the follow-up work yet
2037
  Status CreateColumnFamilyImpl(const ReadOptions& read_options,
2038
                                const WriteOptions& write_options,
2039
                                const ColumnFamilyOptions& cf_options,
2040
                                const std::string& cf_name,
2041
                                ColumnFamilyHandle** handle);
2042
2043
  // Follow-up work to user creating a column family or (families)
2044
  Status WrapUpCreateColumnFamilies(
2045
      const WriteOptions& write_options,
2046
      const std::vector<const ColumnFamilyOptions*>& cf_options);
2047
2048
  Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family);
2049
2050
  // Delete any unneeded files and stale in-memory entries.
2051
  void DeleteObsoleteFiles();
2052
  // Delete obsolete files and log status and information of file deletion
2053
  void DeleteObsoleteFileImpl(int job_id, const std::string& fname,
2054
                              const std::string& path_to_sync, FileType type,
2055
                              uint64_t number);
2056
2057
  // Background process needs to call
2058
  //     auto x = CaptureCurrentFileNumberInPendingOutputs()
2059
  //     auto file_num = versions_->NewFileNumber();
2060
  //     <do something>
2061
  //     ReleaseFileNumberFromPendingOutputs(x)
2062
  // This will protect any file with number `file_num` or greater from being
2063
  // deleted while <do something> is running.
2064
  // -----------
2065
  // This function will capture current file number and append it to
2066
  // pending_outputs_. This will prevent any background process to delete any
2067
  // file created after this point.
2068
  std::list<uint64_t>::iterator CaptureCurrentFileNumberInPendingOutputs();
2069
  // This function should be called with the result of
2070
  // CaptureCurrentFileNumberInPendingOutputs(). It then marks that any file
2071
  // created between the calls CaptureCurrentFileNumberInPendingOutputs() and
2072
  // ReleaseFileNumberFromPendingOutputs() can now be deleted (if it's not live
2073
  // and blocked by any other pending_outputs_ calls)
2074
  void ReleaseFileNumberFromPendingOutputs(
2075
      std::unique_ptr<std::list<uint64_t>::iterator>& v);
2076
2077
  // Similar to pending_outputs, preserve OPTIONS file. Used for remote
2078
  // compaction.
2079
  std::list<uint64_t>::iterator CaptureOptionsFileNumber();
2080
  void ReleaseOptionsFileNumber(
2081
      std::unique_ptr<std::list<uint64_t>::iterator>& v);
2082
2083
  // Sets bg error if there is an error writing to WAL.
2084
  IOStatus SyncClosedWals(const WriteOptions& write_options,
2085
                          JobContext* job_context, VersionEdit* synced_wals,
2086
                          bool error_recovery_in_prog);
2087
2088
  // Flush the in-memory write buffer to storage.  Switches to a new
2089
  // log-file/memtable and writes a new descriptor iff successful. Then
2090
  // installs a new super version for the column family.
2091
  Status FlushMemTableToOutputFile(ColumnFamilyData* cfd,
2092
                                   const MutableCFOptions& mutable_cf_options,
2093
                                   bool* madeProgress, JobContext* job_context,
2094
                                   FlushReason flush_reason,
2095
                                   SuperVersionContext* superversion_context,
2096
                                   LogBuffer* log_buffer,
2097
                                   Env::Priority thread_pri);
2098
2099
  // Flush the memtables of (multiple) column families to multiple files on
2100
  // persistent storage.
2101
  Status FlushMemTablesToOutputFiles(
2102
      const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
2103
      JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
2104
2105
  Status AtomicFlushMemTablesToOutputFiles(
2106
      const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
2107
      JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
2108
2109
  // REQUIRES: log_numbers are sorted in ascending order
2110
  // corrupted_wal_found is set to true if we recover from a corrupted log file.
2111
  Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
2112
                         SequenceNumber* next_sequence, bool read_only,
2113
                         bool is_retry, bool* corrupted_wal_found,
2114
                         RecoveryContext* recovery_ctx);
2115
2116
  void SetupLogFilesRecovery(
2117
      const std::vector<uint64_t>& wal_numbers,
2118
      std::unordered_map<int, VersionEdit>* version_edits, int* job_id,
2119
      uint64_t* min_wal_number);
2120
2121
  Status ProcessLogFiles(const std::vector<uint64_t>& wal_numbers,
2122
                         bool read_only, bool is_retry, uint64_t min_wal_number,
2123
                         int job_id, SequenceNumber* next_sequence,
2124
                         std::unordered_map<int, VersionEdit>* version_edits,
2125
                         bool* corrupted_wal_found,
2126
                         RecoveryContext* recovery_ctx);
2127
2128
  Status ProcessLogFile(
2129
      uint64_t wal_number, uint64_t min_wal_number, bool is_retry,
2130
      bool read_only, int job_id, SequenceNumber* next_sequence,
2131
      bool* stop_replay_for_corruption, bool* stop_replay_by_wal_filter,
2132
      uint64_t* corrupted_wal_number, bool* corrupted_wal_found,
2133
      std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
2134
      PredecessorWALInfo& predecessor_wal_info);
2135
2136
  void SetupLogFileProcessing(uint64_t wal_number);
2137
2138
  Status InitializeLogReader(uint64_t wal_number, bool is_retry,
2139
                             std::string& fname,
2140
2141
                             bool stop_replay_for_corruption,
2142
                             uint64_t min_wal_number,
2143
                             const PredecessorWALInfo& predecessor_wal_info,
2144
                             bool* const old_log_record,
2145
                             Status* const reporter_status,
2146
                             DBOpenLogRecordReadReporter* reporter,
2147
                             std::unique_ptr<log::Reader>& reader);
2148
  Status ProcessLogRecord(
2149
      Slice record, const std::unique_ptr<log::Reader>& reader,
2150
      const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
2151
      const std::string& fname, bool read_only, int job_id,
2152
      const std::function<void()>& logFileDropped,
2153
      DBOpenLogRecordReadReporter* reporter, uint64_t* record_checksum,
2154
      SequenceNumber* last_seqno_observed, SequenceNumber* next_sequence,
2155
      bool* stop_replay_for_corruption, Status* status,
2156
      bool* stop_replay_by_wal_filter,
2157
      std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);
2158
2159
  Status InitializeWriteBatchForLogRecord(
2160
      Slice record, const std::unique_ptr<log::Reader>& reader,
2161
      const UnorderedMap<uint32_t, size_t>& running_ts_sz, WriteBatch* batch,
2162
      std::unique_ptr<WriteBatch>& new_batch, WriteBatch*& batch_to_use,
2163
      uint64_t* record_checksum);
2164
2165
  void MaybeReviseStopReplayForCorruption(
2166
      SequenceNumber sequence, SequenceNumber const* const next_sequence,
2167
      bool* stop_replay_for_corruption);
2168
2169
  Status InsertLogRecordToMemtable(WriteBatch* batch_to_use,
2170
                                   uint64_t wal_number,
2171
                                   SequenceNumber* next_sequence,
2172
                                   bool* has_valid_writes, bool read_only);
2173
2174
  Status MaybeWriteLevel0TableForRecovery(
2175
      bool has_valid_writes, bool read_only, uint64_t wal_number, int job_id,
2176
      SequenceNumber const* const next_sequence,
2177
      std::unordered_map<int, VersionEdit>* version_edits, bool* flushed);
2178
2179
  Status HandleNonOkStatusOrOldLogRecord(
2180
      uint64_t wal_number, SequenceNumber const* const next_sequence,
2181
      Status status, const DBOpenLogRecordReadReporter& reporter,
2182
      bool* old_log_record, bool* stop_replay_for_corruption,
2183
      uint64_t* corrupted_wal_number, bool* corrupted_wal_found);
2184
2185
  Status UpdatePredecessorWALInfo(uint64_t wal_number,
2186
                                  const SequenceNumber last_seqno_observed,
2187
                                  const std::string& fname,
2188
                                  PredecessorWALInfo& predecessor_wal_info);
2189
2190
  void FinishLogFileProcessing(const Status& status,
2191
                               const SequenceNumber* next_sequence);
2192
2193
  // Return `Status::Corruption()` when `stop_replay_for_corruption == true` and
2194
  // exits inconsistency between SST and WAL data
2195
  Status MaybeHandleStopReplayForCorruptionForInconsistency(
2196
      bool stop_replay_for_corruption, uint64_t corrupted_wal_number);
2197
2198
  Status MaybeFlushFinalMemtableOrRestoreActiveLogFiles(
2199
      const std::vector<uint64_t>& wal_numbers, bool read_only, int job_id,
2200
      bool flushed, std::unordered_map<int, VersionEdit>* version_edits,
2201
      RecoveryContext* recovery_ctx);
2202
2203
  // Check that DB sequence number is not set back during recovery between
2204
  // replaying of WAL files and between replaying of WriteBatches.
2205
  Status CheckSeqnoNotSetBackDuringRecovery(SequenceNumber prev_next_seqno,
2206
                                            SequenceNumber current_next_seqno);
2207
2208
  void FinishLogFilesRecovery(int job_id, const Status& status);
2209
  // The following two methods are used to flush a memtable to
2210
  // storage. The first one is used at database RecoveryTime (when the
2211
  // database is opened) and is heavyweight because it holds the mutex
2212
  // for the entire period. The second method WriteLevel0Table supports
2213
  // concurrent flush memtables to storage.
2214
  Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
2215
                                     MemTable* mem, VersionEdit* edit);
2216
2217
  // Get the size of a log file and, if truncate is true, truncate the
2218
  // log file to its actual size, thereby freeing preallocated space.
2219
  // Return success even if truncate fails
2220
  Status GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
2221
                                    WalFileNumberSize* log);
2222
2223
  // Restore alive_wal_files_ and wals_total_size_ after recovery.
2224
  // It needs to run only when there's no flush during recovery
2225
  // (e.g. avoid_flush_during_recovery=true). May also trigger flush
2226
  // in case wals_total_size > max_total_wal_size.
2227
  Status RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers);
2228
2229
  // num_bytes: for slowdown case, delay time is calculated based on
2230
  //            `num_bytes` going through.
2231
  Status DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
2232
                    const WriteOptions& write_options);
2233
2234
  // Begin stalling of writes when memory usage increases beyond a certain
2235
  // threshold.
2236
  void WriteBufferManagerStallWrites();
2237
2238
  Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
2239
                                      WriteBatch* my_batch);
2240
2241
  // REQUIRES: mutex locked and in write thread.
2242
  Status ScheduleFlushes(WriteContext* context);
2243
2244
  void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds);
2245
2246
  Status TrimMemtableHistory(WriteContext* context);
2247
2248
  // Switches the current live memtable to immutable/read-only memtable.
2249
  // A new WAL is created if the current WAL is not empty.
2250
  // If `new_imm` is not nullptr, it will be added as the newest immutable
2251
  // memtable, if and only if OK status is returned.
2252
  // `last_seqno` needs to be provided if `new_imm` is not nullptr. It is
2253
  // the value of versions_->LastSequence() after the write that ingests new_imm
2254
  // is done.
2255
  //
2256
  // REQUIRES: mutex_ is held
2257
  // REQUIRES: this thread is currently at the front of the writer queue
2258
  // REQUIRES: this thread is currently at the front of the 2nd writer queue if
2259
  // two_write_queues_ is true (This is to simplify the reasoning.)
2260
  Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
2261
                        ReadOnlyMemTable* new_imm = nullptr,
2262
                        SequenceNumber last_seqno = 0);
2263
2264
  // Select and output column families qualified for atomic flush in
2265
  // `selected_cfds`. If `provided_candidate_cfds` is non-empty, it will be used
2266
  // as candidate CFs to select qualified ones from. Otherwise, all column
2267
  // families are used as candidate to select from.
2268
  //
2269
  // REQUIRES: mutex held
2270
  void SelectColumnFamiliesForAtomicFlush(
2271
      autovector<ColumnFamilyData*>* selected_cfds,
2272
      const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},
2273
      FlushReason flush_reason = FlushReason::kOthers);
2274
2275
  // Force current memtable contents to be flushed.
2276
  Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
2277
                       FlushReason flush_reason,
2278
                       bool entered_write_thread = false);
2279
2280
  // Atomic-flush memtables from quanlified CFs among `provided_candidate_cfds`
2281
  // (if non-empty) or amomg all column families and atomically record the
2282
  // result to the MANIFEST.
2283
  Status AtomicFlushMemTables(
2284
      const FlushOptions& options, FlushReason flush_reason,
2285
      const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},
2286
      bool entered_write_thread = false);
2287
2288
  Status RetryFlushesForErrorRecovery(FlushReason flush_reason, bool wait);
2289
2290
  // Wait until flushing this column family won't stall writes
2291
  Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
2292
                                           bool* flush_needed);
2293
2294
  // Wait for memtable flushed.
2295
  // If flush_memtable_id is non-null, wait until the memtable with the ID
2296
  // gets flush. Otherwise, wait until the column family don't have any
2297
  // memtable pending flush.
2298
  // resuming_from_bg_err indicates whether the caller is attempting to resume
2299
  // from background error.
2300
  Status WaitForFlushMemTable(
2301
      ColumnFamilyData* cfd, const uint64_t* flush_memtable_id = nullptr,
2302
      bool resuming_from_bg_err = false,
2303
0
      std::optional<FlushReason> flush_reason = std::nullopt) {
2304
0
    return WaitForFlushMemTables({cfd}, {flush_memtable_id},
2305
0
                                 resuming_from_bg_err, flush_reason);
2306
0
  }
2307
  // Wait for memtables to be flushed for multiple column families.
2308
  Status WaitForFlushMemTables(
2309
      const autovector<ColumnFamilyData*>& cfds,
2310
      const autovector<const uint64_t*>& flush_memtable_ids,
2311
      bool resuming_from_bg_err, std::optional<FlushReason> flush_reason);
2312
2313
1.72k
  inline void WaitForPendingWrites() {
2314
1.72k
    mutex_.AssertHeld();
2315
1.72k
    TEST_SYNC_POINT("DBImpl::WaitForPendingWrites:BeforeBlock");
2316
    // In case of pipelined write is enabled, wait for all pending memtable
2317
    // writers.
2318
1.72k
    if (immutable_db_options_.enable_pipelined_write) {
2319
      // Memtable writers may call DB::Get in case max_successive_merges > 0,
2320
      // which may lock mutex. Unlocking mutex here to avoid deadlock.
2321
0
      mutex_.Unlock();
2322
0
      write_thread_.WaitForMemTableWriters();
2323
0
      mutex_.Lock();
2324
0
    }
2325
2326
1.72k
    if (immutable_db_options_.unordered_write) {
2327
      // Wait for the ones who already wrote to the WAL to finish their
2328
      // memtable write.
2329
0
      if (pending_memtable_writes_.load() != 0) {
2330
        // XXX: suspicious wait while holding DB mutex?
2331
0
        std::unique_lock<std::mutex> guard(switch_mutex_);
2332
0
        switch_cv_.wait(guard,
2333
0
                        [&] { return pending_memtable_writes_.load() == 0; });
2334
0
      }
2335
1.72k
    } else {
2336
      // (Writes are finished before the next write group starts.)
2337
1.72k
    }
2338
2339
    // Wait for any LockWAL to clear
2340
1.72k
    while (lock_wal_count_ > 0) {
2341
0
      bg_cv_.Wait();
2342
0
    }
2343
1.72k
  }
2344
2345
  // TaskType is used to identify tasks in thread-pool, currently only
2346
  // differentiate manual compaction, which could be unscheduled from the
2347
  // thread-pool.
2348
  enum class TaskType : uint8_t {
2349
    kDefault = 0,
2350
    kManualCompaction = 1,
2351
    kCount = 2,
2352
  };
2353
2354
  // Task tag is used to identity tasks in thread-pool, which is
2355
  // dbImpl obj address + type
2356
2.02k
  inline void* GetTaskTag(TaskType type) {
2357
2.02k
    return GetTaskTag(static_cast<uint8_t>(type));
2358
2.02k
  }
2359
2360
243k
  inline void* GetTaskTag(uint8_t type) {
2361
243k
    return static_cast<uint8_t*>(static_cast<void*>(this)) + type;
2362
243k
  }
2363
2364
  // REQUIRES: mutex locked and in write thread.
2365
  void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds);
2366
2367
  // REQUIRES: mutex locked and in write thread.
2368
  Status SwitchWAL(WriteContext* write_context);
2369
2370
  // REQUIRES: mutex locked and in write thread.
2371
  Status HandleWriteBufferManagerFlush(WriteContext* write_context);
2372
2373
  // REQUIRES: mutex locked
2374
  Status PreprocessWrite(const WriteOptions& write_options,
2375
                         WalContext* log_context, WriteContext* write_context);
2376
2377
  // Merge write batches in the write group into merged_batch.
2378
  // Returns OK if merge is successful.
2379
  // Returns Corruption if corruption in write batch is detected.
2380
  Status MergeBatch(const WriteThread::WriteGroup& write_group,
2381
                    WriteBatch* tmp_batch, WriteBatch** merged_batch,
2382
                    size_t* write_with_wal, WriteBatch** to_be_cached_state);
2383
2384
  IOStatus WriteToWAL(const WriteBatch& merged_batch,
2385
                      const WriteOptions& write_options,
2386
                      log::Writer* log_writer, uint64_t* wal_used,
2387
                      uint64_t* log_size,
2388
                      WalFileNumberSize& wal_file_number_size,
2389
                      SequenceNumber sequence);
2390
2391
  IOStatus WriteGroupToWAL(const WriteThread::WriteGroup& write_group,
2392
                           log::Writer* log_writer, uint64_t* wal_used,
2393
                           bool need_wal_sync, bool need_wal_dir_sync,
2394
                           SequenceNumber sequence,
2395
                           WalFileNumberSize& wal_file_number_size);
2396
2397
  IOStatus ConcurrentWriteGroupToWAL(const WriteThread::WriteGroup& write_group,
2398
                                     uint64_t* wal_used,
2399
                                     SequenceNumber* last_sequence,
2400
                                     size_t seq_inc);
2401
2402
  // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
2403
  // Caller must hold mutex_.
2404
  void WriteStatusCheckOnLocked(const Status& status);
2405
2406
  // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
2407
  void WriteStatusCheck(const Status& status);
2408
2409
  // Used by WriteImpl to update bg_error_ when IO error happens, e.g., write
2410
  // WAL, sync WAL fails, if paranoid check is enabled.
2411
  void WALIOStatusCheck(const IOStatus& status);
2412
2413
  // Used by WriteImpl to update bg_error_ in case of memtable insert error.
2414
  void HandleMemTableInsertFailure(const Status& nonok_memtable_insert_status);
2415
2416
  Status CompactFilesImpl(const CompactionOptions& compact_options,
2417
                          ColumnFamilyData* cfd, Version* version,
2418
                          const std::vector<std::string>& input_file_names,
2419
                          std::vector<std::string>* const output_file_names,
2420
                          const int output_level, int output_path_id,
2421
                          JobContext* job_context, LogBuffer* log_buffer,
2422
                          CompactionJobInfo* compaction_job_info);
2423
2424
  // Helper function to perform trivial move by updating manifest metadata
2425
  // without rewriting data files. This is called when IsTrivialMove() is true.
2426
  // REQUIRES: mutex held
2427
  // Returns: Status of the trivial move operation
2428
  Status PerformTrivialMove(Compaction& c, LogBuffer* log_buffer,
2429
                            bool& compaction_released, size_t& moved_files,
2430
                            size_t& moved_bytes);
2431
2432
  // REQUIRES: mutex unlocked
2433
  void TrackOrUntrackFiles(const std::vector<std::string>& existing_data_files,
2434
                           bool track);
2435
2436
  void MaybeScheduleFlushOrCompaction();
2437
2438
  BackgroundJobPressure CaptureBackgroundJobPressure() const;
2439
  void NotifyOnBackgroundJobPressureChanged();
2440
2441
  struct FlushRequest {
2442
    FlushReason flush_reason;
2443
    // Whether this flush request should use the atomic flush code path.
2444
    bool atomic_flush = false;
2445
    // A map from column family to flush to largest memtable id to persist for
2446
    // each column family. Once all the memtables whose IDs are smaller than or
2447
    // equal to this per-column-family specified value, this flush request is
2448
    // considered to have completed its work of flushing this column family.
2449
    // After completing the work for all column families in this request, this
2450
    // flush is considered complete.
2451
    std::unordered_map<ColumnFamilyData*, uint64_t>
2452
        cfd_to_max_mem_id_to_persist;
2453
2454
#ifndef NDEBUG
2455
    int reschedule_count = 1;
2456
#endif /* !NDEBUG */
2457
  };
2458
2459
  // In case of atomic flush, generates a `FlushRequest` for the latest atomic
2460
  // cuts for these `cfds`. Atomic cuts are recorded in
2461
  // `AssignAtomicFlushSeq()`. For each entry in `cfds`, all CFDs sharing the
2462
  // same latest atomic cut must also be present.
2463
  //
2464
  // REQUIRES: mutex held
2465
  void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
2466
                            FlushReason flush_reason, FlushRequest* req);
2467
2468
  // Below functions are for executing flush, compaction in the background. A
2469
  // dequeue is the communication channel between threads that asks for the work
2470
  // to be done and the available threads in the thread pool that pick it up to
2471
  // execute it. We use these terminologies to describe the state of the work
2472
  // and its transitions:
2473
  // 1) It becomes pending once it's successfully enqueued into the
2474
  //    corresponding dequeue, a work in this state is also called unscheduled.
2475
  //    Counter `unscheduled_*_` counts work in this state.
2476
  // 2) When `MaybeScheduleFlushOrCompaction` schedule a thread to run `BGWork*`
2477
  //    for the work, it becomes scheduled
2478
  //    Counter `bg_*_scheduled_` counts work in this state.
2479
  // 3) Once the thread start to execute `BGWork*`, the work is popped from the
2480
  //    dequeue, it is now in running state
2481
  //    Counter `num_running_*_` counts work in this state.
2482
  // 4) Eventually, the work is finished. We don't need to specifically track
2483
  //    finished work.
2484
2485
  // Returns true if `req` is successfully enqueued.
2486
  bool EnqueuePendingFlush(const FlushRequest& req);
2487
2488
  void EnqueuePendingCompaction(ColumnFamilyData* cfd);
2489
  void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
2490
                            FileType type, uint64_t number, int job_id);
2491
  static void BGWorkCompaction(void* arg);
2492
  // Runs a pre-chosen universal compaction involving bottom level in a
2493
  // separate, bottom-pri thread pool.
2494
  static void BGWorkBottomCompaction(void* arg);
2495
  static void BGWorkFlush(void* arg);
2496
  static void BGWorkPurge(void* arg);
2497
  static void UnscheduleCompactionCallback(void* arg);
2498
  static void UnscheduleFlushCallback(void* arg);
2499
  void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
2500
                                Env::Priority thread_pri);
2501
  void BackgroundCallFlush(Env::Priority thread_pri);
2502
  void BackgroundCallPurge();
2503
  Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
2504
                              LogBuffer* log_buffer,
2505
                              PrepickedCompaction* prepicked_compaction,
2506
                              Env::Priority thread_pri);
2507
  Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
2508
                         LogBuffer* log_buffer, FlushReason* reason,
2509
                         bool* flush_rescheduled_to_retain_udt,
2510
                         Env::Priority thread_pri);
2511
2512
  Compaction* CreateIntendedCompactionForwardedToBottomPriorityPool(
2513
      Compaction* c);
2514
  bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
2515
                               const std::vector<CompactionInputFiles>& inputs,
2516
                               bool* sfm_bookkeeping, LogBuffer* log_buffer);
2517
2518
  // Request compaction tasks token from compaction thread limiter.
2519
  // It always succeeds if force = true or limiter is disable.
2520
  bool RequestCompactionToken(ColumnFamilyData* cfd, bool force,
2521
                              std::unique_ptr<TaskLimiterToken>* token,
2522
                              LogBuffer* log_buffer);
2523
2524
  // Return true if the `FlushRequest` can be rescheduled to retain the UDT.
2525
  // Only true if there are user-defined timestamps in the involved MemTables
2526
  // with newer than cutoff timestamp `full_history_ts_low` and not flushing
2527
  // immediately will not cause entering write stall mode.
2528
  bool ShouldRescheduleFlushRequestToRetainUDT(const FlushRequest& flush_req);
2529
2530
  // Schedule background tasks
2531
  Status StartPeriodicTaskScheduler();
2532
2533
  // Compute the repeat period for the kTriggerCompaction task, which ensures
2534
  // compactions not dependent on writes (flushes) are eventually triggered when
2535
  // there are no writes (flushes). NOT thread safe; only called during DB open
2536
  // (StartPeriodicTaskScheduler). KNOWN LIMITATION: doesn't get updated with
2537
  // dynamic option updates. (Probably not worth the extra complexity.)
2538
  uint64_t ComputeTriggerCompactionPeriod();
2539
2540
  // Cancel scheduled periodic tasks
2541
  Status CancelPeriodicTaskScheduler();
2542
2543
  Status RegisterRecordSeqnoTimeWorker();
2544
2545
  void PrintStatistics();
2546
2547
  size_t EstimateInMemoryStatsHistorySize() const;
2548
2549
  // Return the minimum empty level that could hold the total data in the
2550
  // input level. Return the input level, if such level could not be found.
2551
  int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
2552
                                   const MutableCFOptions& mutable_cf_options,
2553
                                   int level);
2554
2555
  // Move the files in the input level to the target level.
2556
  // If target_level < 0, automatically calculate the minimum level that could
2557
  // hold the data set.
2558
  Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1);
2559
2560
  // helper functions for adding and removing from flush & compaction queues
2561
  void AddToCompactionQueue(ColumnFamilyData* cfd);
2562
  ColumnFamilyData* PopFirstFromCompactionQueue();
2563
  FlushRequest PopFirstFromFlushQueue();
2564
2565
  // Pick the first unthrottled compaction with task token from queue.
2566
  ColumnFamilyData* PickCompactionFromQueue(
2567
      std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer);
2568
2569
  IOStatus SyncWalImpl(bool include_current_wal,
2570
                       const WriteOptions& write_options,
2571
                       JobContext* job_context, VersionEdit* synced_wals,
2572
                       bool error_recovery_in_prog);
2573
2574
  // helper function to call after some of the logs_ were synced
2575
  void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit);
2576
  Status ApplyWALToManifest(const ReadOptions& read_options,
2577
                            const WriteOptions& write_options,
2578
                            VersionEdit* edit);
2579
  // WALs with log number up to up_to are not synced successfully.
2580
  void MarkLogsNotSynced(uint64_t up_to);
2581
2582
  SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary,
2583
                                bool lock = true);
2584
2585
  // If snapshot_seq != kMaxSequenceNumber, then this function can only be
2586
  // called from the write thread that publishes sequence numbers to readers.
2587
  // For 1) write-committed, or 2) write-prepared + one-write-queue, this will
2588
  // be the write thread performing memtable writes. For write-prepared with
2589
  // two write queues, this will be the write thread writing commit marker to
2590
  // the WAL.
2591
  // If snapshot_seq == kMaxSequenceNumber, this function is called by a caller
2592
  // ensuring no writes to the database.
2593
  std::pair<Status, std::shared_ptr<const SnapshotImpl>>
2594
  CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
2595
                                bool lock = true);
2596
2597
  uint64_t GetMaxTotalWalSize() const;
2598
2599
  FSDirectory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const;
2600
2601
  Status MaybeReleaseTimestampedSnapshotsAndCheck();
2602
2603
  Status CloseHelper();
2604
2605
  void WaitForBackgroundWork();
2606
2607
  // Background threads call this function, which is just a wrapper around
2608
  // the InstallSuperVersion() function. Background threads carry
2609
  // sv_context to allow allocation of SuperVersion object outside of holding
2610
  // the DB mutex.
2611
  // All ColumnFamily state changes go through this function. Here we analyze
2612
  // the new state and we schedule background work if we detect that the new
2613
  // state needs flush or compaction.
2614
  // See also InstallSuperVersionForConfigChange().
2615
  void InstallSuperVersionAndScheduleWork(
2616
      ColumnFamilyData* cfd, SuperVersionContext* sv_context,
2617
      std::optional<std::shared_ptr<SeqnoToTimeMapping>>
2618
          new_seqno_to_time_mapping = {});
2619
2620
  // A variant of InstallSuperVersionAndScheduleWork() that must be used for
2621
  // new CFs or for changes to mutable_cf_options. This is so that it can
2622
  // update seqno_to_time_mapping cached for the new SuperVersion as relevant.
2623
  void InstallSuperVersionForConfigChange(ColumnFamilyData* cfd,
2624
                                          SuperVersionContext* sv_context);
2625
2626
  bool GetIntPropertyInternal(ColumnFamilyData* cfd,
2627
                              const DBPropertyInfo& property_info,
2628
                              bool is_locked, uint64_t* value);
2629
  bool GetPropertyHandleOptionsStatistics(std::string* value);
2630
2631
  bool HasPendingManualCompaction();
2632
  bool HasExclusiveManualCompaction();
2633
  void AddManualCompaction(ManualCompactionState* m);
2634
  void RemoveManualCompaction(ManualCompactionState* m);
2635
  bool ShouldntRunManualCompaction(ManualCompactionState* m);
2636
  bool HaveManualCompaction(ColumnFamilyData* cfd);
2637
  bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
2638
  void UpdateFIFOCompactionStatus(const std::unique_ptr<Compaction>& c);
2639
2640
  // May open and read table files for table property.
2641
  // Should not be called while holding mutex_.
2642
  void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c,
2643
                              const Status& st,
2644
                              const CompactionJobStats& compaction_job_stats,
2645
                              const int job_id,
2646
                              CompactionJobInfo* compaction_job_info) const;
2647
  // Reserve the next 'num' file numbers for to-be-ingested external SST files,
2648
  // and return the current file_number in 'next_file_number'.
2649
  // Write a version edit to the MANIFEST.
2650
  Status ReserveFileNumbersBeforeIngestion(
2651
      ColumnFamilyData* cfd, uint64_t num,
2652
      std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
2653
      uint64_t* next_file_number);
2654
2655
  bool ShouldPurge(uint64_t file_number) const;
2656
  void MarkAsGrabbedForPurge(uint64_t file_number);
2657
2658
  size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
2659
41.9k
  Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; }
2660
2661
  IOStatus CreateWAL(const WriteOptions& write_options, uint64_t log_file_num,
2662
                     uint64_t recycle_log_number, size_t preallocate_block_size,
2663
                     const PredecessorWALInfo& predecessor_wal_info,
2664
                     log::Writer** new_log);
2665
2666
  // Validate self-consistency of DB options
2667
  static Status ValidateOptions(const DBOptions& db_options);
2668
  // Validate self-consistency of DB options and its consistency with cf options
2669
  static Status ValidateOptions(
2670
      const DBOptions& db_options,
2671
      const std::vector<ColumnFamilyDescriptor>& column_families);
2672
2673
  // Utility function to do some debug validation and sort the given vector
2674
  // of MultiGet keys
2675
  void PrepareMultiGetKeys(
2676
      const size_t num_keys, bool sorted,
2677
      autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* key_ptrs);
2678
2679
  void MultiGetCommon(const ReadOptions& options,
2680
                      ColumnFamilyHandle* column_family, const size_t num_keys,
2681
                      const Slice* keys, PinnableSlice* values,
2682
                      PinnableWideColumns* columns, std::string* timestamps,
2683
                      Status* statuses, bool sorted_input);
2684
2685
  void MultiGetCommon(const ReadOptions& options, const size_t num_keys,
2686
                      ColumnFamilyHandle** column_families, const Slice* keys,
2687
                      PinnableSlice* values, PinnableWideColumns* columns,
2688
                      std::string* timestamps, Status* statuses,
2689
                      bool sorted_input);
2690
2691
  // A structure to hold the information required to process MultiGet of keys
2692
  // belonging to one column family. For a multi column family MultiGet, there
2693
  // will be a container of these objects.
2694
  struct MultiGetKeyRangePerCf {
2695
    // For the batched MultiGet which relies on sorted keys, start specifies
2696
    // the index of first key belonging to this column family in the sorted
2697
    // list.
2698
    size_t start;
2699
2700
    // For the batched MultiGet case, num_keys specifies the number of keys
2701
    // belonging to this column family in the sorted list
2702
    size_t num_keys;
2703
2704
0
    MultiGetKeyRangePerCf() : start(0), num_keys(0) {}
2705
2706
    MultiGetKeyRangePerCf(size_t first, size_t count)
2707
0
        : start(first), num_keys(count) {}
2708
  };
2709
2710
  // A structure to contain ColumnFamilyData and the SuperVersion obtained for
2711
  // the consistent view of DB
2712
  struct ColumnFamilySuperVersionPair {
2713
    ColumnFamilyHandleImpl* cfh;
2714
    ColumnFamilyData* cfd;
2715
2716
    // SuperVersion for the column family obtained in a manner that ensures a
2717
    // consistent view across all column families in the DB
2718
    SuperVersion* super_version;
2719
    ColumnFamilySuperVersionPair(ColumnFamilyHandle* column_family,
2720
                                 SuperVersion* sv)
2721
0
        : cfh(static_cast<ColumnFamilyHandleImpl*>(column_family)),
2722
0
          cfd(cfh->cfd()),
2723
0
          super_version(sv) {}
2724
2725
    ColumnFamilySuperVersionPair() = default;
2726
  };
2727
2728
  // A common function to obtain a consistent snapshot, which can be implicit
2729
  // if the user doesn't specify a snapshot in read_options, across
2730
  // multiple column families. It will attempt to get an implicit
2731
  // snapshot without acquiring the db_mutes, but will give up after a few
2732
  // tries and acquire the mutex if a memtable flush happens. The template
2733
  // allows both the batched and non-batched MultiGet to call this with
2734
  // either an std::unordered_map or autovector of column families.
2735
  //
2736
  // If callback is non-null, the callback is refreshed with the snapshot
2737
  // sequence number
2738
  //
2739
  // `extra_sv_ref` is used to indicate whether thread-local SuperVersion
2740
  // should be obtained with an extra ref (by GetReferencedSuperVersion()) or
2741
  // not (by GetAndRefSuperVersion()). For instance, point lookup like MultiGet
2742
  // does not require SuperVersion to be re-acquired throughout the entire
2743
  // invocation (no need extra ref), while MultiCfIterators may need the
2744
  // SuperVersion to be updated during Refresh() (requires extra ref).
2745
  //
2746
  // `sv_from_thread_local` being set to false indicates that the SuperVersion
2747
  // obtained from the ColumnFamilyData, whereas true indicates they are thread
2748
  // local.
2749
  //
2750
  // A non-OK status will be returned if for a column family that enables
2751
  // user-defined timestamp feature, the specified `ReadOptions.timestamp`
2752
  // attemps to read collapsed history.
2753
  template <class T, typename IterDerefFuncType>
2754
  Status MultiCFSnapshot(const ReadOptions& read_options,
2755
                         ReadCallback* callback,
2756
                         IterDerefFuncType iter_deref_func, T* cf_list,
2757
                         bool extra_sv_ref, SequenceNumber* snapshot,
2758
                         bool* sv_from_thread_local);
2759
2760
  // The actual implementation of the batching MultiGet. The caller is expected
2761
  // to have acquired the SuperVersion and pass in a snapshot sequence number
2762
  // in order to construct the LookupKeys. The start_key and num_keys specify
2763
  // the range of keys in the sorted_keys vector for a single column family.
2764
  Status MultiGetImpl(
2765
      const ReadOptions& read_options, size_t start_key, size_t num_keys,
2766
      autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
2767
      SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback);
2768
2769
  void MultiGetWithCallbackImpl(
2770
      const ReadOptions& read_options, ColumnFamilyHandle* column_family,
2771
      ReadCallback* callback,
2772
      autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);
2773
2774
  Status DisableFileDeletionsWithLock();
2775
2776
  Status IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
2777
                                      std::string ts_low);
2778
2779
  bool ShouldReferenceSuperVersion(const MergeContext& merge_context);
2780
2781
  template <typename IterType, typename ImplType,
2782
            typename ErrorIteratorFuncType>
2783
  std::unique_ptr<IterType> NewMultiCfIterator(
2784
      const ReadOptions& _read_options,
2785
      const std::vector<ColumnFamilyHandle*>& column_families,
2786
      ErrorIteratorFuncType error_iterator_func);
2787
2788
  bool ShouldPickCompaction(bool is_prepicked,
2789
                            const PrepickedCompaction* prepicked_compaction);
2790
2791
  void ResetBottomPriCompactionIntent(ColumnFamilyData* cfd,
2792
                                      std::unique_ptr<Compaction>& c);
2793
  // Lock over the persistent DB state.  Non-nullptr iff successfully acquired.
2794
  FileLock* db_lock_ = nullptr;
2795
2796
  // Guards changes to DB and CF options to ensure consistency between
2797
  // * In-memory options objects
2798
  // * Settings in effect
2799
  // * Options file contents
2800
  // while allowing the DB mutex to be released during slow operations like
2801
  // persisting options file or modifying global periodic task timer.
2802
  // Always acquired *before* DB mutex when this one is applicable.
2803
  InstrumentedMutex options_mutex_;
2804
2805
  // Guards reads and writes to in-memory stats_history_.
2806
  InstrumentedMutex stats_history_mutex_;
2807
2808
  // In addition to mutex_, wal_write_mutex_ protects writes to logs_ and
2809
  // cur_wal_number_. With two_write_queues it also protects alive_wal_files_,
2810
  // and wal_empty_. Refer to the definition of each variable below for more
2811
  // details.
2812
  // Note: to avoid deadlock, if needed to acquire both wal_write_mutex_ and
2813
  // mutex_, the order should be first mutex_ and then wal_write_mutex_.
2814
  InstrumentedMutex wal_write_mutex_;
2815
2816
  // If zero, manual compactions are allowed to proceed. If non-zero, manual
2817
  // compactions may still be running, but will quickly fail with
2818
  // `Status::Incomplete`. The value indicates how many threads have paused
2819
  // manual compactions. It is accessed in read mode outside the DB mutex in
2820
  // compaction code paths.
2821
  std::atomic<int> manual_compaction_paused_ = false;
2822
2823
  // If non-zero, all compaction jobs (background automatic compactions,
2824
  // manual compactions via CompactRange, and foreground CompactFiles calls)
2825
  // are being aborted. Compactions will be signaled to stop. Any new
2826
  // compaction job would fail immediately. The value indicates how many threads
2827
  // have called AbortAllCompactions(). It is accessed in read mode outside the
2828
  // DB mutex in compaction code paths.
2829
  std::atomic<int> compaction_aborted_ = 0;
2830
2831
  // This condition variable is signaled on these conditions:
2832
  // * whenever bg_compaction_scheduled_ goes down to 0
2833
  // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't
2834
  // made any progress
2835
  // * whenever a compaction made any progress
2836
  // * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases
2837
  // (i.e. whenever a flush is done, even if it didn't make any progress)
2838
  // * whenever there is an error in background purge, flush or compaction
2839
  // * whenever num_running_ingest_file_ goes to 0.
2840
  // * whenever pending_purge_obsolete_files_ goes to 0.
2841
  // * whenever disable_delete_obsolete_files_ goes to 0.
2842
  // * whenever SetOptions successfully updates options.
2843
  // * whenever a column family is dropped.
2844
  InstrumentedCondVar bg_cv_;
2845
2846
  ColumnFamilyHandleImpl* persist_stats_cf_handle_ = nullptr;
2847
2848
  bool persistent_stats_cfd_exists_ = true;
2849
2850
  // Writes are protected by locking both mutex_ and wal_write_mutex_, and reads
2851
  // must be under either mutex_ or wal_write_mutex_. Since after ::Open,
2852
  // cur_wal_number_ is currently updated only in write_thread_, it can be read
2853
  // from the same write_thread_ without any locks.
2854
  uint64_t cur_wal_number_ = 0;
2855
2856
  // Log files that we can recycle. Must be protected by db mutex_.
2857
  std::deque<uint64_t> wal_recycle_files_;
2858
2859
  // The minimum log file number taht can be recycled, if log recycling is
2860
  // enabled. This is used to ensure that log files created by previous
2861
  // instances of the database are not recycled, as we cannot be sure they
2862
  // were created in the recyclable format.
2863
  uint64_t min_wal_number_to_recycle_ = 0;
2864
2865
  // Protected by wal_write_mutex_.
2866
  bool wal_dir_synced_ = false;
2867
2868
  // Without two_write_queues, read and writes to wal_empty_ are protected by
2869
  // mutex_. Since it is currently updated/read only in write_thread_, it can be
2870
  // accessed from the same write_thread_ without any locks. With
2871
  // two_write_queues writes, where it can be updated in different threads,
2872
  // read and writes are protected by wal_write_mutex_ instead. This is to avoid
2873
  // expensive mutex_ lock during WAL write, which update wal_empty_.
2874
  bool wal_empty_ = true;
2875
2876
  // The current WAL file and those that have not been found obsolete from
2877
  // memtable flushes. A WAL not on this list might still be pending writer
2878
  // flush and/or sync and close and might still be in logs_. alive_wal_files_
2879
  // is protected by mutex_ and wal_write_mutex_ with details as follows:
2880
  // 1. read by FindObsoleteFiles() which can be called in either application
2881
  //    thread or RocksDB bg threads, both mutex_ and wal_write_mutex_ are
2882
  //    held.
2883
  // 2. pop_front() by FindObsoleteFiles(), both mutex_ and wal_write_mutex_
2884
  //    are held.
2885
  // 3. push_back() by DBImpl::Open() and DBImpl::RestoreAliveLogFiles()
2886
  //    (actually called by Open()), only mutex_ is held because at this point,
2887
  //    the DB::Open() call has not returned success to application, and the
2888
  //    only other thread(s) that can conflict are bg threads calling
2889
  //    FindObsoleteFiles() which ensure that both mutex_ and wal_write_mutex_
2890
  //    are held when accessing alive_wal_files_.
2891
  // 4. read by DBImpl::Open() is protected by mutex_.
2892
  // 5. push_back() by SwitchMemtable(). Both mutex_ and wal_write_mutex_ are
2893
  //    held. This is done by the write group leader. Note that in the case of
2894
  //    two-write-queues, another WAL-only write thread can be writing to the
2895
  //    WAL concurrently. See 9.
2896
  // 6. read by SwitchWAL() with both mutex_ and wal_write_mutex_ held. This is
2897
  //    done by write group leader.
2898
  // 7. read by ConcurrentWriteToWAL() by the write group leader in the case of
2899
  //    two-write-queues. Only wal_write_mutex_ is held to protect concurrent
2900
  //    pop_front() by FindObsoleteFiles().
2901
  // 8. read by PreprocessWrite() by the write group leader. wal_write_mutex_
2902
  //    is held to protect the data structure from concurrent pop_front() by
2903
  //    FindObsoleteFiles().
2904
  // 9. read by ConcurrentWriteToWAL() by a WAL-only write thread in the case
2905
  //    of two-write-queues. Only wal_write_mutex_ is held. This suffices to
2906
  //    protect the data structure from concurrent push_back() by current
2907
  //    write group leader as well as pop_front() by FindObsoleteFiles().
2908
  std::deque<WalFileNumberSize> alive_wal_files_;
2909
2910
  // Total size of all "alive" WALs (for easy access without synchronization)
2911
  RelaxedAtomic<uint64_t> wals_total_size_{0};
2912
2913
  // Log files that aren't fully synced, and the current log file.
2914
  // Synchronization:
2915
  // 1. read by FindObsoleteFiles() which can be called either in application
2916
  //    thread or RocksDB bg threads. wal_write_mutex_ is always held, while
2917
  //    some reads are performed without mutex_.
2918
  // 2. pop_front() by FindObsoleteFiles() with only wal_write_mutex_ held.
2919
  // 3. read by DBImpl::Open() with both mutex_ and wal_write_mutex_.
2920
  // 4. emplace_back() by DBImpl::Open() with both mutex_ and wal_write_mutex.
2921
  //    Note that at this point, DB::Open() has not returned success to
2922
  //    application, thus the only other thread(s) that can conflict are bg
2923
  //    threads calling FindObsoleteFiles(). See 1.
2924
  // 5. iteration and clear() from CloseHelper() always hold wal_write_mutex
2925
  //    and mutex_.
2926
  // 6. back() called by APIs FlushWAL() and LockWAL() are protected by only
2927
  //    wal_write_mutex_. These two can be called by application threads after
2928
  //    DB::Open() returns success to applications.
2929
  // 7. read by SyncWAL(), another API, protected by only wal_write_mutex_.
2930
  // 8. read by MarkLogsNotSynced() and MarkLogsSynced() are protected by
2931
  //    wal_write_mutex_.
2932
  // 9. erase() by MarkLogsSynced() protected by wal_write_mutex_.
2933
  // 10. read by SyncClosedWals() protected by only wal_write_mutex_. This can
2934
  //     happen in bg flush threads after DB::Open() returns success to
2935
  //     applications.
2936
  // 11. reads, e.g. front(), iteration, and back() called by PreprocessWrite()
2937
  //     holds only the wal_write_mutex_. This is done by the write group
2938
  //     leader. A bg thread calling FindObsoleteFiles() or MarkLogsSynced()
2939
  //     can happen concurrently. This is fine because wal_write_mutex_ is used
2940
  //     by all parties. See 2, 5, 9.
2941
  // 12. reads, empty(), back() called by SwitchMemtable() hold both mutex_ and
2942
  //     wal_write_mutex_. This happens in the write group leader.
2943
  // 13. emplace_back() by SwitchMemtable() hold both mutex_ and
2944
  //     wal_write_mutex_. This happens in the write group leader. Can conflict
2945
  //     with bg threads calling FindObsoleteFiles(), MarkLogsSynced(),
2946
  //     SyncClosedWals(), etc. as well as application threads calling
2947
  //     FlushWAL(), SyncWAL(), LockWAL(). This is fine because all parties
2948
  //     require at least wal_write_mutex_.
2949
  // 14. iteration called in WriteToWAL(write_group) protected by
2950
  //     wal_write_mutex_. This is done by write group leader when
2951
  //     two-write-queues is disabled and write needs to sync logs.
2952
  // 15. back() called in ConcurrentWriteToWAL() protected by wal_write_mutex_.
2953
  //     This can be done by the write group leader if two-write-queues is
2954
  //     enabled. It can also be done by another WAL-only write thread.
2955
  //
2956
  // Other observations:
2957
  //  - back() and items with getting_synced=true are not popped,
2958
  //  - The same thread that sets getting_synced=true will reset it.
2959
  //  - it follows that the object referred by back() can be safely read from
2960
  //  the write_thread_ without using mutex. Note that calling back() without
2961
  //  mutex may be unsafe because different implementations of deque::back() may
2962
  //  access other member variables of deque, causing undefined behaviors.
2963
  //  Generally, do not access stl containers without proper synchronization.
2964
  //  - it follows that the items with getting_synced=true can be safely read
2965
  //  from the same thread that has set getting_synced=true
2966
  std::deque<LogWriterNumber> logs_;
2967
2968
  // Signaled when getting_synced becomes false for some of the logs_.
2969
  InstrumentedCondVar wal_sync_cv_;
2970
  // This is the app-level state that is written to the WAL but will be used
2971
  // only during recovery. Using this feature enables not writing the state to
2972
  // memtable on normal writes and hence improving the throughput. Each new
2973
  // write of the state will replace the previous state entirely even if the
2974
  // keys in the two consecutive states do not overlap.
2975
  // It is protected by wal_write_mutex_ when two_write_queues_ is enabled.
2976
  // Otherwise only the heaad of write_thread_ can access it.
2977
  WriteBatch cached_recoverable_state_;
2978
  std::atomic<bool> cached_recoverable_state_empty_ = {true};
2979
2980
  // If this is non-empty, we need to delete these log files in background
2981
  // threads. Protected by wal_write_mutex_.
2982
  autovector<log::Writer*> wals_to_free_;
2983
2984
  bool is_snapshot_supported_ = true;
2985
2986
  std::map<uint64_t, std::map<std::string, uint64_t>> stats_history_;
2987
2988
  std::map<std::string, uint64_t> stats_slice_;
2989
2990
  bool stats_slice_initialized_ = false;
2991
2992
  Directories directories_;
2993
2994
  WriteBufferManager* write_buffer_manager_;
2995
2996
  WriteThread write_thread_;
2997
  WriteBatch tmp_batch_;
2998
  // The write thread when the writers have no memtable write. This will be used
2999
  // in 2PC to batch the prepares separately from the serial commit.
3000
  WriteThread nonmem_write_thread_;
3001
3002
  WriteController write_controller_;
3003
3004
  // Size of the last batch group. In slowdown mode, next write needs to
3005
  // sleep if it uses up the quota.
3006
  // Note: This is to protect memtable and compaction. If the batch only writes
3007
  // to the WAL its size need not to be included in this.
3008
  uint64_t last_batch_group_size_ = 0;
3009
3010
  FlushScheduler flush_scheduler_;
3011
3012
  TrimHistoryScheduler trim_history_scheduler_;
3013
3014
  SnapshotList snapshots_;
3015
3016
  TimestampedSnapshotList timestamped_snapshots_;
3017
3018
  // For each background job, pending_outputs_ keeps the current file number at
3019
  // the time that background job started.
3020
  // FindObsoleteFiles()/PurgeObsoleteFiles() never deletes any file that has
3021
  // number bigger than any of the file number in pending_outputs_. Since file
3022
  // numbers grow monotonically, this also means that pending_outputs_ is always
3023
  // sorted. After a background job is done executing, its file number is
3024
  // deleted from pending_outputs_, which allows PurgeObsoleteFiles() to clean
3025
  // it up.
3026
  // State is protected with db mutex.
3027
  std::list<uint64_t> pending_outputs_;
3028
3029
  // Similar to pending_outputs_, FindObsoleteFiles()/PurgeObsoleteFiles() never
3030
  // deletes any OPTIONS file that has number bigger than any of the file number
3031
  // in min_options_file_numbers_.
3032
  std::list<uint64_t> min_options_file_numbers_;
3033
3034
  // flush_queue_ and compaction_queue_ hold column families that we need to
3035
  // flush and compact, respectively.
3036
  // A column family is inserted into flush_queue_ when it satisfies condition
3037
  // cfd->imm()->IsFlushPending()
3038
  // A column family is inserted into compaction_queue_ when it satisfied
3039
  // condition cfd->NeedsCompaction()
3040
  // Column families in this list are all Ref()-erenced
3041
  // TODO(icanadi) Provide some kind of ReferencedColumnFamily class that will
3042
  // do RAII on ColumnFamilyData
3043
  // Column families are in this queue when they need to be flushed or
3044
  // compacted. Consumers of these queues are flush and compaction threads. When
3045
  // column family is put on this queue, we increase unscheduled_flushes_ and
3046
  // unscheduled_compactions_. When these variables are bigger than zero, that
3047
  // means we need to schedule background threads for flush and compaction.
3048
  // Once the background threads are scheduled, we decrease unscheduled_flushes_
3049
  // and unscheduled_compactions_. That way we keep track of number of
3050
  // compaction and flush threads we need to schedule. This scheduling is done
3051
  // in MaybeScheduleFlushOrCompaction()
3052
  // invariant(column family present in flush_queue_ <==>
3053
  // ColumnFamilyData::pending_flush_ == true)
3054
  std::deque<FlushRequest> flush_queue_;
3055
  // invariant(column family present in compaction_queue_ <==>
3056
  // ColumnFamilyData::pending_compaction_ == true)
3057
  std::deque<ColumnFamilyData*> compaction_queue_;
3058
3059
  // A map to store file numbers and filenames of the files to be purged
3060
  std::unordered_map<uint64_t, PurgeFileInfo> purge_files_;
3061
3062
  // A vector to store the file numbers that have been assigned to certain
3063
  // JobContext. Current implementation tracks table and blob files only.
3064
  std::unordered_set<uint64_t> files_grabbed_for_purge_;
3065
3066
  // A queue to store log writers to close. Protected by db mutex_.
3067
  std::deque<log::Writer*> wals_to_free_queue_;
3068
3069
  std::deque<SuperVersion*> superversions_to_free_queue_;
3070
3071
  int unscheduled_flushes_ = 0;
3072
3073
  int unscheduled_compactions_ = 0;
3074
3075
  // count how many background compactions are running or have been scheduled in
3076
  // the BOTTOM pool
3077
  int bg_bottom_compaction_scheduled_ = 0;
3078
3079
  // count how many background compactions are running or have been scheduled
3080
  int bg_compaction_scheduled_ = 0;
3081
3082
  // stores the number of compactions are currently running
3083
  int num_running_compactions_ = 0;
3084
3085
  // stores the number of BOTTOM-priority compactions currently running
3086
  int num_running_bottom_compactions_ = 0;
3087
3088
  // number of background memtable flush jobs, submitted to the HIGH pool
3089
  int bg_flush_scheduled_ = 0;
3090
3091
  // stores the number of flushes are currently running
3092
  int num_running_flushes_ = 0;
3093
3094
  // number of background obsolete file purge jobs, submitted to the HIGH pool
3095
  int bg_purge_scheduled_ = 0;
3096
3097
  // number of pressure callbacks currently in progress (for destructor safety)
3098
  int bg_pressure_callback_in_progress_ = 0;
3099
3100
  enum class AsyncFileOpenState : uint8_t {
3101
    kNotScheduled = 0,  // Async file opening has not been scheduled.
3102
    kScheduled,         // Async file opening is in-flight in the HIGH pool.
3103
    kComplete,          // Async file opening has finished (or was not needed).
3104
  };
3105
3106
  // Tracks whether background async file opening has been scheduled/completed.
3107
  AsyncFileOpenState bg_async_file_open_state_ =
3108
      AsyncFileOpenState::kNotScheduled;
3109
3110
  std::deque<ManualCompactionState*> manual_compaction_dequeue_;
3111
3112
  // shall we disable deletion of obsolete files
3113
  // if 0 the deletion is enabled.
3114
  // if non-zero, files will not be getting deleted
3115
  // This enables two different threads to call
3116
  // EnableFileDeletions() and DisableFileDeletions()
3117
  // without any synchronization
3118
  int disable_delete_obsolete_files_ = 0;
3119
3120
  // Number of times FindObsoleteFiles has found deletable files and the
3121
  // corresponding call to PurgeObsoleteFiles has not yet finished.
3122
  int pending_purge_obsolete_files_ = 0;
3123
3124
  // last time when DeleteObsoleteFiles with full scan was executed. Originally
3125
  // initialized with startup time.
3126
  uint64_t delete_obsolete_files_last_run_;
3127
3128
  // The thread that wants to switch memtable, can wait on this cv until the
3129
  // pending writes to memtable finishes.
3130
  std::condition_variable switch_cv_;
3131
  // The mutex used by switch_cv_. mutex_ should be acquired beforehand.
3132
  std::mutex switch_mutex_;
3133
  // Number of threads intending to write to memtable
3134
  std::atomic<size_t> pending_memtable_writes_{0};
3135
3136
  // A flag indicating whether the current rocksdb database has any
3137
  // data that is not yet persisted into either WAL or SST file.
3138
  // Used when disableWAL is true.
3139
  std::atomic<bool> has_unpersisted_data_{false};
3140
3141
  // if an attempt was made to flush all column families that
3142
  // the oldest log depends on but uncommitted data in the oldest
3143
  // log prevents the log from being released.
3144
  // We must attempt to free the dependent memtables again
3145
  // at a later time after the transaction in the oldest
3146
  // log is fully commited.
3147
  bool unable_to_release_oldest_log_{false};
3148
3149
  // Number of running IngestExternalFile() or CreateColumnFamilyWithImport()
3150
  // calls.
3151
  // REQUIRES: mutex held
3152
  int num_running_ingest_file_ = 0;
3153
3154
  WalManager wal_manager_;
3155
3156
  // A value of > 0 temporarily disables scheduling of background work
3157
  int bg_work_paused_ = 0;
3158
3159
  // A value of > 0 temporarily disables scheduling of background compaction
3160
  int bg_compaction_paused_ = 0;
3161
3162
  // Guard against multiple concurrent refitting
3163
  bool refitting_level_ = false;
3164
3165
  // The min threshold to triggere bottommost compaction for removing
3166
  // garbages, among all column families.
3167
  SequenceNumber bottommost_files_mark_threshold_ = kMaxSequenceNumber;
3168
3169
  // The min threshold to trigger compactions for standalone range deletion
3170
  // files that are marked for compaction.
3171
  SequenceNumber standalone_range_deletion_files_mark_threshold_ =
3172
      kMaxSequenceNumber;
3173
3174
  LogsWithPrepTracker logs_with_prep_tracker_;
3175
3176
  // Callback for compaction to check if a key is visible to a snapshot.
3177
  // REQUIRES: mutex held
3178
  std::unique_ptr<SnapshotChecker> snapshot_checker_;
3179
3180
  // Callback for when the cached_recoverable_state_ is written to memtable
3181
  // Only to be set during initialization
3182
  std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
3183
3184
  // Scheduler to run DumpStats(), PersistStats(), and FlushInfoLog().
3185
  // Currently, internally it has a global timer instance for running the tasks.
3186
  PeriodicTaskScheduler periodic_task_scheduler_;
3187
3188
  // It contains the implementations for each periodic task.
3189
  std::map<PeriodicTaskType, const PeriodicTaskFunc> periodic_task_functions_;
3190
3191
  // When set, we use a separate queue for writes that don't write to memtable.
3192
  // In 2PC these are the writes at Prepare phase.
3193
  const bool two_write_queues_;
3194
  const bool manual_wal_flush_;
3195
3196
  // LastSequence also indicates last published sequence visibile to the
3197
  // readers. Otherwise LastPublishedSequence should be used.
3198
  const bool last_seq_same_as_publish_seq_;
3199
  // It indicates that a customized gc algorithm must be used for
3200
  // flush/compaction and if it is not provided vis SnapshotChecker, we should
3201
  // disable gc to be safe.
3202
  const bool use_custom_gc_;
3203
  // Flag to indicate that the DB instance shutdown has been initiated. This
3204
  // different from shutting_down_ atomic in that it is set at the beginning
3205
  // of shutdown sequence, specifically in order to prevent any background
3206
  // error recovery from going on in parallel. The latter, shutting_down_,
3207
  // is set a little later during the shutdown after scheduling memtable
3208
  // flushes
3209
  std::atomic<bool> shutdown_initiated_{false};
3210
  // Flag to indicate whether sst_file_manager object was allocated in
3211
  // DB::Open() or passed to us
3212
  bool own_sfm_;
3213
3214
  // Flag to check whether Close() has been called on this DB
3215
  bool closed_ = false;
3216
  // save the closing status, for re-calling the close()
3217
  Status closing_status_;
3218
  // mutex for DB::Close()
3219
  InstrumentedMutex closing_mutex_;
3220
3221
  // Conditional variable to coordinate installation of atomic flush results.
3222
  // With atomic flush, each bg thread installs the result of flushing multiple
3223
  // column families, and different threads can flush different column
3224
  // families. It's difficult to rely on one thread to perform batch
3225
  // installation for all threads. This is different from the non-atomic flush
3226
  // case.
3227
  // atomic_flush_install_cv_ makes sure that threads install atomic flush
3228
  // results sequentially. Flush results of memtables with lower IDs get
3229
  // installed to MANIFEST first.
3230
  InstrumentedCondVar atomic_flush_install_cv_;
3231
3232
  bool wal_in_db_path_ = false;
3233
  std::atomic<uint64_t> max_total_wal_size_;
3234
3235
  BlobFileCompletionCallback blob_callback_;
3236
3237
  // Pointer to WriteBufferManager stalling interface.
3238
  std::unique_ptr<StallInterface> wbm_stall_;
3239
3240
  // seqno_to_time_mapping_ stores the sequence number to time mapping, it's not
3241
  // thread safe, both read and write need db mutex hold.
3242
  SeqnoToTimeMapping seqno_to_time_mapping_;
3243
3244
  // Stop write token that is acquired when first LockWAL() is called.
3245
  // Destroyed when last UnlockWAL() is called. Controlled by DB mutex.
3246
  // See lock_wal_count_
3247
  std::unique_ptr<WriteControllerToken> lock_wal_write_token_;
3248
3249
  // The number of LockWAL called without matching UnlockWAL call.
3250
  // See also lock_wal_write_token_
3251
  uint32_t lock_wal_count_ = 0;
3252
};
3253
3254
class GetWithTimestampReadCallback : public ReadCallback {
3255
 public:
3256
  explicit GetWithTimestampReadCallback(SequenceNumber seq)
3257
16.6k
      : ReadCallback(seq) {}
3258
0
  bool IsVisibleFullCheck(SequenceNumber seq) override {
3259
0
    return seq <= max_visible_seq_;
3260
0
  }
3261
};
3262
3263
Options SanitizeOptions(const std::string& db, const Options& src,
3264
                        bool read_only = false,
3265
                        Status* logger_creation_s = nullptr);
3266
3267
DBOptions SanitizeOptions(const std::string& db, const DBOptions& src,
3268
                          bool read_only = false,
3269
                          Status* logger_creation_s = nullptr);
3270
3271
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions,
3272
                                    const MutableCFOptions& mutable_cf_options);
3273
3274
// Return a VersionEdit for the DB's recovery when the `memtables` of the
3275
// specified column family are obsolete. Specifically, the min log number to
3276
// keep, and the WAL files that can be deleted.
3277
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
3278
    VersionSet* vset, const ColumnFamilyData& cfd,
3279
    const autovector<VersionEdit*>& edit_list,
3280
    const autovector<ReadOnlyMemTable*>& memtables,
3281
    LogsWithPrepTracker* prep_tracker);
3282
3283
// Return the earliest log file to keep after the memtable flush is
3284
// finalized.
3285
// `cfd_to_flush` is the column family whose memtable (specified in
3286
// `memtables_to_flush`) will be flushed and thus will not depend on any WAL
3287
// file.
3288
// The function is only applicable to 2pc mode.
3289
uint64_t PrecomputeMinLogNumberToKeep2PC(
3290
    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
3291
    const autovector<VersionEdit*>& edit_list,
3292
    const autovector<ReadOnlyMemTable*>& memtables_to_flush,
3293
    LogsWithPrepTracker* prep_tracker);
3294
// For atomic flush.
3295
uint64_t PrecomputeMinLogNumberToKeep2PC(
3296
    VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
3297
    const autovector<autovector<VersionEdit*>>& edit_lists,
3298
    const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush,
3299
    LogsWithPrepTracker* prep_tracker);
3300
3301
// In non-2PC mode, WALs with log number < the returned number can be
3302
// deleted after the cfd_to_flush column family is flushed successfully.
3303
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
3304
    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
3305
    const autovector<VersionEdit*>& edit_list);
3306
// For atomic flush.
3307
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
3308
    VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
3309
    const autovector<autovector<VersionEdit*>>& edit_lists);
3310
3311
// `cfd_to_flush` is the column family whose memtable will be flushed and thus
3312
// will not depend on any WAL file. nullptr means no memtable is being flushed.
3313
// The function is only applicable to 2pc mode.
3314
uint64_t FindMinPrepLogReferencedByMemTable(
3315
    VersionSet* vset, const autovector<ReadOnlyMemTable*>& memtables_to_flush);
3316
// For atomic flush.
3317
uint64_t FindMinPrepLogReferencedByMemTable(
3318
    VersionSet* vset,
3319
    const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush);
3320
3321
// Fix user-supplied options to be reasonable
3322
template <class T, class V>
3323
127k
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
3324
127k
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
3325
127k
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
3326
127k
}
column_family.cc:void rocksdb::ClipToRange<unsigned long, unsigned long>(unsigned long*, unsigned long, unsigned long)
Line
Count
Source
3323
127k
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
3324
127k
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
3325
127k
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
3326
127k
}
Unexecuted instantiation: db_impl_open.cc:void rocksdb::ClipToRange<int, int>(int*, int, int)
3327
3328
inline Status DBImpl::FailIfCfHasTs(
3329
523k
    const ColumnFamilyHandle* column_family) const {
3330
523k
  if (!column_family) {
3331
0
    return Status::InvalidArgument("column family handle cannot be null");
3332
0
  }
3333
523k
  assert(column_family);
3334
523k
  const Comparator* const ucmp = column_family->GetComparator();
3335
523k
  assert(ucmp);
3336
523k
  if (ucmp->timestamp_size() > 0) {
3337
0
    std::ostringstream oss;
3338
0
    oss << "cannot call this method on column family "
3339
0
        << column_family->GetName() << " that enables timestamp";
3340
0
    return Status::InvalidArgument(oss.str());
3341
0
  }
3342
523k
  return Status::OK();
3343
523k
}
3344
3345
inline Status DBImpl::FailIfTsMismatchCf(ColumnFamilyHandle* column_family,
3346
0
                                         const Slice& ts) const {
3347
0
  if (!column_family) {
3348
0
    return Status::InvalidArgument("column family handle cannot be null");
3349
0
  }
3350
0
  assert(column_family);
3351
0
  const Comparator* const ucmp = column_family->GetComparator();
3352
0
  assert(ucmp);
3353
0
  if (0 == ucmp->timestamp_size()) {
3354
0
    std::stringstream oss;
3355
0
    oss << "cannot call this method on column family "
3356
0
        << column_family->GetName() << " that does not enable timestamp";
3357
0
    return Status::InvalidArgument(oss.str());
3358
0
  }
3359
0
  const size_t ts_sz = ts.size();
3360
0
  if (ts_sz != ucmp->timestamp_size()) {
3361
0
    std::stringstream oss;
3362
0
    oss << "Timestamp sizes mismatch: expect " << ucmp->timestamp_size() << ", "
3363
0
        << ts_sz << " given";
3364
0
    return Status::InvalidArgument(oss.str());
3365
0
  }
3366
0
  return Status::OK();
3367
0
}
3368
3369
inline Status DBImpl::FailIfReadCollapsedHistory(const ColumnFamilyData* cfd,
3370
                                                 const SuperVersion* sv,
3371
0
                                                 const Slice& ts) const {
3372
  // Reaching to this point means the timestamp size matching sanity check in
3373
  // `DBImpl::FailIfTsMismatchCf` already passed. So we skip that and assume
3374
  // column family has the same user-defined timestamp format as `ts`.
3375
0
  const Comparator* const ucmp = cfd->user_comparator();
3376
0
  assert(ucmp);
3377
0
  const std::string& full_history_ts_low = sv->full_history_ts_low;
3378
0
  assert(full_history_ts_low.empty() ||
3379
0
         full_history_ts_low.size() == ts.size());
3380
0
  if (!full_history_ts_low.empty() &&
3381
0
      ucmp->CompareTimestamp(ts, full_history_ts_low) < 0) {
3382
0
    std::stringstream oss;
3383
0
    oss << "Read timestamp: " << ucmp->TimestampToString(ts)
3384
0
        << " is smaller than full_history_ts_low: "
3385
0
        << ucmp->TimestampToString(full_history_ts_low) << std::endl;
3386
0
    return Status::InvalidArgument(oss.str());
3387
0
  }
3388
0
  return Status::OK();
3389
0
}
3390
}  // namespace ROCKSDB_NAMESPACE