Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/db/db_impl/db_impl.h
Line | Count | Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#pragma once
10
11
#include <atomic>
12
#include <cstdint>
13
#include <deque>
14
#include <functional>
15
#include <limits>
16
#include <list>
17
#include <map>
18
#include <set>
19
#include <string>
20
#include <unordered_map>
21
#include <utility>
22
#include <vector>
23
24
#include "db/column_family.h"
25
#include "db/compaction/compaction_iterator.h"
26
#include "db/compaction/compaction_job.h"
27
#include "db/error_handler.h"
28
#include "db/event_helpers.h"
29
#include "db/external_sst_file_ingestion_job.h"
30
#include "db/flush_job.h"
31
#include "db/flush_scheduler.h"
32
#include "db/import_column_family_job.h"
33
#include "db/internal_stats.h"
34
#include "db/log_writer.h"
35
#include "db/logs_with_prep_tracker.h"
36
#include "db/memtable_list.h"
37
#include "db/periodic_task_scheduler.h"
38
#include "db/post_memtable_callback.h"
39
#include "db/pre_release_callback.h"
40
#include "db/range_del_aggregator.h"
41
#include "db/read_callback.h"
42
#include "db/seqno_to_time_mapping.h"
43
#include "db/snapshot_checker.h"
44
#include "db/snapshot_impl.h"
45
#include "db/trim_history_scheduler.h"
46
#include "db/version_edit.h"
47
#include "db/wal_manager.h"
48
#include "db/write_controller.h"
49
#include "db/write_thread.h"
50
#include "logging/event_logger.h"
51
#include "monitoring/instrumented_mutex.h"
52
#include "options/db_options.h"
53
#include "port/port.h"
54
#include "rocksdb/attribute_groups.h"
55
#include "rocksdb/db.h"
56
#include "rocksdb/env.h"
57
#include "rocksdb/memtablerep.h"
58
#include "rocksdb/status.h"
59
#include "rocksdb/trace_reader_writer.h"
60
#include "rocksdb/transaction_log.h"
61
#include "rocksdb/user_write_callback.h"
62
#include "rocksdb/utilities/replayer.h"
63
#include "rocksdb/write_buffer_manager.h"
64
#include "table/merging_iterator.h"
65
#include "util/autovector.h"
66
#include "util/hash.h"
67
#include "util/repeatable_thread.h"
68
#include "util/stop_watch.h"
69
#include "util/thread_local.h"
70
71
namespace ROCKSDB_NAMESPACE {
72
73
class Arena;
74
class ArenaWrappedDBIter;
75
class InMemoryStatsHistoryIterator;
76
class MemTable;
77
class PersistentStatsHistoryIterator;
78
class TableCache;
79
class TaskLimiterToken;
80
class Version;
81
class VersionEdit;
82
class VersionSet;
83
class WriteCallback;
84
struct JobContext;
85
struct ExternalSstFileInfo;
86
struct MemTableInfo;
87
88
// Class to maintain directories for all database paths other than main one.
89
class Directories {
90
 public:
91
  IOStatus SetDirectories(FileSystem* fs, const std::string& dbname,
92
                          const std::string& wal_dir,
93
                          const std::vector<DbPath>& data_paths);
94
95
0
  FSDirectory* GetDataDir(size_t path_id) const {
96
0
    assert(path_id < data_dirs_.size());
97
0
    FSDirectory* ret_dir = data_dirs_[path_id].get();
98
0
    if (ret_dir == nullptr) {
99
      // Should use db_dir_
100
0
      return db_dir_.get();
101
0
    }
102
0
    return ret_dir;
103
0
  }
104
105
0
  FSDirectory* GetWalDir() {
106
0
    if (wal_dir_) {
107
0
      return wal_dir_.get();
108
0
    }
109
0
    return db_dir_.get();
110
0
  }
111
112
28.5k
  FSDirectory* GetDbDir() { return db_dir_.get(); }
113
114
11.0k
  IOStatus Close(const IOOptions& options, IODebugContext* dbg) {
115
    // close all directories for all database paths
116
11.0k
    IOStatus s = IOStatus::OK();
117
118
    // The default implementation of Close() in the Directory/FSDirectory class
119
    // returns a "NotSupported" status; the upper level interface should be able to
120
    // handle this error so that Close() does not fail after upgrading when
121
    // run on FileSystems that have not implemented `Directory::Close()` or
122
    // `FSDirectory::Close()` yet
123
124
11.0k
    if (db_dir_) {
125
11.0k
      IOStatus temp_s = db_dir_->Close(options, dbg);
126
11.0k
      if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) {
127
0
        s = std::move(temp_s);
128
0
      }
129
11.0k
    }
130
131
    // Attempt to close everything even if one fails
132
11.0k
    s.PermitUncheckedError();
133
134
11.0k
    if (wal_dir_) {
135
0
      IOStatus temp_s = wal_dir_->Close(options, dbg);
136
0
      if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) {
137
0
        s = std::move(temp_s);
138
0
      }
139
0
    }
140
141
11.0k
    s.PermitUncheckedError();
142
143
11.0k
    for (auto& data_dir_ptr : data_dirs_) {
144
11.0k
      if (data_dir_ptr) {
145
0
        IOStatus temp_s = data_dir_ptr->Close(options, dbg);
146
0
        if (!temp_s.ok() && !temp_s.IsNotSupported() && s.ok()) {
147
0
          s = std::move(temp_s);
148
0
        }
149
0
      }
150
11.0k
    }
151
152
    // Ready for caller
153
11.0k
    s.MustCheck();
154
11.0k
    return s;
155
11.0k
  }
156
157
 private:
158
  std::unique_ptr<FSDirectory> db_dir_;
159
  std::vector<std::unique_ptr<FSDirectory>> data_dirs_;
160
  std::unique_ptr<FSDirectory> wal_dir_;
161
};
162
163
// While DB is the public interface of RocksDB, DBImpl is the actual
164
// class implementing it. It is the entry point to the core RocksDB engine.
165
// All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a
166
// DBImpl internally.
167
// Other than functions implementing the DB interface, some public
168
// functions are there for other internal components to call. For
169
// example, TransactionDB directly calls DBImpl::WriteImpl() and
170
// BlobDB directly calls DBImpl::GetImpl(). Some other functions
171
// are for sub-components to call. For example, ColumnFamilyHandleImpl
172
// calls DBImpl::FindObsoleteFiles().
173
//
174
// Since it's a very large class, the definitions of its functions are
176
// divided among several db_impl_*.cc files, in addition to db_impl.cc.
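For orientation, here is a minimal sketch of the public entry point that DBImpl backs. This is illustrative usage of the public DB API only (the path and keys are placeholders), not part of db_impl.h:

  #include <cassert>
  #include <string>
  #include "rocksdb/db.h"

  int main() {
    rocksdb::Options options;
    options.create_if_missing = true;
    rocksdb::DB* db = nullptr;  // Open() returns a DB* that is backed by a DBImpl
    rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/dbimpl_example", &db);
    assert(s.ok());
    s = db->Put(rocksdb::WriteOptions(), "key", "value");  // eventually DBImpl::WriteImpl()
    std::string value;
    s = db->Get(rocksdb::ReadOptions(), "key", &value);    // eventually DBImpl::GetImpl()
    assert(s.ok() && value == "value");
    delete db;
    return 0;
  }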
176
class DBImpl : public DB {
177
 public:
178
  DBImpl(const DBOptions& options, const std::string& dbname,
179
         const bool seq_per_batch = false, const bool batch_per_txn = true,
180
         bool read_only = false);
181
  // No copying allowed
182
  DBImpl(const DBImpl&) = delete;
183
  void operator=(const DBImpl&) = delete;
184
185
  virtual ~DBImpl();
186
187
  // ---- Implementations of the DB interface ----
188
189
  using DB::Resume;
190
  Status Resume() override;
191
192
  using DB::Put;
193
  Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
194
             const Slice& key, const Slice& value) override;
195
  Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
196
             const Slice& key, const Slice& ts, const Slice& value) override;
197
198
  using DB::PutEntity;
199
  Status PutEntity(const WriteOptions& options,
200
                   ColumnFamilyHandle* column_family, const Slice& key,
201
                   const WideColumns& columns) override;
202
  Status PutEntity(const WriteOptions& options, const Slice& key,
203
                   const AttributeGroups& attribute_groups) override;
204
205
  using DB::Merge;
206
  Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
207
               const Slice& key, const Slice& value) override;
208
  Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
209
               const Slice& key, const Slice& ts, const Slice& value) override;
210
211
  using DB::Delete;
212
  Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
213
                const Slice& key) override;
214
  Status Delete(const WriteOptions& options, ColumnFamilyHandle* column_family,
215
                const Slice& key, const Slice& ts) override;
216
217
  using DB::SingleDelete;
218
  Status SingleDelete(const WriteOptions& options,
219
                      ColumnFamilyHandle* column_family,
220
                      const Slice& key) override;
221
  Status SingleDelete(const WriteOptions& options,
222
                      ColumnFamilyHandle* column_family, const Slice& key,
223
                      const Slice& ts) override;
224
225
  using DB::DeleteRange;
226
  Status DeleteRange(const WriteOptions& options,
227
                     ColumnFamilyHandle* column_family, const Slice& begin_key,
228
                     const Slice& end_key) override;
229
  Status DeleteRange(const WriteOptions& options,
230
                     ColumnFamilyHandle* column_family, const Slice& begin_key,
231
                     const Slice& end_key, const Slice& ts) override;
232
233
  using DB::Write;
234
  Status Write(const WriteOptions& options, WriteBatch* updates) override;
235
236
  using DB::WriteWithCallback;
237
  Status WriteWithCallback(const WriteOptions& options, WriteBatch* updates,
238
                           UserWriteCallback* user_write_cb) override;
239
240
  using DB::Get;
241
  Status Get(const ReadOptions& _read_options,
242
             ColumnFamilyHandle* column_family, const Slice& key,
243
             PinnableSlice* value, std::string* timestamp) override;
244
245
  using DB::GetEntity;
246
  Status GetEntity(const ReadOptions& options,
247
                   ColumnFamilyHandle* column_family, const Slice& key,
248
                   PinnableWideColumns* columns) override;
249
  Status GetEntity(const ReadOptions& options, const Slice& key,
250
                   PinnableAttributeGroups* result) override;
251
252
  using DB::GetMergeOperands;
253
  Status GetMergeOperands(const ReadOptions& options,
254
                          ColumnFamilyHandle* column_family, const Slice& key,
255
                          PinnableSlice* merge_operands,
256
                          GetMergeOperandsOptions* get_merge_operands_options,
257
0
                          int* number_of_operands) override {
258
0
    GetImplOptions get_impl_options;
259
0
    get_impl_options.column_family = column_family;
260
0
    get_impl_options.merge_operands = merge_operands;
261
0
    get_impl_options.get_merge_operands_options = get_merge_operands_options;
262
0
    get_impl_options.number_of_operands = number_of_operands;
263
0
    get_impl_options.get_value = false;
264
0
    return GetImpl(options, key, get_impl_options);
265
0
  }
266
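For context, a sketch of how a caller might use GetMergeOperands through the public API. It assumes the column family was opened with a merge operator and that "counter_key" (a placeholder) has accumulated merge operands:

  #include <vector>
  #include "rocksdb/db.h"

  void ReadOperands(rocksdb::DB* db) {
    rocksdb::GetMergeOperandsOptions opts;
    opts.expected_max_number_of_operands = 8;
    // Caller-allocated array, sized per expected_max_number_of_operands.
    std::vector<rocksdb::PinnableSlice> operands(opts.expected_max_number_of_operands);
    int num_operands = 0;
    rocksdb::Status s = db->GetMergeOperands(
        rocksdb::ReadOptions(), db->DefaultColumnFamily(), "counter_key",
        operands.data(), &opts, &num_operands);
    // On success, operands[0..num_operands) hold the unmerged operands.
  }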
267
  using DB::MultiGet;
268
  // This MultiGet is a batched version, which may be faster than calling Get
269
  // multiple times, especially if the keys have some spatial locality that
270
  // enables them to be queried in the same SST files/set of files. The larger
271
  // the batch size, the more scope for batching and performance improvement.
272
  // The values and statuses parameters are arrays with the number of elements
273
  // equal to keys.size(). This allows the storage for those to be allocated
274
  // by the caller on the stack for small batches.
275
  void MultiGet(const ReadOptions& _read_options, const size_t num_keys,
276
                ColumnFamilyHandle** column_families, const Slice* keys,
277
                PinnableSlice* values, std::string* timestamps,
278
                Status* statuses, const bool sorted_input = false) override;
279
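As a hedged illustration of the batched form described above (assuming a recent RocksDB release where the public DB exposes this array-based overload; db is an open rocksdb::DB* and the keys are placeholders):

  #include <vector>
  #include "rocksdb/db.h"

  void BatchedLookup(rocksdb::DB* db) {
    std::vector<rocksdb::Slice> keys = {"k1", "k2", "k3"};
    // values and statuses are sized to keys.size(), as the comment requires.
    std::vector<rocksdb::PinnableSlice> values(keys.size());
    std::vector<rocksdb::Status> statuses(keys.size());
    db->MultiGet(rocksdb::ReadOptions(), db->DefaultColumnFamily(), keys.size(),
                 keys.data(), values.data(), statuses.data(),
                 /*sorted_input=*/false);
    for (size_t i = 0; i < keys.size(); ++i) {
      if (statuses[i].ok()) {
        // values[i] pins the result, avoiding a copy where possible.
      }
    }
  }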
280
  void MultiGetWithCallback(
281
      const ReadOptions& _read_options, ColumnFamilyHandle* column_family,
282
      ReadCallback* callback,
283
      autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);
284
285
  using DB::MultiGetEntity;
286
287
  void MultiGetEntity(const ReadOptions& options,
288
                      ColumnFamilyHandle* column_family, size_t num_keys,
289
                      const Slice* keys, PinnableWideColumns* results,
290
                      Status* statuses, bool sorted_input) override;
291
292
  void MultiGetEntity(const ReadOptions& options, size_t num_keys,
293
                      ColumnFamilyHandle** column_families, const Slice* keys,
294
                      PinnableWideColumns* results, Status* statuses,
295
                      bool sorted_input) override;
296
  void MultiGetEntity(const ReadOptions& options, size_t num_keys,
297
                      const Slice* keys,
298
                      PinnableAttributeGroups* results) override;
299
300
  void MultiGetEntityWithCallback(
301
      const ReadOptions& read_options, ColumnFamilyHandle* column_family,
302
      ReadCallback* callback,
303
      autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);
304
305
  Status CreateColumnFamily(const ColumnFamilyOptions& cf_options,
306
                            const std::string& column_family,
307
368
                            ColumnFamilyHandle** handle) override {
308
    // TODO: plumb Env::IOActivity, Env::IOPriority
309
368
    return CreateColumnFamily(ReadOptions(), WriteOptions(), cf_options,
310
368
                              column_family, handle);
311
368
  }
312
  virtual Status CreateColumnFamily(const ReadOptions& read_options,
313
                                    const WriteOptions& write_options,
314
                                    const ColumnFamilyOptions& cf_options,
315
                                    const std::string& column_family,
316
                                    ColumnFamilyHandle** handle);
317
  Status CreateColumnFamilies(
318
      const ColumnFamilyOptions& cf_options,
319
      const std::vector<std::string>& column_family_names,
320
0
      std::vector<ColumnFamilyHandle*>* handles) override {
321
    // TODO: plumb Env::IOActivity, Env::IOPriority
322
0
    return CreateColumnFamilies(ReadOptions(), WriteOptions(), cf_options,
323
0
                                column_family_names, handles);
324
0
  }
325
  virtual Status CreateColumnFamilies(
326
      const ReadOptions& read_options, const WriteOptions& write_options,
327
      const ColumnFamilyOptions& cf_options,
328
      const std::vector<std::string>& column_family_names,
329
      std::vector<ColumnFamilyHandle*>* handles);
330
331
  Status CreateColumnFamilies(
332
      const std::vector<ColumnFamilyDescriptor>& column_families,
333
0
      std::vector<ColumnFamilyHandle*>* handles) override {
334
    // TODO: plumb Env::IOActivity, Env::IOPriority
335
0
    return CreateColumnFamilies(ReadOptions(), WriteOptions(), column_families,
336
0
                                handles);
337
0
  }
338
  virtual Status CreateColumnFamilies(
339
      const ReadOptions& read_options, const WriteOptions& write_options,
340
      const std::vector<ColumnFamilyDescriptor>& column_families,
341
      std::vector<ColumnFamilyHandle*>* handles);
342
  Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
343
  Status DropColumnFamilies(
344
      const std::vector<ColumnFamilyHandle*>& column_families) override;
345
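To make the column family lifecycle concrete, a minimal sketch against the public API (error handling elided; the column family name is a placeholder):

  #include "rocksdb/db.h"

  void ColumnFamilyRoundTrip(rocksdb::DB* db) {
    rocksdb::ColumnFamilyHandle* cf = nullptr;
    rocksdb::Status s =
        db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "new_cf", &cf);
    s = db->Put(rocksdb::WriteOptions(), cf, "k", "v");
    s = db->DropColumnFamily(cf);           // marks the CF as dropped
    s = db->DestroyColumnFamilyHandle(cf);  // releases the handle itself
  }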
346
  // Returns false if the key doesn't exist in the database and true if it may.
347
  // If value_found is not passed in as null, then the value is returned if found
348
  // in memory. On return, if the value was found, value_found will be set to
349
  // true, otherwise false.
350
  using DB::KeyMayExist;
351
  bool KeyMayExist(const ReadOptions& options,
352
                   ColumnFamilyHandle* column_family, const Slice& key,
353
                   std::string* value, std::string* timestamp,
354
                   bool* value_found = nullptr) override;
355
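A brief sketch of the intended calling pattern for KeyMayExist via the public API (db is an open rocksdb::DB*; the key is whatever the caller wants to probe):

  #include <string>
  #include "rocksdb/db.h"

  bool DefinitelyAbsent(rocksdb::DB* db, const rocksdb::Slice& key) {
    std::string value;
    bool value_found = false;
    bool may_exist = db->KeyMayExist(rocksdb::ReadOptions(),
                                     db->DefaultColumnFamily(), key,
                                     &value, &value_found);
    // false => the key is definitely not in the database (no I/O performed);
    // true  => it may exist; if value_found is true, `value` was served from memory.
    return !may_exist;
  }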
356
  using DB::NewIterator;
357
  Iterator* NewIterator(const ReadOptions& _read_options,
358
                        ColumnFamilyHandle* column_family) override;
359
  Status NewIterators(const ReadOptions& _read_options,
360
                      const std::vector<ColumnFamilyHandle*>& column_families,
361
                      std::vector<Iterator*>* iterators) override;
362
363
  const Snapshot* GetSnapshot() override;
364
  void ReleaseSnapshot(const Snapshot* snapshot) override;
365
366
  // EXPERIMENTAL
367
  std::unique_ptr<Iterator> NewCoalescingIterator(
368
      const ReadOptions& options,
369
      const std::vector<ColumnFamilyHandle*>& column_families) override;
370
371
  // EXPERIMENTAL
372
  std::unique_ptr<AttributeGroupIterator> NewAttributeGroupIterator(
373
      const ReadOptions& options,
374
      const std::vector<ColumnFamilyHandle*>& column_families) override;
375
376
  // Create a timestamped snapshot. This snapshot can be shared by multiple
377
  // readers. If any of them uses it for write conflict checking, then
378
  // is_write_conflict_boundary is true. For simplicity, set it to true by
379
  // default.
380
  std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
381
      SequenceNumber snapshot_seq, uint64_t ts);
382
  std::shared_ptr<const SnapshotImpl> GetTimestampedSnapshot(uint64_t ts) const;
383
  void ReleaseTimestampedSnapshotsOlderThan(
384
      uint64_t ts, size_t* remaining_total_ss = nullptr);
385
  Status GetTimestampedSnapshots(uint64_t ts_lb, uint64_t ts_ub,
386
                                 std::vector<std::shared_ptr<const Snapshot>>&
387
                                     timestamped_snapshots) const;
388
389
  using DB::GetProperty;
390
  bool GetProperty(ColumnFamilyHandle* column_family, const Slice& property,
391
                   std::string* value) override;
392
  using DB::GetMapProperty;
393
  bool GetMapProperty(ColumnFamilyHandle* column_family, const Slice& property,
394
                      std::map<std::string, std::string>* value) override;
395
  using DB::GetIntProperty;
396
  bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property,
397
                      uint64_t* value) override;
398
  using DB::GetAggregatedIntProperty;
399
  bool GetAggregatedIntProperty(const Slice& property,
400
                                uint64_t* aggregated_value) override;
401
  using DB::GetApproximateSizes;
402
  Status GetApproximateSizes(const SizeApproximationOptions& options,
403
                             ColumnFamilyHandle* column_family,
404
                             const Range* range, int n,
405
                             uint64_t* sizes) override;
406
  using DB::GetApproximateMemTableStats;
407
  void GetApproximateMemTableStats(ColumnFamilyHandle* column_family,
408
                                   const Range& range, uint64_t* const count,
409
                                   uint64_t* const size) override;
410
  using DB::CompactRange;
411
  Status CompactRange(const CompactRangeOptions& options,
412
                      ColumnFamilyHandle* column_family, const Slice* begin,
413
                      const Slice* end) override;
414
415
  using DB::CompactFiles;
416
  Status CompactFiles(
417
      const CompactionOptions& compact_options,
418
      ColumnFamilyHandle* column_family,
419
      const std::vector<std::string>& input_file_names, const int output_level,
420
      const int output_path_id = -1,
421
      std::vector<std::string>* const output_file_names = nullptr,
422
      CompactionJobInfo* compaction_job_info = nullptr) override;
423
424
  Status PauseBackgroundWork() override;
425
  Status ContinueBackgroundWork() override;
426
427
  Status EnableAutoCompaction(
428
      const std::vector<ColumnFamilyHandle*>& column_family_handles) override;
429
430
  void EnableManualCompaction() override;
431
  void DisableManualCompaction() override;
432
433
  using DB::SetOptions;
434
  Status SetOptions(
435
      ColumnFamilyHandle* column_family,
436
      const std::unordered_map<std::string, std::string>& options_map) override;
437
438
  Status SetDBOptions(
439
      const std::unordered_map<std::string, std::string>& options_map) override;
440
441
  using DB::NumberLevels;
442
  int NumberLevels(ColumnFamilyHandle* column_family) override;
443
  using DB::MaxMemCompactionLevel;
444
  int MaxMemCompactionLevel(ColumnFamilyHandle* column_family) override;
445
  using DB::Level0StopWriteTrigger;
446
  int Level0StopWriteTrigger(ColumnFamilyHandle* column_family) override;
447
  const std::string& GetName() const override;
448
  Env* GetEnv() const override;
449
  FileSystem* GetFileSystem() const override;
450
  using DB::GetOptions;
451
  Options GetOptions(ColumnFamilyHandle* column_family) const override;
452
  using DB::GetDBOptions;
453
  DBOptions GetDBOptions() const override;
454
  using DB::Flush;
455
  Status Flush(const FlushOptions& options,
456
               ColumnFamilyHandle* column_family) override;
457
  Status Flush(
458
      const FlushOptions& options,
459
      const std::vector<ColumnFamilyHandle*>& column_families) override;
460
0
  Status FlushWAL(bool sync) override {
461
    // TODO: plumb Env::IOActivity, Env::IOPriority
462
0
    return FlushWAL(WriteOptions(), sync);
463
0
  }
464
465
  virtual Status FlushWAL(const WriteOptions& write_options, bool sync);
466
  bool WALBufferIsEmpty();
467
  Status SyncWAL() override;
468
  Status LockWAL() override;
469
  Status UnlockWAL() override;
470
471
  SequenceNumber GetLatestSequenceNumber() const override;
472
473
  // IncreaseFullHistoryTsLow(ColumnFamilyHandle*, std::string) will acquire
474
  // and release db_mutex
475
  Status IncreaseFullHistoryTsLow(ColumnFamilyHandle* column_family,
476
                                  std::string ts_low) override;
477
478
  // GetFullHistoryTsLow(ColumnFamilyHandle*, std::string*) will acquire and
479
  // release db_mutex
480
  Status GetFullHistoryTsLow(ColumnFamilyHandle* column_family,
481
                             std::string* ts_low) override;
482
483
  Status GetDbIdentity(std::string& identity) const override;
484
485
  virtual Status GetDbIdentityFromIdentityFile(std::string* identity) const;
486
487
  Status GetDbSessionId(std::string& session_id) const override;
488
489
  ColumnFamilyHandle* DefaultColumnFamily() const override;
490
491
  ColumnFamilyHandle* PersistentStatsColumnFamily() const;
492
493
  Status Close() override;
494
495
  Status DisableFileDeletions() override;
496
497
  Status EnableFileDeletions() override;
498
499
  virtual bool IsFileDeletionsEnabled() const;
500
501
  Status GetStatsHistory(
502
      uint64_t start_time, uint64_t end_time,
503
      std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
504
505
  using DB::ResetStats;
506
  Status ResetStats() override;
507
  // All the returned filenames start with "/"
508
  Status GetLiveFiles(std::vector<std::string>&, uint64_t* manifest_file_size,
509
                      bool flush_memtable = true) override;
510
  Status GetSortedWalFiles(VectorWalPtr& files) override;
511
  Status GetSortedWalFilesImpl(VectorWalPtr& files, bool need_seqnos);
512
513
  // Get the known flushed sizes of WALs that might still be written to
514
  // or have pending sync.
515
  // NOTE: unlike alive_log_files_, this function includes WALs that might
516
  // be obsolete (but not obsolete to a pending Checkpoint) and not yet fully
517
  // synced.
518
  Status GetOpenWalSizes(std::map<uint64_t, uint64_t>& number_to_size);
519
  Status GetCurrentWalFile(std::unique_ptr<WalFile>* current_log_file) override;
520
  Status GetCreationTimeOfOldestFile(uint64_t* creation_time) override;
521
522
  Status GetUpdatesSince(
523
      SequenceNumber seq_number, std::unique_ptr<TransactionLogIterator>* iter,
524
      const TransactionLogIterator::ReadOptions& read_options =
525
          TransactionLogIterator::ReadOptions()) override;
526
  Status DeleteFile(std::string name) override;
527
  Status DeleteFilesInRanges(ColumnFamilyHandle* column_family,
528
                             const RangePtr* ranges, size_t n,
529
                             bool include_end = true);
530
531
  void GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) override;
532
533
  Status GetLiveFilesChecksumInfo(FileChecksumList* checksum_list) override;
534
535
  Status GetLiveFilesStorageInfo(
536
      const LiveFilesStorageInfoOptions& opts,
537
      std::vector<LiveFileStorageInfo>* files) override;
538
539
  // Obtains the meta data of the specified column family of the DB.
540
  // TODO(yhchiang): output parameters are placed at the end in this codebase.
541
  void GetColumnFamilyMetaData(ColumnFamilyHandle* column_family,
542
                               ColumnFamilyMetaData* metadata) override;
543
544
  void GetAllColumnFamilyMetaData(
545
      std::vector<ColumnFamilyMetaData>* metadata) override;
546
547
  Status SuggestCompactRange(ColumnFamilyHandle* column_family,
548
                             const Slice* begin, const Slice* end) override;
549
550
  Status PromoteL0(ColumnFamilyHandle* column_family,
551
                   int target_level) override;
552
553
  using DB::IngestExternalFile;
554
  Status IngestExternalFile(
555
      ColumnFamilyHandle* column_family,
556
      const std::vector<std::string>& external_files,
557
      const IngestExternalFileOptions& ingestion_options) override;
558
559
  using DB::IngestExternalFiles;
560
  Status IngestExternalFiles(
561
      const std::vector<IngestExternalFileArg>& args) override;
562
563
  using DB::CreateColumnFamilyWithImport;
564
  Status CreateColumnFamilyWithImport(
565
      const ColumnFamilyOptions& options, const std::string& column_family_name,
566
      const ImportColumnFamilyOptions& import_options,
567
      const std::vector<const ExportImportFilesMetaData*>& metadatas,
568
      ColumnFamilyHandle** handle) override;
569
570
  using DB::ClipColumnFamily;
571
  Status ClipColumnFamily(ColumnFamilyHandle* column_family,
572
                          const Slice& begin_key,
573
                          const Slice& end_key) override;
574
575
  using DB::VerifyFileChecksums;
576
  Status VerifyFileChecksums(const ReadOptions& read_options) override;
577
578
  using DB::VerifyChecksum;
579
  Status VerifyChecksum(const ReadOptions& /*read_options*/) override;
580
  // Verify the checksums of files in db. Currently only tables are checked.
581
  //
582
  // read_options: controls file I/O behavior, e.g. read ahead size while
583
  //               reading all the live table files.
584
  //
585
  // use_file_checksum: if false, verify the block checksums of all live table
586
  //                    files in db. Otherwise, obtain the file checksums and
587
  //                    compare them with the MANIFEST. Currently, file checksums
588
  //                    are recomputed by reading all table files.
589
  //
590
  // Returns: OK if there is no file whose file or block checksum mismatches.
591
  Status VerifyChecksumInternal(const ReadOptions& read_options,
592
                                bool use_file_checksum);
593
594
  Status VerifyFullFileChecksum(const std::string& file_checksum_expected,
595
                                const std::string& func_name_expected,
596
                                const std::string& fpath,
597
                                const ReadOptions& read_options);
598
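As a hedged usage sketch of the two verification modes described above, via the public API (it assumes the DB was opened with a file checksum generator so that MANIFEST checksums exist):

  #include "rocksdb/db.h"
  #include "rocksdb/file_checksum.h"

  rocksdb::Status VerifyDb(rocksdb::DB* db) {
    rocksdb::ReadOptions ro;
    ro.readahead_size = 2 * 1024 * 1024;  // larger sequential reads while verifying
    // Block-level verification of all live table files.
    rocksdb::Status s = db->VerifyChecksum(ro);
    if (!s.ok()) return s;
    // Whole-file checksums recomputed and compared against the MANIFEST
    // (requires e.g. options.file_checksum_gen_factory =
    //  rocksdb::GetFileChecksumGenCrc32cFactory() at open time).
    return db->VerifyFileChecksums(ro);
  }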
599
  using DB::StartTrace;
600
  Status StartTrace(const TraceOptions& options,
601
                    std::unique_ptr<TraceWriter>&& trace_writer) override;
602
603
  using DB::EndTrace;
604
  Status EndTrace() override;
605
606
  using DB::NewDefaultReplayer;
607
  Status NewDefaultReplayer(const std::vector<ColumnFamilyHandle*>& handles,
608
                            std::unique_ptr<TraceReader>&& reader,
609
                            std::unique_ptr<Replayer>* replayer) override;
610
611
  using DB::StartBlockCacheTrace;
612
  Status StartBlockCacheTrace(
613
      const TraceOptions& trace_options,
614
      std::unique_ptr<TraceWriter>&& trace_writer) override;
615
616
  Status StartBlockCacheTrace(
617
      const BlockCacheTraceOptions& options,
618
      std::unique_ptr<BlockCacheTraceWriter>&& trace_writer) override;
619
620
  using DB::EndBlockCacheTrace;
621
  Status EndBlockCacheTrace() override;
622
623
  using DB::StartIOTrace;
624
  Status StartIOTrace(const TraceOptions& options,
625
                      std::unique_ptr<TraceWriter>&& trace_writer) override;
626
627
  using DB::EndIOTrace;
628
  Status EndIOTrace() override;
629
630
  using DB::GetPropertiesOfAllTables;
631
  Status GetPropertiesOfAllTables(ColumnFamilyHandle* column_family,
632
                                  TablePropertiesCollection* props) override;
633
  Status GetPropertiesOfTablesInRange(
634
      ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
635
      TablePropertiesCollection* props) override;
636
637
  // ---- End of implementations of the DB interface ----
638
  SystemClock* GetSystemClock() const;
639
640
  struct GetImplOptions {
641
    ColumnFamilyHandle* column_family = nullptr;
642
    PinnableSlice* value = nullptr;
643
    PinnableWideColumns* columns = nullptr;
644
    std::string* timestamp = nullptr;
645
    bool* value_found = nullptr;
646
    ReadCallback* callback = nullptr;
647
    bool* is_blob_index = nullptr;
648
    // If true, return the value associated with key via the value pointer; else
649
    // return all merge operands for key via the merge_operands pointer.
650
    bool get_value = true;
651
    // Pointer to an array of size
652
    // get_merge_operands_options.expected_max_number_of_operands allocated by
653
    // user
654
    PinnableSlice* merge_operands = nullptr;
655
    GetMergeOperandsOptions* get_merge_operands_options = nullptr;
656
    int* number_of_operands = nullptr;
657
  };
658
659
  Status GetImpl(const ReadOptions& read_options,
660
                 ColumnFamilyHandle* column_family, const Slice& key,
661
                 PinnableSlice* value);
662
663
  Status GetImpl(const ReadOptions& read_options,
664
                 ColumnFamilyHandle* column_family, const Slice& key,
665
                 PinnableSlice* value, std::string* timestamp);
666
667
  // Function that Get and KeyMayExist call with no_io true or false
668
  // Note: 'value_found' from KeyMayExist propagates here
669
  // This function is also called by GetMergeOperands
670
  // If get_impl_options.get_value = true get value associated with
671
  // get_impl_options.key via get_impl_options.value
672
  // If get_impl_options.get_value = false get merge operands associated with
673
  // get_impl_options.key via get_impl_options.merge_operands
674
  virtual Status GetImpl(const ReadOptions& options, const Slice& key,
675
                         GetImplOptions& get_impl_options);
676
677
  // If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
678
  ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
679
                                      ColumnFamilyHandleImpl* cfh,
680
                                      SuperVersion* sv, SequenceNumber snapshot,
681
                                      ReadCallback* read_callback,
682
                                      bool expose_blob_index = false,
683
                                      bool allow_refresh = true);
684
685
1.47k
  virtual SequenceNumber GetLastPublishedSequence() const {
686
1.47k
    if (last_seq_same_as_publish_seq_) {
687
1.47k
      return versions_->LastSequence();
688
1.47k
    } else {
689
0
      return versions_->LastPublishedSequence();
690
0
    }
691
1.47k
  }
692
693
  // REQUIRES: joined the main write queue if two_write_queues is disabled, and
694
  // the second write queue otherwise.
695
  virtual void SetLastPublishedSequence(SequenceNumber seq);
696
  // Returns LastSequence in last_seq_same_as_publish_seq_
697
  // mode and LastAllocatedSequence otherwise. This is useful when visibility
698
  // depends also on data written to the WAL but not to the memtable.
699
  SequenceNumber TEST_GetLastVisibleSequence() const;
700
701
  // Similar to Write() but will call the callback once on the single write
702
  // thread to determine whether it is safe to perform the write.
703
  virtual Status WriteWithCallback(const WriteOptions& write_options,
704
                                   WriteBatch* my_batch,
705
                                   WriteCallback* callback,
706
                                   UserWriteCallback* user_write_cb = nullptr);
707
708
  // Returns the sequence number that is guaranteed to be smaller than or equal
709
  // to the sequence number of any key that could be inserted into the current
710
  // memtables. It can then be assumed that any write with a larger (or equal)
711
  // sequence number will be present in this memtable or a later memtable.
712
  //
713
  // If the earliest sequence number could not be determined,
714
  // kMaxSequenceNumber will be returned.
715
  //
716
  // If include_history=true, will also search Memtables in MemTableList
717
  // History.
718
  SequenceNumber GetEarliestMemTableSequenceNumber(SuperVersion* sv,
719
                                                   bool include_history);
720
721
  // For a given key, check to see if there are any records for this key
722
  // in the memtables, including memtable history.  If cache_only is false,
723
  // SST files will also be checked.
724
  //
725
  // `key` should NOT have user-defined timestamp appended to user key even if
726
  // timestamp is enabled.
727
  //
728
  // If a key is found, *found_record_for_key will be set to true and
729
  // *seq will be set to the stored sequence number for the latest
730
  // operation on this key or kMaxSequenceNumber if unknown. If user-defined
731
  // timestamp is enabled for this column family and timestamp is not nullptr,
732
  // then *timestamp will be set to the stored timestamp for the latest
733
  // operation on this key.
734
  // If no key is found, *found_record_for_key will be set to false.
735
  //
736
  // Note: If cache_only=false, it is possible for *seq to be set to 0 if
737
  // the sequence number has been cleared from the record.  If the caller is
738
  // holding an active db snapshot, we know the missing sequence must be less
739
  // than the snapshot's sequence number (sequence numbers are only cleared
740
  // when there are no earlier active snapshots).
741
  //
742
  // If NotFound is returned and found_record_for_key is set to false, then no
743
  // record for this key was found.  If the caller is holding an active db
744
  // snapshot, we know that no key could have existed after this snapshot
745
  // (since we do not compact keys that have an earlier snapshot).
746
  //
747
  // Only records newer than or at `lower_bound_seq` are guaranteed to be
748
  // returned. Memtables and files may not be checked if they only contain data
749
  // older than `lower_bound_seq`.
750
  //
751
  // Returns OK or NotFound on success,
752
  // other status on unexpected error.
753
  // TODO(andrewkr): this API needs to be aware of range deletion operations
754
  Status GetLatestSequenceForKey(SuperVersion* sv, const Slice& key,
755
                                 bool cache_only,
756
                                 SequenceNumber lower_bound_seq,
757
                                 SequenceNumber* seq, std::string* timestamp,
758
                                 bool* found_record_for_key,
759
                                 bool* is_blob_index);
760
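GetLatestSequenceForKey() above is internal (used for conflict checking); for context, the closest public-facing use of sequence numbers looks roughly like this. Assumes WAL files are being retained, e.g. via DBOptions::WAL_ttl_seconds:

  #include <memory>
  #include "rocksdb/db.h"

  void TailRecentUpdates(rocksdb::DB* db, rocksdb::SequenceNumber since) {
    rocksdb::SequenceNumber latest = db->GetLatestSequenceNumber();
    std::unique_ptr<rocksdb::TransactionLogIterator> iter;
    rocksdb::Status s = db->GetUpdatesSince(since, &iter);
    while (s.ok() && iter && iter->Valid()) {
      rocksdb::BatchResult batch = iter->GetBatch();
      // batch.sequence is the first sequence number in this write batch;
      // GetLatestSequenceNumber() above gives the newest allocated sequence.
      (void)latest;
      iter->Next();
    }
  }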
761
  Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key,
762
                           const Slice& lower_bound, const Slice upper_bound);
763
  Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key,
764
                                  const Slice& lower_bound,
765
                                  const Slice upper_bound);
766
767
  // Similar to GetSnapshot(), but also lets the db know that this snapshot
768
  // will be used for transaction write-conflict checking.  The DB can then
769
  // make sure not to compact any keys that would prevent a write-conflict from
770
  // being detected.
771
  const Snapshot* GetSnapshotForWriteConflictBoundary();
772
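GetSnapshotForWriteConflictBoundary() is used by the transaction layer; the ordinary public pattern it builds on is a plain snapshot read, sketched here for illustration (the key is a placeholder):

  #include <string>
  #include "rocksdb/db.h"

  void ReadAtSnapshot(rocksdb::DB* db) {
    const rocksdb::Snapshot* snap = db->GetSnapshot();
    rocksdb::ReadOptions ro;
    ro.snapshot = snap;         // pin reads to this point in time
    std::string value;
    rocksdb::Status s = db->Get(ro, "key", &value);
    db->ReleaseSnapshot(snap);  // always release so compaction can reclaim old data
  }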
773
  // checks if all live files exist on file system and that their file sizes
774
  // match to our in-memory records
775
  virtual Status CheckConsistency();
776
777
  // max_file_num_to_ignore allows bottom level compaction to filter out newly
778
  // compacted SST files. Setting max_file_num_to_ignore to kMaxUint64 will
779
  // disable the filtering
780
  // If `final_output_level` is not nullptr, it is set to manual compaction's
781
  // output level if returned status is OK, and it may or may not be set to
782
  // manual compaction's output level if returned status is not OK.
783
  Status RunManualCompaction(ColumnFamilyData* cfd, int input_level,
784
                             int output_level,
785
                             const CompactRangeOptions& compact_range_options,
786
                             const Slice* begin, const Slice* end,
787
                             bool exclusive, bool disallow_trivial_move,
788
                             uint64_t max_file_num_to_ignore,
789
                             const std::string& trim_ts,
790
                             int* final_output_level = nullptr);
791
792
  // Return an internal iterator over the current state of the database.
793
  // The keys of this iterator are internal keys (see format.h).
794
  // The returned iterator should be deleted when no longer needed.
795
  // If allow_unprepared_value is true, the returned iterator may defer reading
796
  // the value and so will require PrepareValue() to be called before value();
797
  // allow_unprepared_value = false is convenient when this optimization is not
798
  // useful, e.g. when reading the whole column family.
799
  //
800
  // read_options.ignore_range_deletions determines whether range tombstones are
801
  // processed in the returned iterator internally, i.e., whether range
802
  // tombstone covered keys are in this iterator's output.
803
  // @param read_options Must outlive the returned iterator.
804
  InternalIterator* NewInternalIterator(
805
      const ReadOptions& read_options, Arena* arena, SequenceNumber sequence,
806
      ColumnFamilyHandle* column_family = nullptr,
807
      bool allow_unprepared_value = false);
808
809
  // Note: to support DB iterator refresh, memtable range tombstones in the
810
  // underlying merging iterator need to be refreshed. If db_iter is not
811
  // nullptr, db_iter->SetMemtableRangetombstoneIter() is called with the
812
  // memtable range tombstone iterator used by the underlying merging iterator.
813
  // This range tombstone iterator can be refreshed later by db_iter.
814
  // @param read_options Must outlive the returned iterator.
815
  InternalIterator* NewInternalIterator(const ReadOptions& read_options,
816
                                        ColumnFamilyData* cfd,
817
                                        SuperVersion* super_version,
818
                                        Arena* arena, SequenceNumber sequence,
819
                                        bool allow_unprepared_value,
820
                                        ArenaWrappedDBIter* db_iter = nullptr);
821
822
0
  LogsWithPrepTracker* logs_with_prep_tracker() {
823
0
    return &logs_with_prep_tracker_;
824
0
  }
825
826
  struct BGJobLimits {
827
    int max_flushes;
828
    int max_compactions;
829
  };
830
  // Returns maximum background flushes and compactions allowed to be scheduled
831
  BGJobLimits GetBGJobLimits() const;
832
  // Need a static version that can be called during SanitizeOptions().
833
  static BGJobLimits GetBGJobLimits(int max_background_flushes,
834
                                    int max_background_compactions,
835
                                    int max_background_jobs,
836
                                    bool parallelize_compactions);
837
838
  // move logs pending closing from job_context to the DB queue and
839
  // schedule a purge
840
  void ScheduleBgLogWriterClose(JobContext* job_context);
841
842
  uint64_t MinLogNumberToKeep();
843
844
  uint64_t MinLogNumberToRecycle();
845
846
  // Returns the lower bound file number for SSTs that won't be deleted, even if
847
  // they're obsolete. This lower bound is used internally to prevent newly
848
  // created flush/compaction output files from being deleted before they're
849
  // installed. This technique avoids the need for tracking the exact numbers of
850
  // files pending creation, although it prevents more files than necessary from
851
  // being deleted.
852
  uint64_t MinObsoleteSstNumberToKeep();
853
854
  uint64_t GetObsoleteSstFilesSize();
855
856
  // Returns the list of live files in 'live' and the list
857
  // of all files in the filesystem in 'candidate_files'.
858
  // If force == false and the last call was less than
859
  // db_options_.delete_obsolete_files_period_micros microseconds ago,
860
  // it will not fill up the job_context
861
  void FindObsoleteFiles(JobContext* job_context, bool force,
862
                         bool no_full_scan = false);
863
864
  // Diffs the files listed in filenames against the live files; those that do
865
  // not belong to live files are possibly removed. Also, removes all the
866
  // files in sst_delete_files and log_delete_files.
867
  // It is not necessary to hold the mutex when invoking this method.
868
  // If FindObsoleteFiles() was run, we need to also run
869
  // PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
870
  void PurgeObsoleteFiles(JobContext& background_context,
871
                          bool schedule_only = false);
872
873
  // Schedule a background job to actually delete obsolete files.
874
  void SchedulePurge();
875
876
0
  const SnapshotList& snapshots() const { return snapshots_; }
877
878
  // Load the list of snapshots that are no newer than `max_seq` into
879
  // `snap_vector`, in ascending order.
880
  // `oldest_write_conflict_snapshot` is filled with the oldest snapshot
881
  // which satisfies SnapshotImpl.is_write_conflict_boundary_ = true.
882
  void LoadSnapshots(std::vector<SequenceNumber>* snap_vector,
883
                     SequenceNumber* oldest_write_conflict_snapshot,
884
0
                     const SequenceNumber& max_seq) const {
885
0
    InstrumentedMutexLock l(mutex());
886
0
    snapshots().GetAll(snap_vector, oldest_write_conflict_snapshot, max_seq);
887
0
  }
888
889
0
  const ImmutableDBOptions& immutable_db_options() const {
890
0
    return immutable_db_options_;
891
0
  }
892
893
  // Cancel all background jobs, including flush, compaction, background
894
  // purging, stats dumping threads, etc. If `wait` = true, wait for the
895
  // running jobs to abort or finish before returning. Otherwise, only
896
  // sends the signals.
897
  void CancelAllBackgroundWork(bool wait);
898
899
  // Find Super version and reference it. Based on options, it might return
900
  // the thread local cached one.
901
  // Call ReturnAndCleanupSuperVersion() when it is no longer needed.
902
  SuperVersion* GetAndRefSuperVersion(ColumnFamilyData* cfd);
903
904
  // Similar to the previous function but looks up based on a column family id.
905
  // nullptr will be returned if this column family no longer exists.
906
  // REQUIRED: this function should only be called on the write thread or if the
907
  // mutex is held.
908
  SuperVersion* GetAndRefSuperVersion(uint32_t column_family_id);
909
910
  // Un-reference the super version and clean it up if it is the last reference.
911
  void CleanupSuperVersion(SuperVersion* sv);
912
913
  // Un-reference the super version and return it to thread local cache if
914
  // needed. If it is the last reference of the super version, clean it up
915
  // after un-referencing it.
916
  void ReturnAndCleanupSuperVersion(ColumnFamilyData* cfd, SuperVersion* sv);
917
918
  // Similar to the previous function but looks up based on a column family id.
919
  // nullptr will be returned if this column family no longer exists.
920
  // REQUIRED: this function should only be called on the write thread.
921
  void ReturnAndCleanupSuperVersion(uint32_t column_family_id, SuperVersion* sv);
922
923
  // REQUIRED: this function should only be called on the write thread or if the
924
  // mutex is held.  Return value only valid until next call to this function or
925
  // mutex is released.
926
  ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t column_family_id);
927
928
  // Same as above, but should be called without the mutex held and not on the write thread.
929
  std::unique_ptr<ColumnFamilyHandle> GetColumnFamilyHandleUnlocked(
930
      uint32_t column_family_id);
931
932
  // Returns the number of currently running flushes.
933
  // REQUIREMENT: mutex_ must be held when calling this function.
934
0
  int num_running_flushes() {
935
0
    mutex_.AssertHeld();
936
0
    return num_running_flushes_;
937
0
  }
938
939
  // Returns the number of currently running compactions.
940
  // REQUIREMENT: mutex_ must be held when calling this function.
941
0
  int num_running_compactions() {
942
0
    mutex_.AssertHeld();
943
0
    return num_running_compactions_;
944
0
  }
945
946
0
  const WriteController& write_controller() { return write_controller_; }
947
948
  // Hollow transaction shells used for recovery.
949
  // These will then be passed to TransactionDB so that
950
  // locks can be reacquired before writing can resume.
951
  struct RecoveredTransaction {
952
    std::string name_;
953
    bool unprepared_;
954
955
    struct BatchInfo {
956
      uint64_t log_number_;
957
      // TODO(lth): For unprepared, the memory usage here can be big for
958
      // unprepared transactions. This is only useful for rollbacks, and we
959
      // can in theory just keep keyset for that.
960
      WriteBatch* batch_;
961
      // Number of sub-batches. A new sub-batch is created if txn attempts to
962
      // insert a duplicate key,seq to memtable. This is currently used in
963
      // WritePreparedTxn/WriteUnpreparedTxn.
964
      size_t batch_cnt_;
965
    };
966
967
    // This maps the seq of the first key in the batch to BatchInfo, which
968
    // contains WriteBatch and other information relevant to the batch.
969
    //
970
    // For WriteUnprepared, batches_ can have size greater than 1, but for
971
    // other write policies, it must be of size 1.
972
    std::map<SequenceNumber, BatchInfo> batches_;
973
974
    explicit RecoveredTransaction(const uint64_t log, const std::string& name,
975
                                  WriteBatch* batch, SequenceNumber seq,
976
                                  size_t batch_cnt, bool unprepared)
977
0
        : name_(name), unprepared_(unprepared) {
978
0
      batches_[seq] = {log, batch, batch_cnt};
979
0
    }
980
981
0
    ~RecoveredTransaction() {
982
0
      for (auto& it : batches_) {
983
0
        delete it.second.batch_;
984
0
      }
985
0
    }
986
987
    void AddBatch(SequenceNumber seq, uint64_t log_number, WriteBatch* batch,
988
0
                  size_t batch_cnt, bool unprepared) {
989
0
      assert(batches_.count(seq) == 0);
990
0
      batches_[seq] = {log_number, batch, batch_cnt};
991
      // Prior state must be unprepared, since the prepare batch must be the
992
      // last batch.
993
0
      assert(unprepared_);
994
0
      unprepared_ = unprepared;
995
0
    }
996
  };
997
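The RecoveredTransaction bookkeeping above exists to support two-phase commit; for context, a hedged sketch of the public 2PC flow whose prepare sections end up tracked here (the path and transaction name are placeholders, error handling elided):

  #include "rocksdb/utilities/transaction_db.h"

  void TwoPhaseCommitExample() {
    rocksdb::Options options;
    options.create_if_missing = true;
    options.allow_2pc = true;
    rocksdb::TransactionDBOptions txn_db_options;
    rocksdb::TransactionDB* txn_db = nullptr;
    rocksdb::Status s = rocksdb::TransactionDB::Open(options, txn_db_options,
                                                     "/tmp/txn_example", &txn_db);
    rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());
    s = txn->SetName("txn1");  // a name is required before Prepare()
    s = txn->Put("k", "v");
    s = txn->Prepare();        // writes the prepare section to the WAL
    s = txn->Commit();         // a crash between Prepare() and Commit() leaves a
                               // prepared txn to be rebuilt as a RecoveredTransaction
    delete txn;
    delete txn_db;
  }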
998
11.8k
  bool allow_2pc() const { return immutable_db_options_.allow_2pc; }
999
1000
  std::unordered_map<std::string, RecoveredTransaction*>
1001
0
  recovered_transactions() {
1002
0
    return recovered_transactions_;
1003
0
  }
1004
1005
0
  RecoveredTransaction* GetRecoveredTransaction(const std::string& name) {
1006
0
    auto it = recovered_transactions_.find(name);
1007
0
    if (it == recovered_transactions_.end()) {
1008
0
      return nullptr;
1009
0
    } else {
1010
0
      return it->second;
1011
0
    }
1012
0
  }
1013
1014
  void InsertRecoveredTransaction(const uint64_t log, const std::string& name,
1015
                                  WriteBatch* batch, SequenceNumber seq,
1016
0
                                  size_t batch_cnt, bool unprepared_batch) {
1017
    // For WriteUnpreparedTxn, InsertRecoveredTransaction is called multiple
1018
    // times for every unprepared batch encountered during recovery.
1019
    //
1020
    // If the transaction is prepared, then the last call to
1021
    // InsertRecoveredTransaction will have unprepared_batch = false.
1022
0
    auto rtxn = recovered_transactions_.find(name);
1023
0
    if (rtxn == recovered_transactions_.end()) {
1024
0
      recovered_transactions_[name] = new RecoveredTransaction(
1025
0
          log, name, batch, seq, batch_cnt, unprepared_batch);
1026
0
    } else {
1027
0
      rtxn->second->AddBatch(seq, log, batch, batch_cnt, unprepared_batch);
1028
0
    }
1029
0
    logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log);
1030
0
  }
1031
1032
0
  void DeleteRecoveredTransaction(const std::string& name) {
1033
0
    auto it = recovered_transactions_.find(name);
1034
0
    assert(it != recovered_transactions_.end());
1035
0
    auto* trx = it->second;
1036
0
    recovered_transactions_.erase(it);
1037
0
    for (const auto& info : trx->batches_) {
1038
0
      logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(
1039
0
          info.second.log_number_);
1040
0
    }
1041
0
    delete trx;
1042
0
  }
1043
1044
0
  void DeleteAllRecoveredTransactions() {
1045
0
    for (auto it = recovered_transactions_.begin();
1046
0
         it != recovered_transactions_.end(); ++it) {
1047
0
      delete it->second;
1048
0
    }
1049
0
    recovered_transactions_.clear();
1050
0
  }
1051
1052
0
  void AddToLogsToFreeQueue(log::Writer* log_writer) {
1053
0
    mutex_.AssertHeld();
1054
0
    logs_to_free_queue_.push_back(log_writer);
1055
0
  }
1056
1057
0
  void AddSuperVersionsToFreeQueue(SuperVersion* sv) {
1058
0
    superversions_to_free_queue_.push_back(sv);
1059
0
  }
1060
1061
  void SetSnapshotChecker(SnapshotChecker* snapshot_checker);
1062
1063
  // Fill JobContext with snapshot information needed by flush and compaction.
1064
  void GetSnapshotContext(JobContext* job_context,
1065
                          std::vector<SequenceNumber>* snapshot_seqs,
1066
                          SequenceNumber* earliest_write_conflict_snapshot,
1067
                          SnapshotChecker** snapshot_checker);
1068
1069
  // Not thread-safe.
1070
  void SetRecoverableStatePreReleaseCallback(PreReleaseCallback* callback);
1071
1072
12.1k
  InstrumentedMutex* mutex() const { return &mutex_; }
1073
1074
  // Initialize a brand new DB. The DB directory is expected to be empty before
1075
  // calling it. Push new manifest file name into `new_filenames`.
1076
  Status NewDB(std::vector<std::string>* new_filenames);
1077
1078
  // This is to be used only by internal rocksdb classes.
1079
  static Status Open(const DBOptions& db_options, const std::string& name,
1080
                     const std::vector<ColumnFamilyDescriptor>& column_families,
1081
                     std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
1082
                     const bool seq_per_batch, const bool batch_per_txn,
1083
                     const bool is_retry, bool* can_retry);
1084
1085
  static IOStatus CreateAndNewDirectory(
1086
      FileSystem* fs, const std::string& dirname,
1087
      std::unique_ptr<FSDirectory>* directory);
1088
1089
  // find stats map from stats_history_ with smallest timestamp in
1090
  // the range of [start_time, end_time)
1091
  bool FindStatsByTime(uint64_t start_time, uint64_t end_time,
1092
                       uint64_t* new_time,
1093
                       std::map<std::string, uint64_t>* stats_map);
1094
1095
  // Print information of all tombstones of all iterators to the std::string
1096
  // This is only used by ldb. The output might be capped. Tombstones
1097
  // printed out are not guaranteed to be in any order.
1098
  Status TablesRangeTombstoneSummary(ColumnFamilyHandle* column_family,
1099
                                     int max_entries_to_print,
1100
                                     std::string* out_str);
1101
1102
0
  VersionSet* GetVersionSet() const { return versions_.get(); }
1103
1104
  Status WaitForCompact(
1105
      const WaitForCompactOptions& wait_for_compact_options) override;
1106
1107
#ifndef NDEBUG
1108
  // Compact any files in the named level that overlap [*begin, *end]
1109
  Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
1110
                           ColumnFamilyHandle* column_family = nullptr,
1111
                           bool disallow_trivial_move = false);
1112
1113
  Status TEST_SwitchWAL();
1114
1115
  bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }
1116
1117
  bool TEST_IsLogGettingFlushed() {
1118
    return alive_log_files_.begin()->getting_flushed;
1119
  }
1120
1121
  Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
1122
1123
  // Force current memtable contents to be flushed.
1124
  Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
1125
                            ColumnFamilyHandle* cfh = nullptr);
1126
1127
  Status TEST_FlushMemTable(ColumnFamilyData* cfd,
1128
                            const FlushOptions& flush_opts);
1129
1130
  // Flush (multiple) ColumnFamilyData without using ColumnFamilyHandle. This
1131
  // is because in certain cases, we can flush column families, wait for the
1132
  // flush to complete, but delete the column family handle before the wait
1133
  // finishes. For example in CompactRange.
1134
  Status TEST_AtomicFlushMemTables(
1135
      const autovector<ColumnFamilyData*>& provided_candidate_cfds,
1136
      const FlushOptions& flush_opts);
1137
1138
  // Wait for background threads to complete scheduled work.
1139
  Status TEST_WaitForBackgroundWork();
1140
1141
  // Wait for memtable compaction
1142
  Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
1143
1144
  Status TEST_WaitForCompact();
1145
  Status TEST_WaitForCompact(
1146
      const WaitForCompactOptions& wait_for_compact_options);
1147
1148
  // Wait for any background purge
1149
  Status TEST_WaitForPurge();
1150
1151
  // Get the background error status
1152
  Status TEST_GetBGError();
1153
1154
  // Return the maximum overlapping data (in bytes) at next level for any
1155
  // file at a level >= 1.
1156
  uint64_t TEST_MaxNextLevelOverlappingBytes(
1157
      ColumnFamilyHandle* column_family = nullptr);
1158
1159
  // Return the current manifest file no.
1160
  uint64_t TEST_Current_Manifest_FileNo();
1161
1162
  // Returns the number that'll be assigned to the next file that's created.
1163
  uint64_t TEST_Current_Next_FileNo();
1164
1165
  // get total level0 file size. Only for testing.
1166
  uint64_t TEST_GetLevel0TotalSize();
1167
1168
  void TEST_GetFilesMetaData(
1169
      ColumnFamilyHandle* column_family,
1170
      std::vector<std::vector<FileMetaData>>* metadata,
1171
      std::vector<std::shared_ptr<BlobFileMetaData>>* blob_metadata = nullptr);
1172
1173
  void TEST_LockMutex();
1174
1175
  void TEST_UnlockMutex();
1176
1177
  InstrumentedMutex* TEST_Mutex() { return &mutex_; }
1178
1179
  void TEST_SignalAllBgCv();
1180
1181
  // REQUIRES: mutex locked
1182
  void* TEST_BeginWrite();
1183
1184
  // REQUIRES: mutex locked
1185
  // pass the pointer that you got from TEST_BeginWrite()
1186
  void TEST_EndWrite(void* w);
1187
1188
  uint64_t TEST_MaxTotalInMemoryState() const {
1189
    return max_total_in_memory_state_;
1190
  }
1191
1192
  size_t TEST_LogsToFreeSize();
1193
1194
  uint64_t TEST_LogfileNumber();
1195
1196
  uint64_t TEST_total_log_size() const { return total_log_size_; }
1197
1198
  // Returns column family name to ImmutableCFOptions map.
1199
  Status TEST_GetAllImmutableCFOptions(
1200
      std::unordered_map<std::string, const ImmutableCFOptions*>* iopts_map);
1201
1202
  // Return the latest MutableCFOptions of a column family
1203
  Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
1204
                                        MutableCFOptions* mutable_cf_options);
1205
1206
  Cache* TEST_table_cache() { return table_cache_.get(); }
1207
1208
  WriteController& TEST_write_controler() { return write_controller_; }
1209
1210
  uint64_t TEST_FindMinLogContainingOutstandingPrep();
1211
  uint64_t TEST_FindMinPrepLogReferencedByMemTable();
1212
  size_t TEST_PreparedSectionCompletedSize();
1213
  size_t TEST_LogsWithPrepSize();
1214
1215
  int TEST_BGCompactionsAllowed() const;
1216
  int TEST_BGFlushesAllowed() const;
1217
  size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
1218
  void TEST_WaitForPeriodicTaskRun(std::function<void()> callback) const;
1219
  SeqnoToTimeMapping TEST_GetSeqnoToTimeMapping() const;
1220
  const autovector<uint64_t>& TEST_GetFilesToQuarantine() const;
1221
  size_t TEST_EstimateInMemoryStatsHistorySize() const;
1222
1223
  uint64_t TEST_GetCurrentLogNumber() const {
1224
    InstrumentedMutexLock l(mutex());
1225
    assert(!logs_.empty());
1226
    return logs_.back().number;
1227
  }
1228
1229
  void TEST_DeleteObsoleteFiles();
1230
1231
  const std::unordered_set<uint64_t>& TEST_GetFilesGrabbedForPurge() const {
1232
    return files_grabbed_for_purge_;
1233
  }
1234
1235
  const PeriodicTaskScheduler& TEST_GetPeriodicTaskScheduler() const;
1236
1237
  static Status TEST_ValidateOptions(const DBOptions& db_options) {
1238
    return ValidateOptions(db_options);
1239
  }
1240
1241
#endif  // NDEBUG
1242
1243
  // persist stats to column family "_persistent_stats"
1244
  void PersistStats();
1245
1246
  // dump rocksdb.stats to LOG
1247
  void DumpStats();
1248
1249
  // flush LOG out of application buffer
1250
  void FlushInfoLog();
1251
1252
  // Record the current sequence number to time mapping. If
1253
  // populate_historical_seconds > 0 then pre-populate all the
1254
  // sequence numbers from [1, last] to map to [now minus
1255
  // populate_historical_seconds, now].
1256
  void RecordSeqnoToTimeMapping(uint64_t populate_historical_seconds);
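  // Illustrative sketch (not part of db_impl.h): one way to picture the
  // pre-population described above, assuming a simple linear interpolation of
  // seqnos 1..last onto [now - populate_historical_seconds, now]. The helper
  // below is hypothetical, not the RocksDB implementation.
  //
  //   #include <cstdint>
  //
  //   uint64_t MappedWriteTime(uint64_t seqno, uint64_t last_seqno, uint64_t now,
  //                            uint64_t populate_historical_seconds) {
  //     if (last_seqno == 0) return now;
  //     // seqno == last_seqno maps to now; small seqnos map near the start of
  //     // the historical window.
  //     uint64_t offset = populate_historical_seconds -
  //                       (seqno * populate_historical_seconds) / last_seqno;
  //     return now - offset;
  //   }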
1257
1258
  // Every time the DB's seqno to time mapping changes (the caller already holds the db
1259
  // mutex), we install a new SuperVersion in each column family with a shared
1260
  // copy of the new mapping while holding the db mutex.
1261
  // This is done for all column families even if a column family has not
1262
  // explicitly enabled the
1263
  // `preclude_last_level_data_seconds` or `preserve_internal_time_seconds`
1264
  // features.
1265
  // This mapping supports iterators to fulfill the
1266
  // "rocksdb.iterator.write-time" iterator property for entries in memtables.
1267
  //
1268
  // Since this new SuperVersion doesn't involve an LSM tree shape change, we
1269
  // don't schedule work after installing this SuperVersion. It returns the used
1270
  // `SuperVersionContext` for cleanup after releasing the mutex.
1271
  void InstallSeqnoToTimeMappingInSV(
1272
      std::vector<SuperVersionContext>* sv_contexts);
1273
1274
  // Interface to block and signal the DB in case of stalling writes by
1275
  // WriteBufferManager. Each DBImpl object contains ptr to WBMStallInterface.
1276
  // When DB needs to be blocked or signalled by WriteBufferManager,
1277
  // state_ is changed accordingly.
1278
  class WBMStallInterface : public StallInterface {
1279
   public:
1280
    enum State {
1281
      BLOCKED = 0,
1282
      RUNNING,
1283
    };
1284
1285
11.0k
    WBMStallInterface() : state_cv_(&state_mutex_) {
1286
11.0k
      MutexLock lock(&state_mutex_);
1287
11.0k
      state_ = State::RUNNING;
1288
11.0k
    }
1289
1290
0
    void SetState(State state) {
1291
0
      MutexLock lock(&state_mutex_);
1292
0
      state_ = state;
1293
0
    }
1294
1295
    // Change the state_ to State::BLOCKED and wait until its state is
1296
    // changed by WriteBufferManager. When stall is cleared, Signal() is
1297
    // called to change the state and unblock the DB.
1298
0
    void Block() override {
1299
0
      MutexLock lock(&state_mutex_);
1300
0
      while (state_ == State::BLOCKED) {
1301
0
        TEST_SYNC_POINT("WBMStallInterface::BlockDB");
1302
0
        state_cv_.Wait();
1303
0
      }
1304
0
    }
1305
1306
    // Called from WriteBufferManager. This function changes the state_
1307
    // to State::RUNNING indicating the stall is cleared and DB can proceed.
1308
11.0k
    void Signal() override {
1309
11.0k
      {
1310
11.0k
        MutexLock lock(&state_mutex_);
1311
11.0k
        state_ = State::RUNNING;
1312
11.0k
      }
1313
11.0k
      state_cv_.Signal();
1314
11.0k
    }
1315
1316
   private:
1317
    // Conditional variable and mutex to block and
1318
    // signal the DB during stalling process.
1319
    port::Mutex state_mutex_;
1320
    port::CondVar state_cv_;
1321
    // state representing whether DB is running or blocked because of stall by
1322
    // WriteBufferManager.
1323
    State state_;
1324
  };
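  // Illustrative sketch (not part of db_impl.h): the Block()/Signal() pattern
  // above, reduced to standard library primitives. A stalled "DB" thread waits
  // while the state is blocked; the "write buffer manager" thread clears the
  // stall and signals. All names below are hypothetical.
  //
  //   #include <condition_variable>
  //   #include <mutex>
  //   #include <thread>
  //
  //   struct MiniStall {
  //     std::mutex mu;
  //     std::condition_variable cv;
  //     bool blocked = false;
  //
  //     void Block() {                      // called by the stalled DB thread
  //       std::unique_lock<std::mutex> l(mu);
  //       cv.wait(l, [&] { return !blocked; });
  //     }
  //     void SetBlocked() {                 // called by the write buffer manager
  //       std::lock_guard<std::mutex> l(mu);
  //       blocked = true;
  //     }
  //     void Signal() {                     // stall cleared
  //       { std::lock_guard<std::mutex> l(mu); blocked = false; }
  //       cv.notify_one();
  //     }
  //   };
  //
  //   int main() {
  //     MiniStall s;
  //     s.SetBlocked();
  //     std::thread writer([&] { s.Block(); });  // returns once Signal() runs
  //     s.Signal();
  //     writer.join();
  //     return 0;
  //   }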
1325
1326
  static void TEST_ResetDbSessionIdGen();
1327
  static std::string GenerateDbSessionId(Env* env);
1328
1329
0
  bool seq_per_batch() const { return seq_per_batch_; }
1330
1331
 protected:
1332
  const std::string dbname_;
1333
  // TODO(peterd): unify with VersionSet::db_id_
1334
  std::string db_id_;
1335
  // db_session_id_ is an identifier that gets reset
1336
  // every time the DB is opened
1337
  std::string db_session_id_;
1338
  std::unique_ptr<VersionSet> versions_;
1339
  // Flag to check whether we allocated and own the info log file
1340
  bool own_info_log_;
1341
  Status init_logger_creation_s_;
1342
  const DBOptions initial_db_options_;
1343
  Env* const env_;
1344
  std::shared_ptr<IOTracer> io_tracer_;
1345
  const ImmutableDBOptions immutable_db_options_;
1346
  FileSystemPtr fs_;
1347
  MutableDBOptions mutable_db_options_;
1348
  Statistics* stats_;
1349
  std::unordered_map<std::string, RecoveredTransaction*>
1350
      recovered_transactions_;
1351
  std::unique_ptr<Tracer> tracer_;
1352
  InstrumentedMutex trace_mutex_;
1353
  BlockCacheTracer block_cache_tracer_;
1354
1355
  // constant false canceled flag, used when the compaction is not manual
1356
  const std::atomic<bool> kManualCompactionCanceledFalse_{false};
1357
1358
  // State below is protected by mutex_
1359
  // With two_write_queues enabled, some of the variables that are accessed during
1360
  // WriteToWAL need different synchronization: log_empty_, alive_log_files_,
1361
  // logs_, logfile_number_. Refer to the definition of each variable below for
1362
  // more description.
1363
  //
1364
  // `mutex_` can be a hot lock in some workloads, so it deserves dedicated
1365
  // cachelines.
1366
  mutable CacheAlignedInstrumentedMutex mutex_;
1367
1368
  ColumnFamilyHandleImpl* default_cf_handle_;
1369
  InternalStats* default_cf_internal_stats_;
1370
1371
  // table_cache_ provides its own synchronization
1372
  std::shared_ptr<Cache> table_cache_;
1373
1374
  ErrorHandler error_handler_;
1375
1376
  // Unified interface for logging events
1377
  EventLogger event_logger_;
1378
1379
  // Only used for dynamically adjusting max_total_wal_size. It is a sum of
1380
  // [write_buffer_size * max_write_buffer_number] over all column families
1381
  std::atomic<uint64_t> max_total_in_memory_state_;
1382
1383
  // The options to access storage files
1384
  const FileOptions file_options_;
1385
1386
  // Additional options for compaction and flush
1387
  FileOptions file_options_for_compaction_;
1388
1389
  std::unique_ptr<ColumnFamilyMemTablesImpl> column_family_memtables_;
1390
1391
  // Increase the sequence number after writing each batch, whether memtable is
1392
  // disabled for that or not. Otherwise the sequence number is increased after
1393
  // writing each key into memtable. This implies that when disable_memtable is
1394
  // set, the seq is not increased at all.
1395
  //
1396
  // Default: false
1397
  const bool seq_per_batch_;
1398
  // This determines during recovery whether we expect one writebatch per
1399
  // recovered transaction, or potentially multiple writebatches per
1400
  // transaction. For WriteUnprepared, this is set to false, since multiple
1401
  // batches can exist per transaction.
1402
  //
1403
  // Default: true
1404
  const bool batch_per_txn_;
1405
1406
  // Each flush or compaction gets its own job id. This counter makes sure
1407
  // they're unique
1408
  std::atomic<int> next_job_id_;
1409
1410
  std::atomic<bool> shutting_down_;
1411
1412
  // No new background jobs can be queued if true. This is used to prevent new
1413
  // background jobs from being queued after WaitForCompact() finishes waiting for
1414
  // all background jobs and then attempts to close when the close_db_ option is true.
1415
  bool reject_new_background_jobs_;
1416
1417
  // RecoveryContext struct stores the context about version edits along
1418
  // with corresponding column_family_data and column_family_options.
1419
  class RecoveryContext {
1420
   public:
1421
11.0k
    ~RecoveryContext() {
1422
11.4k
      for (auto& edit_list : edit_lists_) {
1423
23.1k
        for (auto* edit : edit_list) {
1424
23.1k
          delete edit;
1425
23.1k
        }
1426
11.4k
      }
1427
11.0k
    }
1428
1429
23.1k
    void UpdateVersionEdits(ColumnFamilyData* cfd, const VersionEdit& edit) {
1430
23.1k
      assert(cfd != nullptr);
1431
23.1k
      if (map_.find(cfd->GetID()) == map_.end()) {
1432
11.4k
        uint32_t size = static_cast<uint32_t>(map_.size());
1433
11.4k
        map_.emplace(cfd->GetID(), size);
1434
11.4k
        cfds_.emplace_back(cfd);
1435
11.4k
        mutable_cf_opts_.emplace_back(cfd->GetLatestMutableCFOptions());
1436
11.4k
        edit_lists_.emplace_back(autovector<VersionEdit*>());
1437
11.4k
      }
1438
23.1k
      uint32_t i = map_[cfd->GetID()];
1439
23.1k
      edit_lists_[i].emplace_back(new VersionEdit(edit));
1440
23.1k
    }
1441
1442
    std::unordered_map<uint32_t, uint32_t> map_;  // cf_id to index;
1443
    autovector<ColumnFamilyData*> cfds_;
1444
    autovector<const MutableCFOptions*> mutable_cf_opts_;
1445
    autovector<autovector<VersionEdit*>> edit_lists_;
1446
    // All existing data files (SST files and Blob files) found during DB::Open.
1447
    std::vector<std::string> existing_data_files_;
1448
    bool is_new_db_ = false;
1449
  };
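  // Illustrative sketch (not part of db_impl.h): the bucketing pattern used by
  // UpdateVersionEdits() above, i.e. a map from column family ID to an index
  // into parallel per-CF lists so that edits for the same column family end up
  // together. Names are hypothetical and std containers stand in for autovector.
  //
  //   #include <cstdint>
  //   #include <unordered_map>
  //   #include <vector>
  //
  //   struct MiniRecoveryContext {
  //     std::unordered_map<uint32_t, size_t> index_of_cf;  // cf_id -> slot
  //     std::vector<std::vector<int>> edits_per_cf;        // one list per cf
  //
  //     void AddEdit(uint32_t cf_id, int edit) {
  //       auto it = index_of_cf.find(cf_id);
  //       if (it == index_of_cf.end()) {
  //         // First edit for this column family: allocate a new slot.
  //         it = index_of_cf.emplace(cf_id, edits_per_cf.size()).first;
  //         edits_per_cf.emplace_back();
  //       }
  //       edits_per_cf[it->second].push_back(edit);
  //     }
  //   };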
1450
1451
  // Persist options to options file. Must be holding options_mutex_.
1452
  // Will lock DB mutex if !db_mutex_already_held.
1453
  Status WriteOptionsFile(const WriteOptions& write_options,
1454
                          bool db_mutex_already_held);
1455
1456
  Status CompactRangeInternal(const CompactRangeOptions& options,
1457
                              ColumnFamilyHandle* column_family,
1458
                              const Slice* begin, const Slice* end,
1459
                              const std::string& trim_ts);
1460
1461
  // The following two functions can only be called when:
1462
  // 1. WriteThread::Writer::EnterUnbatched() is used.
1463
  // 2. db_mutex is NOT held
1464
  Status RenameTempFileToOptionsFile(const std::string& file_name);
1465
  Status DeleteObsoleteOptionsFiles();
1466
1467
  void NotifyOnManualFlushScheduled(autovector<ColumnFamilyData*> cfds,
1468
                                    FlushReason flush_reason);
1469
1470
  void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
1471
                          const MutableCFOptions& mutable_cf_options,
1472
                          int job_id, FlushReason flush_reason);
1473
1474
  void NotifyOnFlushCompleted(
1475
      ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
1476
      std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info);
1477
1478
  void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction* c,
1479
                               const Status& st,
1480
                               const CompactionJobStats& job_stats, int job_id);
1481
1482
  void NotifyOnCompactionCompleted(ColumnFamilyData* cfd, Compaction* c,
1483
                                   const Status& st,
1484
                                   const CompactionJobStats& job_stats,
1485
                                   int job_id);
1486
  void NotifyOnMemTableSealed(ColumnFamilyData* cfd,
1487
                              const MemTableInfo& mem_table_info);
1488
1489
  void NotifyOnExternalFileIngested(
1490
      ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job);
1491
1492
  Status FlushAllColumnFamilies(const FlushOptions& flush_options,
1493
                                FlushReason flush_reason);
1494
1495
  virtual Status FlushForGetLiveFiles();
1496
1497
  void NewThreadStatusCfInfo(ColumnFamilyData* cfd) const;
1498
1499
  void EraseThreadStatusCfInfo(ColumnFamilyData* cfd) const;
1500
1501
  void EraseThreadStatusDbInfo() const;
1502
1503
  // If disable_memtable is set the application logic must guarantee that the
1504
  // batch will still be skipped from memtable during the recovery. An exception
1505
  // to this is seq_per_batch_ mode, in which since each batch already takes one
1506
  // seq, it is ok for the batch to write to memtable during recovery as long as
1507
  // it only takes one sequence number: i.e., no duplicate keys.
1508
  // In WriteCommitted it is guaranteed since disable_memtable is used for
1509
  // prepare batch which will be written to memtable later during the commit,
1510
  // and in WritePrepared it is guaranteed since it will be used only for WAL
1511
  // markers which will never be written to memtable. If the commit marker is
1512
  // accompanied with CommitTimeWriteBatch that is not written to memtable as
1513
  // long as it has no duplicate keys, it does not violate the one-seq-per-batch
1514
  // policy.
1515
  // batch_cnt is expected to be non-zero in seq_per_batch mode and
1516
  // indicates the number of sub-batches. A sub-batch is a subset of the write
1517
  // batch that does not have duplicate keys.
1518
  Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
1519
                   WriteCallback* callback = nullptr,
1520
                   UserWriteCallback* user_write_cb = nullptr,
1521
                   uint64_t* log_used = nullptr, uint64_t log_ref = 0,
1522
                   bool disable_memtable = false, uint64_t* seq_used = nullptr,
1523
                   size_t batch_cnt = 0,
1524
                   PreReleaseCallback* pre_release_callback = nullptr,
1525
                   PostMemTableCallback* post_memtable_callback = nullptr);
1526
1527
  Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
1528
                            WriteCallback* callback = nullptr,
1529
                            UserWriteCallback* user_write_cb = nullptr,
1530
                            uint64_t* log_used = nullptr, uint64_t log_ref = 0,
1531
                            bool disable_memtable = false,
1532
                            uint64_t* seq_used = nullptr);
1533
1534
  // Write only to memtables without joining any write queue
1535
  Status UnorderedWriteMemtable(const WriteOptions& write_options,
1536
                                WriteBatch* my_batch, WriteCallback* callback,
1537
                                uint64_t log_ref, SequenceNumber seq,
1538
                                const size_t sub_batch_cnt);
1539
1540
  // Whether the batch needs to be assigned an order
1541
  enum AssignOrder : bool { kDontAssignOrder, kDoAssignOrder };
1542
  // Whether it requires publishing the last sequence or not
1543
  enum PublishLastSeq : bool { kDontPublishLastSeq, kDoPublishLastSeq };
1544
1545
  // Join the write_thread to write the batch only to the WAL. It is the
1546
  // responsibility of the caller to also write the write batch to the memtable
1547
  // if it is required.
1548
  //
1549
  // sub_batch_cnt is expected to be non-zero when assign_order = kDoAssignOrder
1550
  // indicating the number of sub-batches in my_batch. A sub-batch is a subset
1551
  // of the write batch that does not have duplicate keys. When seq_per_batch is
1552
  // not set, each key is a separate sub_batch. Otherwise each duplicate key
1553
  // marks start of a new sub-batch.
1554
  Status WriteImplWALOnly(
1555
      WriteThread* write_thread, const WriteOptions& options,
1556
      WriteBatch* updates, WriteCallback* callback,
1557
      UserWriteCallback* user_write_cb, uint64_t* log_used,
1558
      const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
1559
      PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
1560
      const PublishLastSeq publish_last_seq, const bool disable_memtable);
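  // Illustrative sketch (not part of db_impl.h): how a sub-batch count could be
  // derived from the rule above, where a new sub-batch starts whenever a key
  // repeats within the write batch. Keys are plain strings here; the helper is
  // hypothetical.
  //
  //   #include <cstddef>
  //   #include <string>
  //   #include <unordered_set>
  //   #include <vector>
  //
  //   size_t CountSubBatches(const std::vector<std::string>& keys) {
  //     size_t sub_batches = keys.empty() ? 0 : 1;
  //     std::unordered_set<std::string> seen;
  //     for (const auto& k : keys) {
  //       if (!seen.insert(k).second) {
  //         // Duplicate key: close the current sub-batch and start a new one.
  //         ++sub_batches;
  //         seen.clear();
  //         seen.insert(k);
  //       }
  //     }
  //     return sub_batches;
  //   }
  //   // CountSubBatches({"a", "b", "a", "c"}) == 2: {"a","b"} then {"a","c"}.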
1561
1562
  // write cached_recoverable_state_ to memtable if it is not empty
1563
  // The writer must be the leader in write_thread_ and holding mutex_
1564
  Status WriteRecoverableState();
1565
1566
  // Actual implementation of Close()
1567
  virtual Status CloseImpl();
1568
1569
  // Recover the descriptor from persistent storage.  May do a significant
1570
  // amount of work to recover recently logged updates.  Any changes to
1571
  // be made to the descriptor are added to *edit.
1572
  // recovered_seq is set to less than kMaxSequenceNumber if the log's tail is
1573
  // skipped.
1574
  // recovery_ctx stores the context about version edits and all those
1575
  // edits are persisted to new Manifest after successfully syncing the new WAL.
1576
  virtual Status Recover(
1577
      const std::vector<ColumnFamilyDescriptor>& column_families,
1578
      bool read_only = false, bool error_if_wal_file_exists = false,
1579
      bool error_if_data_exists_in_wals = false, bool is_retry = false,
1580
      uint64_t* recovered_seq = nullptr,
1581
      RecoveryContext* recovery_ctx = nullptr, bool* can_retry = nullptr);
1582
1583
22.5k
  virtual bool OwnTablesAndLogs() const { return true; }
1584
1585
  // Setup DB identity file, and write DB ID to manifest if necessary.
1586
  Status SetupDBId(const WriteOptions& write_options, bool read_only,
1587
                   RecoveryContext* recovery_ctx);
1588
  // Assign db_id_ and write DB ID to manifest if necessary.
1589
  void SetDBId(std::string&& id, bool read_only, RecoveryContext* recovery_ctx);
1590
1591
  // Collect a deduplicated collection of paths used by this DB, including
1592
  // dbname_, DBOptions.db_paths, ColumnFamilyOptions.cf_paths.
1593
  std::set<std::string> CollectAllDBPaths();
1594
1595
  // REQUIRES: db mutex held when calling this function, but the db mutex can
1596
  // be released and re-acquired. Db mutex will be held when the function
1597
  // returns.
1598
  // It stores all existing data files (SST and Blob) in RecoveryContext. In
1599
  // the meantime, we find out the largest file number present in the paths, and
1600
  // bump up the version set's next_file_number_ to be 1 + largest_file_number.
1601
  // recovery_ctx stores the context about version edits. All those edits are
1602
  // persisted to new Manifest after successfully syncing the new WAL.
1603
  Status MaybeUpdateNextFileNumber(RecoveryContext* recovery_ctx);
1604
1605
  // Track existing data files, including both referenced and unreferenced SST
1606
  // and Blob files in SstFileManager. This is only called during DB::Open and
1607
  // it's called before any file deletions start so that their deletion can be
1608
  // properly rate limited.
1609
  // Files may not be referenced in the MANIFEST because (e.g.
1610
  // 1. It's best effort recovery;
1611
  // 2. The VersionEdits referencing the SST files are appended to
1612
  // RecoveryContext, DB crashes when syncing the MANIFEST, the VersionEdits are
1613
  // still not synced to MANIFEST during recovery.)
1614
  //
1615
  // If the file is referenced in Manifest (typically that's the
1616
  // vast majority of all files), since it already has the file size
1617
  // on record, we don't need to query the file system. Otherwise, we query the
1618
  // file system for the size of an unreferenced file.
1619
  void TrackExistingDataFiles(
1620
      const std::vector<std::string>& existing_data_files);
1621
1622
  // SetDbSessionId() should be called in the constructor DBImpl()
1623
  // to ensure that db_session_id_ gets updated every time the DB is opened
1624
  void SetDbSessionId();
1625
1626
  Status FailIfCfHasTs(const ColumnFamilyHandle* column_family) const;
1627
  Status FailIfTsMismatchCf(ColumnFamilyHandle* column_family,
1628
                            const Slice& ts) const;
1629
1630
  // Check that the read timestamp `ts` is at or above the `full_history_ts_low`
1631
  // timestamp in a `SuperVersion`. It's necessary to do this check after
1632
  // grabbing the SuperVersion. If the check passes, the referenced SuperVersion
1633
  // this read holds on to can ensure the read won't be affected if
1634
  // `full_history_ts_low` is increased concurrently, and this achieves that
1635
  // without explicitly locking by piggybacking the SuperVersion.
1636
  Status FailIfReadCollapsedHistory(const ColumnFamilyData* cfd,
1637
                                    const SuperVersion* sv,
1638
                                    const Slice& ts) const;
1639
1640
  // recovery_ctx stores the context about version edits and
1641
  // LogAndApplyForRecovery persists all those edits to the new Manifest after
1642
  // successfully syncing new WAL.
1643
  // LogAndApplyForRecovery should be called only once during recovery and it
1644
  // should be called when RocksDB writes the first new MANIFEST for this
1645
  // recovery.
1646
  Status LogAndApplyForRecovery(const RecoveryContext& recovery_ctx);
1647
1648
  void InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();
1649
1650
  // Return true to proceed with current WAL record whose content is stored in
1651
  // `batch`. Return false to skip current WAL record.
1652
  bool InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
1653
                                          const std::string& wal_fname,
1654
                                          log::Reader::Reporter& reporter,
1655
                                          Status& status, bool& stop_replay,
1656
                                          WriteBatch& batch);
1657
1658
 private:
1659
  friend class DB;
1660
  friend class ErrorHandler;
1661
  friend class InternalStats;
1662
  friend class PessimisticTransaction;
1663
  friend class TransactionBaseImpl;
1664
  friend class WriteCommittedTxn;
1665
  friend class WritePreparedTxn;
1666
  friend class WritePreparedTxnDB;
1667
  friend class WriteBatchWithIndex;
1668
  friend class WriteUnpreparedTxnDB;
1669
  friend class WriteUnpreparedTxn;
1670
1671
  friend class ForwardIterator;
1672
  friend struct SuperVersion;
1673
  friend class CompactedDBImpl;
1674
  friend class DBImplFollower;
1675
#ifndef NDEBUG
1676
  friend class DBTest_ConcurrentFlushWAL_Test;
1677
  friend class DBTest_MixedSlowdownOptionsStop_Test;
1678
  friend class DBCompactionTest_CompactBottomLevelFilesWithDeletions_Test;
1679
  friend class DBCompactionTest_CompactionDuringShutdown_Test;
1680
  friend class DBCompactionTest_DelayCompactBottomLevelFilesWithDeletions_Test;
1681
  friend class DBCompactionTest_DisableCompactBottomLevelFiles_Test;
1682
  friend class StatsHistoryTest_PersistentStatsCreateColumnFamilies_Test;
1683
  friend class DBTest2_ReadCallbackTest_Test;
1684
  friend class WriteCallbackPTest_WriteWithCallbackTest_Test;
1685
  friend class XFTransactionWriteHandler;
1686
  friend class DBBlobIndexTest;
1687
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
1688
#endif
1689
1690
  struct CompactionState;
1691
  struct PrepickedCompaction;
1692
  struct PurgeFileInfo;
1693
1694
  struct WriteContext {
1695
    SuperVersionContext superversion_context;
1696
    autovector<MemTable*> memtables_to_free_;
1697
1698
    explicit WriteContext(bool create_superversion = false)
1699
1.16M
        : superversion_context(create_superversion) {}
1700
1701
1.16M
    ~WriteContext() {
1702
1.16M
      superversion_context.Clean();
1703
1.16M
      for (auto& m : memtables_to_free_) {
1704
0
        delete m;
1705
0
      }
1706
1.16M
    }
1707
  };
1708
1709
  struct LogFileNumberSize {
1710
17.0k
    explicit LogFileNumberSize(uint64_t _number) : number(_number) {}
1711
0
    LogFileNumberSize() {}
1712
1.16M
    void AddSize(uint64_t new_size) { size += new_size; }
1713
    uint64_t number;
1714
    uint64_t size = 0;
1715
    bool getting_flushed = false;
1716
  };
1717
1718
  struct LogWriterNumber {
1719
    // pass ownership of _writer
1720
    LogWriterNumber(uint64_t _number, log::Writer* _writer)
1721
11.2k
        : number(_number), writer(_writer) {}
1722
1723
133
    log::Writer* ReleaseWriter() {
1724
133
      auto* w = writer;
1725
133
      writer = nullptr;
1726
133
      return w;
1727
133
    }
1728
11.0k
    Status ClearWriter() {
1729
11.0k
      Status s;
1730
11.0k
      if (writer->file()) {
1731
        // TODO: plumb Env::IOActivity, Env::IOPriority
1732
11.0k
        s = writer->WriteBuffer(WriteOptions());
1733
11.0k
      }
1734
11.0k
      delete writer;
1735
11.0k
      writer = nullptr;
1736
11.0k
      return s;
1737
11.0k
    }
1738
1739
133
    bool IsSyncing() { return getting_synced; }
1740
1741
0
    uint64_t GetPreSyncSize() {
1742
0
      assert(getting_synced);
1743
0
      return pre_sync_size;
1744
0
    }
1745
1746
133
    void PrepareForSync() {
1747
133
      assert(!getting_synced);
1748
      // Ensure the head of logs_ is marked as getting_synced if any is.
1749
133
      getting_synced = true;
1750
      // If last sync failed on a later WAL, this could be a fully synced
1751
      // and closed WAL that just needs to be recorded as synced in the
1752
      // manifest.
1753
133
      if (writer->file()) {
1754
        // Size is expected to be monotonically increasing.
1755
133
        assert(writer->file()->GetFlushedSize() >= pre_sync_size);
1756
133
        pre_sync_size = writer->file()->GetFlushedSize();
1757
133
      }
1758
133
    }
1759
1760
133
    void FinishSync() {
1761
133
      assert(getting_synced);
1762
133
      getting_synced = false;
1763
133
    }
1764
1765
    uint64_t number;
1766
    // Visual Studio doesn't support noncopyable deque elements, which this struct
1767
    // would be if it held the writer in a std::unique_ptr member.
1768
    log::Writer* writer;  // own
1769
1770
   private:
1771
    // true for some prefix of logs_
1772
    bool getting_synced = false;
1773
    // The size of the file before the sync happens. This amount is guaranteed
1774
    // to be persisted even if appends happen during sync so it can be used for
1775
    // tracking the synced size in MANIFEST.
1776
    uint64_t pre_sync_size = 0;
1777
  };
1778
1779
  struct LogContext {
1780
    explicit LogContext(bool need_sync = false)
1781
1.16M
        : need_log_sync(need_sync), need_log_dir_sync(need_sync) {}
1782
    bool need_log_sync = false;
1783
    bool need_log_dir_sync = false;
1784
    log::Writer* writer = nullptr;
1785
    LogFileNumberSize* log_file_number_size = nullptr;
1786
  };
1787
1788
  // PurgeFileInfo is a structure to hold information of files to be deleted in
1789
  // purge_files_
1790
  struct PurgeFileInfo {
1791
    std::string fname;
1792
    std::string dir_to_sync;
1793
    FileType type;
1794
    uint64_t number;
1795
    int job_id;
1796
    PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num,
1797
                  int jid)
1798
0
        : fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {}
1799
  };
1800
1801
  // Argument required by background flush thread.
1802
  struct BGFlushArg {
1803
    BGFlushArg()
1804
        : cfd_(nullptr),
1805
          max_memtable_id_(0),
1806
          superversion_context_(nullptr),
1807
0
          flush_reason_(FlushReason::kOthers) {}
1808
    BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id,
1809
               SuperVersionContext* superversion_context,
1810
               FlushReason flush_reason)
1811
        : cfd_(cfd),
1812
          max_memtable_id_(max_memtable_id),
1813
          superversion_context_(superversion_context),
1814
133
          flush_reason_(flush_reason) {}
1815
1816
    // Column family to flush.
1817
    ColumnFamilyData* cfd_;
1818
    // Maximum ID of memtable to flush. In this column family, memtables with
1819
    // IDs smaller than this value must be flushed before this flush completes.
1820
    uint64_t max_memtable_id_;
1821
    // Pointer to a SuperVersionContext object. After flush completes, RocksDB
1822
    // installs a new superversion for the column family. This operation
1823
    // requires a SuperVersionContext object (currently embedded in JobContext).
1824
    SuperVersionContext* superversion_context_;
1825
    FlushReason flush_reason_;
1826
  };
1827
1828
  // Argument passed to flush thread.
1829
  struct FlushThreadArg {
1830
    DBImpl* db_;
1831
1832
    Env::Priority thread_pri_;
1833
  };
1834
1835
  // Information for a manual compaction
1836
  struct ManualCompactionState {
1837
    ManualCompactionState(ColumnFamilyData* _cfd, int _input_level,
1838
                          int _output_level, uint32_t _output_path_id,
1839
                          bool _exclusive, bool _disallow_trivial_move,
1840
                          std::atomic<bool>* _canceled)
1841
        : cfd(_cfd),
1842
          input_level(_input_level),
1843
          output_level(_output_level),
1844
          output_path_id(_output_path_id),
1845
          exclusive(_exclusive),
1846
          disallow_trivial_move(_disallow_trivial_move),
1847
159
          canceled(_canceled ? *_canceled : canceled_internal_storage) {}
1848
    // When _canceled is not provided by the user, we assign the reference of
1849
    // canceled_internal_storage to it to consolidate canceled and
1850
    // manual_compaction_paused since DisableManualCompaction() might be
1851
    // called.
1852
1853
    ColumnFamilyData* cfd;
1854
    int input_level;
1855
    int output_level;
1856
    uint32_t output_path_id;
1857
    Status status;
1858
    bool done = false;
1859
    bool in_progress = false;    // compaction request being processed?
1860
    bool incomplete = false;     // only part of requested range compacted
1861
    bool exclusive;              // current behavior of only one manual
1862
    bool disallow_trivial_move;  // Force actual compaction to run
1863
    const InternalKey* begin = nullptr;  // nullptr means beginning of key range
1864
    const InternalKey* end = nullptr;    // nullptr means end of key range
1865
    InternalKey* manual_end = nullptr;   // how far we are compacting
1866
    InternalKey tmp_storage;      // Used to keep track of compaction progress
1867
    InternalKey tmp_storage1;     // Used to keep track of compaction progress
1868
1869
    // When the user provides a canceled pointer in CompactRangeOptions, the
1870
    // above variable is the reference of the user-provided
1871
    // `canceled`, otherwise, it is the reference of canceled_internal_storage
1872
    std::atomic<bool> canceled_internal_storage = false;
1873
    std::atomic<bool>& canceled;  // Compaction canceled pointer reference
1874
  };
1875
  struct PrepickedCompaction {
1876
    // background compaction takes ownership of `compaction`.
1877
    Compaction* compaction;
1878
    // caller retains ownership of `manual_compaction_state` as it is reused
1879
    // across background compactions.
1880
    ManualCompactionState* manual_compaction_state;  // nullptr if non-manual
1881
    // task limiter token is requested during compaction picking.
1882
    std::unique_ptr<TaskLimiterToken> task_token;
1883
  };
1884
1885
  struct CompactionArg {
1886
    // caller retains ownership of `db`.
1887
    DBImpl* db;
1888
    // background compaction takes ownership of `prepicked_compaction`.
1889
    PrepickedCompaction* prepicked_compaction;
1890
    Env::Priority compaction_pri_;
1891
  };
1892
1893
0
  static bool IsRecoveryFlush(FlushReason flush_reason) {
1894
0
    return flush_reason == FlushReason::kErrorRecoveryRetryFlush ||
1895
0
           flush_reason == FlushReason::kErrorRecovery;
1896
0
  }
1897
  // Initialize the built-in column family for persistent stats. Depending on
1898
  // whether on-disk persistent stats have been enabled before, it may either
1899
  // create a new column family and column family handle or just a column family
1900
  // handle.
1901
  // Required: DB mutex held
1902
  Status InitPersistStatsColumnFamily();
1903
1904
  // Persistent Stats column family has two format version keys which are used
1905
  // for compatibility check. Write format version if it's created for the
1906
  // first time, read format version and check compatibility if recovering
1907
  // from disk. This function requires DB mutex held at entrance but may
1908
  // release and re-acquire DB mutex in the process.
1909
  // Required: DB mutex held
1910
  Status PersistentStatsProcessFormatVersion();
1911
1912
  Status ResumeImpl(DBRecoverContext context);
1913
1914
  void MaybeIgnoreError(Status* s) const;
1915
1916
  const Status CreateArchivalDirectory();
1917
1918
  // Create a column family, without some of the follow-up work yet
1919
  Status CreateColumnFamilyImpl(const ReadOptions& read_options,
1920
                                const WriteOptions& write_options,
1921
                                const ColumnFamilyOptions& cf_options,
1922
                                const std::string& cf_name,
1923
                                ColumnFamilyHandle** handle);
1924
1925
  // Follow-up work after the user creates a column family (or families)
1926
  Status WrapUpCreateColumnFamilies(
1927
      const ReadOptions& read_options, const WriteOptions& write_options,
1928
      const std::vector<const ColumnFamilyOptions*>& cf_options);
1929
1930
  Status DropColumnFamilyImpl(ColumnFamilyHandle* column_family);
1931
1932
  // Delete any unneeded files and stale in-memory entries.
1933
  void DeleteObsoleteFiles();
1934
  // Delete obsolete files and log status and information of file deletion
1935
  void DeleteObsoleteFileImpl(int job_id, const std::string& fname,
1936
                              const std::string& path_to_sync, FileType type,
1937
                              uint64_t number);
1938
1939
  // Background process needs to call
1940
  //     auto x = CaptureCurrentFileNumberInPendingOutputs()
1941
  //     auto file_num = versions_->NewFileNumber();
1942
  //     <do something>
1943
  //     ReleaseFileNumberFromPendingOutputs(x)
1944
  // This will protect any file with number `file_num` or greater from being
1945
  // deleted while <do something> is running.
1946
  // -----------
1947
  // This function will capture current file number and append it to
1948
  // pending_outputs_. This will prevent any background process to delete any
1949
  // file created after this point.
1950
  std::list<uint64_t>::iterator CaptureCurrentFileNumberInPendingOutputs();
1951
  // This function should be called with the result of
1952
  // CaptureCurrentFileNumberInPendingOutputs(). It then marks that any file
1953
  // created between the calls CaptureCurrentFileNumberInPendingOutputs() and
1954
  // ReleaseFileNumberFromPendingOutputs() can now be deleted (if it's not live
1955
  // and blocked by any other pending_outputs_ calls)
1956
  void ReleaseFileNumberFromPendingOutputs(
1957
      std::unique_ptr<std::list<uint64_t>::iterator>& v);
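  // Illustrative sketch (not part of db_impl.h): the pending_outputs_ protocol
  // described above, with a std::list acting as the pending set and the
  // returned iterator kept as an erase handle. Names are hypothetical.
  //
  //   #include <cstdint>
  //   #include <list>
  //
  //   struct MiniPendingOutputs {
  //     std::list<uint64_t> pending;     // file numbers protected from deletion
  //     uint64_t next_file_number = 1;
  //
  //     std::list<uint64_t>::iterator Capture() {
  //       // Any file numbered >= the captured value is protected until release.
  //       pending.push_back(next_file_number);
  //       return --pending.end();
  //     }
  //     void Release(std::list<uint64_t>::iterator handle) {
  //       pending.erase(handle);
  //     }
  //     uint64_t MinProtected() const {
  //       return pending.empty() ? UINT64_MAX : pending.front();
  //     }
  //   };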
1958
1959
  IOStatus SyncClosedWals(const WriteOptions& write_options,
1960
                          JobContext* job_context, VersionEdit* synced_wals,
1961
                          bool error_recovery_in_prog);
1962
1963
  // Flush the in-memory write buffer to storage.  Switches to a new
1964
  // log-file/memtable and writes a new descriptor iff successful. Then
1965
  // installs a new super version for the column family.
1966
  Status FlushMemTableToOutputFile(
1967
      ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
1968
      bool* madeProgress, JobContext* job_context, FlushReason flush_reason,
1969
      SuperVersionContext* superversion_context,
1970
      std::vector<SequenceNumber>& snapshot_seqs,
1971
      SequenceNumber earliest_write_conflict_snapshot,
1972
      SnapshotChecker* snapshot_checker, LogBuffer* log_buffer,
1973
      Env::Priority thread_pri);
1974
1975
  // Flush the memtables of (multiple) column families to multiple files on
1976
  // persistent storage.
1977
  Status FlushMemTablesToOutputFiles(
1978
      const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
1979
      JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
1980
1981
  Status AtomicFlushMemTablesToOutputFiles(
1982
      const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
1983
      JobContext* job_context, LogBuffer* log_buffer, Env::Priority thread_pri);
1984
1985
  // REQUIRES: log_numbers are sorted in ascending order
1986
  // corrupted_log_found is set to true if we recover from a corrupted log file.
1987
  Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
1988
                         SequenceNumber* next_sequence, bool read_only,
1989
                         bool is_retry, bool* corrupted_log_found,
1990
                         RecoveryContext* recovery_ctx);
1991
1992
  // The following two methods are used to flush a memtable to
1993
  // storage. The first one is used at database RecoveryTime (when the
1994
  // database is opened) and is heavyweight because it holds the mutex
1995
  // for the entire period. The second method WriteLevel0Table supports
1996
  // flushing memtables to storage concurrently.
1997
  Status WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
1998
                                     MemTable* mem, VersionEdit* edit);
1999
2000
  // Get the size of a log file and, if truncate is true, truncate the
2001
  // log file to its actual size, thereby freeing preallocated space.
2002
  // Return success even if truncate fails
2003
  Status GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
2004
                                    LogFileNumberSize* log);
2005
2006
  // Restore alive_log_files_ and total_log_size_ after recovery.
2007
  // It needs to run only when there's no flush during recovery
2008
  // (e.g. avoid_flush_during_recovery=true). May also trigger flush
2009
  // in case total_log_size > max_total_wal_size.
2010
  Status RestoreAliveLogFiles(const std::vector<uint64_t>& log_numbers);
2011
2012
  // num_bytes: for slowdown case, delay time is calculated based on
2013
  //            `num_bytes` going through.
2014
  Status DelayWrite(uint64_t num_bytes, WriteThread& write_thread,
2015
                    const WriteOptions& write_options);
2016
2017
  // Begin stalling of writes when memory usage increases beyond a certain
2018
  // threshold.
2019
  void WriteBufferManagerStallWrites();
2020
2021
  Status ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
2022
                                      WriteBatch* my_batch);
2023
2024
  // REQUIRES: mutex locked and in write thread.
2025
  Status ScheduleFlushes(WriteContext* context);
2026
2027
  void MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds);
2028
2029
  Status TrimMemtableHistory(WriteContext* context);
2030
2031
  Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
2032
2033
  // Select and output column families qualified for atomic flush in
2034
  // `selected_cfds`. If `provided_candidate_cfds` is non-empty, it will be used
2035
  // as candidate CFs to select qualified ones from. Otherwise, all column
2036
  // families are used as candidates to select from.
2037
  //
2038
  // REQUIRES: mutex held
2039
  void SelectColumnFamiliesForAtomicFlush(
2040
      autovector<ColumnFamilyData*>* selected_cfds,
2041
      const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},
2042
      FlushReason flush_reason = FlushReason::kOthers);
2043
2044
  // Force current memtable contents to be flushed.
2045
  Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
2046
                       FlushReason flush_reason,
2047
                       bool entered_write_thread = false);
2048
2049
  // Atomic-flush memtables from qualified CFs among `provided_candidate_cfds`
2050
  // (if non-empty) or among all column families and atomically record the
2051
  // result to the MANIFEST.
2052
  Status AtomicFlushMemTables(
2053
      const FlushOptions& options, FlushReason flush_reason,
2054
      const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},
2055
      bool entered_write_thread = false);
2056
2057
  Status RetryFlushesForErrorRecovery(FlushReason flush_reason, bool wait);
2058
2059
  // Wait until flushing this column family won't stall writes
2060
  Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
2061
                                           bool* flush_needed);
2062
2063
  // Wait for memtables to be flushed.
2064
  // If flush_memtable_id is non-null, wait until the memtable with the ID
2065
  // gets flushed. Otherwise, wait until the column family doesn't have any
2066
  // memtable pending flush.
2067
  // resuming_from_bg_err indicates whether the caller is attempting to resume
2068
  // from background error.
2069
  Status WaitForFlushMemTable(ColumnFamilyData* cfd,
2070
                              const uint64_t* flush_memtable_id = nullptr,
2071
0
                              bool resuming_from_bg_err = false) {
2072
0
    return WaitForFlushMemTables({cfd}, {flush_memtable_id},
2073
0
                                 resuming_from_bg_err);
2074
0
  }
2075
  // Wait for memtables to be flushed for multiple column families.
2076
  Status WaitForFlushMemTables(
2077
      const autovector<ColumnFamilyData*>& cfds,
2078
      const autovector<const uint64_t*>& flush_memtable_ids,
2079
      bool resuming_from_bg_err);
2080
2081
133
  inline void WaitForPendingWrites() {
2082
133
    mutex_.AssertHeld();
2083
133
    TEST_SYNC_POINT("DBImpl::WaitForPendingWrites:BeforeBlock");
2084
    // If pipelined write is enabled, wait for all pending memtable
2085
    // writers.
2086
133
    if (immutable_db_options_.enable_pipelined_write) {
2087
      // Memtable writers may call DB::Get in case max_successive_merges > 0,
2088
      // which may lock mutex. Unlocking mutex here to avoid deadlock.
2089
0
      mutex_.Unlock();
2090
0
      write_thread_.WaitForMemTableWriters();
2091
0
      mutex_.Lock();
2092
0
    }
2093
2094
133
    if (immutable_db_options_.unordered_write) {
2095
      // Wait for the ones who already wrote to the WAL to finish their
2096
      // memtable write.
2097
0
      if (pending_memtable_writes_.load() != 0) {
2098
        // XXX: suspicious wait while holding DB mutex?
2099
0
        std::unique_lock<std::mutex> guard(switch_mutex_);
2100
0
        switch_cv_.wait(guard,
2101
0
                        [&] { return pending_memtable_writes_.load() == 0; });
2102
0
      }
2103
133
    } else {
2104
      // (Writes are finished before the next write group starts.)
2105
133
    }
2106
2107
    // Wait for any LockWAL to clear
2108
133
    while (lock_wal_count_ > 0) {
2109
0
      bg_cv_.Wait();
2110
0
    }
2111
133
  }
2112
2113
  // TaskType is used to identify tasks in the thread pool; currently it only
2114
  // differentiates manual compaction, which can be unscheduled from the
2115
  // thread-pool.
2116
  enum class TaskType : uint8_t {
2117
    kDefault = 0,
2118
    kManualCompaction = 1,
2119
    kCount = 2,
2120
  };
2121
2122
  // Task tag is used to identify tasks in the thread pool, which is
2123
  // dbImpl obj address + type
2124
146
  inline void* GetTaskTag(TaskType type) {
2125
146
    return GetTaskTag(static_cast<uint8_t>(type));
2126
146
  }
2127
2128
66.6k
  inline void* GetTaskTag(uint8_t type) {
2129
66.6k
    return static_cast<uint8_t*>(static_cast<void*>(this)) + type;
2130
66.6k
  }
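  // Illustrative sketch (not part of db_impl.h): why "object address + type"
  // yields a distinct tag per (instance, task type) pair, as GetTaskTag() above
  // computes. Standalone and hypothetical.
  //
  //   #include <cassert>
  //   #include <cstdint>
  //
  //   struct MiniDb {
  //     void* TaskTag(uint8_t type) {
  //       return static_cast<uint8_t*>(static_cast<void*>(this)) + type;
  //     }
  //   };
  //
  //   int main() {
  //     MiniDb a, b;
  //     assert(a.TaskTag(0) != a.TaskTag(1));  // types differ within one instance
  //     assert(a.TaskTag(0) != b.TaskTag(0));  // instances differ for one type
  //     return 0;
  //   }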
2131
2132
  // REQUIRES: mutex locked and in write thread.
2133
  void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds);
2134
2135
  // REQUIRES: mutex locked and in write thread.
2136
  Status SwitchWAL(WriteContext* write_context);
2137
2138
  // REQUIRES: mutex locked and in write thread.
2139
  Status HandleWriteBufferManagerFlush(WriteContext* write_context);
2140
2141
  // REQUIRES: mutex locked
2142
  Status PreprocessWrite(const WriteOptions& write_options,
2143
                         LogContext* log_context, WriteContext* write_context);
2144
2145
  // Merge write batches in the write group into merged_batch.
2146
  // Returns OK if merge is successful.
2147
  // Returns Corruption if corruption in write batch is detected.
2148
  Status MergeBatch(const WriteThread::WriteGroup& write_group,
2149
                    WriteBatch* tmp_batch, WriteBatch** merged_batch,
2150
                    size_t* write_with_wal, WriteBatch** to_be_cached_state);
2151
2152
  IOStatus WriteToWAL(const WriteBatch& merged_batch,
2153
                      const WriteOptions& write_options,
2154
                      log::Writer* log_writer, uint64_t* log_used,
2155
                      uint64_t* log_size,
2156
                      LogFileNumberSize& log_file_number_size);
2157
2158
  IOStatus WriteToWAL(const WriteThread::WriteGroup& write_group,
2159
                      log::Writer* log_writer, uint64_t* log_used,
2160
                      bool need_log_sync, bool need_log_dir_sync,
2161
                      SequenceNumber sequence,
2162
                      LogFileNumberSize& log_file_number_size);
2163
2164
  IOStatus ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
2165
                                uint64_t* log_used,
2166
                                SequenceNumber* last_sequence, size_t seq_inc);
2167
2168
  // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
2169
  // Caller must hold mutex_.
2170
  void WriteStatusCheckOnLocked(const Status& status);
2171
2172
  // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
2173
  void WriteStatusCheck(const Status& status);
2174
2175
  // Used by WriteImpl to update bg_error_ when an IO error happens (e.g., writing the
2176
  // WAL or syncing the WAL fails), if paranoid check is enabled.
2177
  void IOStatusCheck(const IOStatus& status);
2178
2179
  // Used by WriteImpl to update bg_error_ in case of memtable insert error.
2180
  void MemTableInsertStatusCheck(const Status& memtable_insert_status);
2181
2182
  Status CompactFilesImpl(const CompactionOptions& compact_options,
2183
                          ColumnFamilyData* cfd, Version* version,
2184
                          const std::vector<std::string>& input_file_names,
2185
                          std::vector<std::string>* const output_file_names,
2186
                          const int output_level, int output_path_id,
2187
                          JobContext* job_context, LogBuffer* log_buffer,
2188
                          CompactionJobInfo* compaction_job_info);
2189
2190
  ColumnFamilyData* GetColumnFamilyDataByName(const std::string& cf_name);
2191
2192
  void MaybeScheduleFlushOrCompaction();
2193
2194
  struct FlushRequest {
2195
    FlushReason flush_reason;
2196
    // A map from each column family to flush to the largest memtable ID to persist
2197
    // for it. Once all the memtables whose IDs are smaller than or
2198
    // equal to this per-column-family value have been flushed, this flush request is
2199
    // considered to have completed its work of flushing this column family.
2200
    // After completing the work for all column families in this request, this
2201
    // flush is considered complete.
2202
    std::unordered_map<ColumnFamilyData*, uint64_t>
2203
        cfd_to_max_mem_id_to_persist;
2204
2205
#ifndef NDEBUG
2206
    int reschedule_count = 1;
2207
#endif /* !NDEBUG */
2208
  };
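  // Illustrative sketch (not part of db_impl.h): how "this flush request is
  // complete" could be evaluated from the per-column-family map above, assuming
  // a hypothetical callback that reports the smallest unflushed memtable ID for
  // a column family.
  //
  //   #include <cstdint>
  //   #include <functional>
  //   #include <unordered_map>
  //
  //   using CfId = uint32_t;
  //
  //   bool FlushRequestDone(
  //       const std::unordered_map<CfId, uint64_t>& max_mem_id_to_persist,
  //       const std::function<uint64_t(CfId)>& smallest_unflushed_id) {
  //     for (const auto& [cf, max_id] : max_mem_id_to_persist) {
  //       // All memtables with IDs <= max_id must be flushed for this CF.
  //       if (smallest_unflushed_id(cf) <= max_id) {
  //         return false;
  //       }
  //     }
  //     return true;
  //   }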
2209
2210
  // In case of atomic flush, generates a `FlushRequest` for the latest atomic
2211
  // cuts for these `cfds`. Atomic cuts are recorded in
2212
  // `AssignAtomicFlushSeq()`. For each entry in `cfds`, all CFDs sharing the
2213
  // same latest atomic cut must also be present.
2214
  //
2215
  // REQUIRES: mutex held
2216
  void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
2217
                            FlushReason flush_reason, FlushRequest* req);
2218
2219
  // The functions below are for executing flush and compaction in the background. A
2220
  // deque is the communication channel between the threads that ask for the work
2221
  // to be done and the available threads in the thread pool that pick it up to
2222
  // execute it. We use these terminologies to describe the state of the work
2223
  // and its transitions:
2224
  // 1) It becomes pending once it's successfully enqueued into the
2225
  //    corresponding deque; work in this state is also called unscheduled.
2226
  //    Counter `unscheduled_*_` counts work in this state.
2227
  // 2) When `MaybeScheduleFlushOrCompaction` schedules a thread to run `BGWork*`
2228
  //    for the work, it becomes scheduled.
2229
  //    Counter `bg_*_scheduled_` counts work in this state.
2230
  // 3) Once the thread starts to execute `BGWork*`, the work is popped from the
2231
  //    deque; it is now in the running state.
2232
  //    Counter `num_running_*_` counts work in this state.
2233
  // 4) Eventually, the work is finished. We don't need to specifically track
2234
  //    finished work.
2235
2236
  // Returns true if `req` is successfully enqueued.
2237
  bool EnqueuePendingFlush(const FlushRequest& req);
2238
2239
  void EnqueuePendingCompaction(ColumnFamilyData* cfd);
2240
  void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
2241
                            FileType type, uint64_t number, int job_id);
2242
  static void BGWorkCompaction(void* arg);
2243
  // Runs a pre-chosen universal compaction involving bottom level in a
2244
  // separate, bottom-pri thread pool.
2245
  static void BGWorkBottomCompaction(void* arg);
2246
  static void BGWorkFlush(void* arg);
2247
  static void BGWorkPurge(void* arg);
2248
  static void UnscheduleCompactionCallback(void* arg);
2249
  static void UnscheduleFlushCallback(void* arg);
2250
  void BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
2251
                                Env::Priority thread_pri);
2252
  void BackgroundCallFlush(Env::Priority thread_pri);
2253
  void BackgroundCallPurge();
2254
  Status BackgroundCompaction(bool* madeProgress, JobContext* job_context,
2255
                              LogBuffer* log_buffer,
2256
                              PrepickedCompaction* prepicked_compaction,
2257
                              Env::Priority thread_pri);
2258
  Status BackgroundFlush(bool* madeProgress, JobContext* job_context,
2259
                         LogBuffer* log_buffer, FlushReason* reason,
2260
                         bool* flush_rescheduled_to_retain_udt,
2261
                         Env::Priority thread_pri);
2262
2263
  bool EnoughRoomForCompaction(ColumnFamilyData* cfd,
2264
                               const std::vector<CompactionInputFiles>& inputs,
2265
                               bool* sfm_bookkeeping, LogBuffer* log_buffer);
2266
2267
  // Request a compaction task token from the compaction thread limiter.
2268
  // It always succeeds if force = true or the limiter is disabled.
2269
  bool RequestCompactionToken(ColumnFamilyData* cfd, bool force,
2270
                              std::unique_ptr<TaskLimiterToken>* token,
2271
                              LogBuffer* log_buffer);
2272
2273
  // Return true if the `FlushRequest` can be rescheduled to retain the UDT.
2274
  // Only true if there are user-defined timestamps in the involved MemTables
2275
  // newer than the cutoff timestamp `full_history_ts_low`, and not flushing
2276
  // immediately will not cause entering write stall mode.
2277
  bool ShouldRescheduleFlushRequestToRetainUDT(const FlushRequest& flush_req);
2278
2279
  // Schedule background tasks
2280
  Status StartPeriodicTaskScheduler();
2281
2282
  // Cancel scheduled periodic tasks
2283
  Status CancelPeriodicTaskScheduler();
2284
2285
  Status RegisterRecordSeqnoTimeWorker(const ReadOptions& read_options,
2286
                                       const WriteOptions& write_options,
2287
                                       bool is_new_db);
2288
2289
  void PrintStatistics();
2290
2291
  size_t EstimateInMemoryStatsHistorySize() const;
2292
2293
  // Return the minimum empty level that could hold the total data in the
2294
  // input level. Return the input level if no such level can be found.
2295
  int FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
2296
                                   const MutableCFOptions& mutable_cf_options,
2297
                                   int level);
2298
2299
  // Move the files in the input level to the target level.
2300
  // If target_level < 0, automatically calculate the minimum level that could
2301
  // hold the data set.
2302
  Status ReFitLevel(ColumnFamilyData* cfd, int level, int target_level = -1);
2303
2304
  // helper functions for adding and removing from flush & compaction queues
2305
  void AddToCompactionQueue(ColumnFamilyData* cfd);
2306
  ColumnFamilyData* PopFirstFromCompactionQueue();
2307
  FlushRequest PopFirstFromFlushQueue();
2308
2309
  // Pick the first unthrottled compaction with task token from queue.
2310
  ColumnFamilyData* PickCompactionFromQueue(
2311
      std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer);
2312
2313
  IOStatus SyncWalImpl(bool include_current_wal,
2314
                       const WriteOptions& write_options,
2315
                       JobContext* job_context, VersionEdit* synced_wals,
2316
                       bool error_recovery_in_prog);
2317
2318
  // helper function to call after some of the logs_ were synced
2319
  void MarkLogsSynced(uint64_t up_to, bool synced_dir, VersionEdit* edit);
2320
  Status ApplyWALToManifest(const ReadOptions& read_options,
2321
                            const WriteOptions& write_options,
2322
                            VersionEdit* edit);
2323
  // WALs with log number up to up_to are not synced successfully.
2324
  void MarkLogsNotSynced(uint64_t up_to);
2325
2326
  SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary,
2327
                                bool lock = true);
2328
2329
  // If snapshot_seq != kMaxSequenceNumber, then this function can only be
2330
  // called from the write thread that publishes sequence numbers to readers.
2331
  // For 1) write-committed, or 2) write-prepared + one-write-queue, this will
2332
  // be the write thread performing memtable writes. For write-prepared with
2333
  // two write queues, this will be the write thread writing commit marker to
2334
  // the WAL.
2335
  // If snapshot_seq == kMaxSequenceNumber, this function is called by a caller
2336
  // ensuring no writes to the database.
2337
  std::pair<Status, std::shared_ptr<const SnapshotImpl>>
2338
  CreateTimestampedSnapshotImpl(SequenceNumber snapshot_seq, uint64_t ts,
2339
                                bool lock = true);
2340
2341
  uint64_t GetMaxTotalWalSize() const;
2342
2343
  FSDirectory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const;
2344
2345
  Status MaybeReleaseTimestampedSnapshotsAndCheck();
2346
2347
  Status CloseHelper();
2348
2349
  void WaitForBackgroundWork();
2350
2351
  // Background threads call this function, which is just a wrapper around
2352
  // the InstallSuperVersion() function. Background threads carry
2353
  // sv_context which can have new_superversion already
2354
  // allocated.
2355
  // All ColumnFamily state changes go through this function. Here we analyze
2356
  // the new state and we schedule background work if we detect that the new
2357
  // state needs flush or compaction.
2358
  void InstallSuperVersionAndScheduleWork(
2359
      ColumnFamilyData* cfd, SuperVersionContext* sv_context,
2360
      const MutableCFOptions& mutable_cf_options);
2361
2362
  bool GetIntPropertyInternal(ColumnFamilyData* cfd,
2363
                              const DBPropertyInfo& property_info,
2364
                              bool is_locked, uint64_t* value);
2365
  bool GetPropertyHandleOptionsStatistics(std::string* value);
2366
2367
  bool HasPendingManualCompaction();
2368
  bool HasExclusiveManualCompaction();
2369
  void AddManualCompaction(ManualCompactionState* m);
2370
  void RemoveManualCompaction(ManualCompactionState* m);
2371
  bool ShouldntRunManualCompaction(ManualCompactionState* m);
2372
  bool HaveManualCompaction(ColumnFamilyData* cfd);
2373
  bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1);
2374
  void UpdateDeletionCompactionStats(const std::unique_ptr<Compaction>& c);
2375
2376
  // May open and read table files for table property.
2377
  // Should not be called while holding mutex_.
2378
  void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c,
2379
                              const Status& st,
2380
                              const CompactionJobStats& compaction_job_stats,
2381
                              const int job_id,
2382
                              CompactionJobInfo* compaction_job_info) const;
2383
  // Reserve the next 'num' file numbers for to-be-ingested external SST files,
2384
  // and return the current file_number in 'next_file_number'.
2385
  // Write a version edit to the MANIFEST.
2386
  Status ReserveFileNumbersBeforeIngestion(
2387
      ColumnFamilyData* cfd, uint64_t num,
2388
      std::unique_ptr<std::list<uint64_t>::iterator>& pending_output_elem,
2389
      uint64_t* next_file_number);
2390
2391
  bool ShouldPurge(uint64_t file_number) const;
2392
  void MarkAsGrabbedForPurge(uint64_t file_number);
2393
2394
  size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
2395
11.2k
  Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; }
2396
2397
  IOStatus CreateWAL(const WriteOptions& write_options, uint64_t log_file_num,
2398
                     uint64_t recycle_log_number, size_t preallocate_block_size,
2399
                     log::Writer** new_log);
2400
2401
  // Validate self-consistency of DB options
2402
  static Status ValidateOptions(const DBOptions& db_options);
2403
  // Validate self-consistency of DB options and their consistency with cf options
2404
  static Status ValidateOptions(
2405
      const DBOptions& db_options,
2406
      const std::vector<ColumnFamilyDescriptor>& column_families);
2407
2408
  // Utility function to do some debug validation and sort the given vector
2409
  // of MultiGet keys
2410
  void PrepareMultiGetKeys(
2411
      const size_t num_keys, bool sorted,
2412
      autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* key_ptrs);
2413
2414
  void MultiGetCommon(const ReadOptions& options,
2415
                      ColumnFamilyHandle* column_family, const size_t num_keys,
2416
                      const Slice* keys, PinnableSlice* values,
2417
                      PinnableWideColumns* columns, std::string* timestamps,
2418
                      Status* statuses, bool sorted_input);
2419
2420
  void MultiGetCommon(const ReadOptions& options, const size_t num_keys,
2421
                      ColumnFamilyHandle** column_families, const Slice* keys,
2422
                      PinnableSlice* values, PinnableWideColumns* columns,
2423
                      std::string* timestamps, Status* statuses,
2424
                      bool sorted_input);
2425
2426
  // A structure to hold the information required to process MultiGet of keys
2427
  // belonging to one column family. For a multi column family MultiGet, there
2428
  // will be a container of these objects.
2429
  struct MultiGetKeyRangePerCf {
2430
    // For the batched MultiGet which relies on sorted keys, start specifies
2431
    // the index of first key belonging to this column family in the sorted
2432
    // list.
2433
    size_t start;
2434
2435
    // For the batched MultiGet case, num_keys specifies the number of keys
2436
    // belonging to this column family in the sorted list
2437
    size_t num_keys;
2438
2439
0
    MultiGetKeyRangePerCf() : start(0), num_keys(0) {}
2440
2441
    MultiGetKeyRangePerCf(size_t first, size_t count)
2442
0
        : start(first), num_keys(count) {}
2443
  };
2444
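A minimal illustration (not part of the header) of how these ranges partition a sorted MultiGet key list: if five sorted keys end up with the first three belonging to one column family and the last two to another, the per-CF ranges could be built as

  MultiGetKeyRangePerCf first_cf_range(/*first=*/0, /*count=*/3);
  MultiGetKeyRangePerCf second_cf_range(/*first=*/3, /*count=*/2);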
2445
  // A structure to contain ColumnFamilyData and the SuperVersion obtained for
2446
  // the consistent view of DB
2447
  struct ColumnFamilySuperVersionPair {
2448
    ColumnFamilyHandleImpl* cfh;
2449
    ColumnFamilyData* cfd;
2450
2451
    // SuperVersion for the column family obtained in a manner that ensures a
2452
    // consistent view across all column families in the DB
2453
    SuperVersion* super_version;
2454
    ColumnFamilySuperVersionPair(ColumnFamilyHandle* column_family,
2455
                                 SuperVersion* sv)
2456
        : cfh(static_cast<ColumnFamilyHandleImpl*>(column_family)),
2457
          cfd(cfh->cfd()),
2458
0
          super_version(sv) {}
2459
2460
    ColumnFamilySuperVersionPair() = default;
2461
  };
2462
2463
  // A common function to obtain a consistent snapshot, which can be implicit
2464
  // if the user doesn't specify a snapshot in read_options, across
2465
  // multiple column families. It will attempt to get an implicit
2466
  // snapshot without acquiring the db_mutex, but will give up after a few
2467
  // tries and acquire the mutex if a memtable flush happens. The template
2468
  // allows both the batched and non-batched MultiGet to call this with
2469
  // either an std::unordered_map or autovector of column families.
2470
  //
2471
  // If callback is non-null, the callback is refreshed with the snapshot
2472
  // sequence number
2473
  //
2474
  // `extra_sv_ref` is used to indicate whether thread-local SuperVersion
2475
  // should be obtained with an extra ref (by GetReferencedSuperVersion()) or
2476
  // not (by GetAndRefSuperVersion()). For instance, point lookup like MultiGet
2477
  // does not require SuperVersion to be re-acquired throughout the entire
2478
  // invocation (no extra ref needed), while MultiCfIterators may need the
2479
  // SuperVersion to be updated during Refresh() (requires extra ref).
2480
  //
2481
  // `sv_from_thread_local` being set to false indicates that the SuperVersion
2482
  // is obtained from the ColumnFamilyData, whereas true indicates it is thread
2483
  // local.
2484
  //
2485
  // A non-OK status will be returned if, for a column family that enables the
2486
  // user-defined timestamp feature, the specified `ReadOptions.timestamp`
2487
  // attempts to read collapsed history.
2488
  template <class T, typename IterDerefFuncType>
2489
  Status MultiCFSnapshot(const ReadOptions& read_options,
2490
                         ReadCallback* callback,
2491
                         IterDerefFuncType iter_deref_func, T* cf_list,
2492
                         bool extra_sv_ref, SequenceNumber* snapshot,
2493
                         bool* sv_from_thread_local);
2494
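A hedged sketch (illustrative only, not taken from the implementation) of how a batched read path might call MultiCFSnapshot with an autovector of ColumnFamilySuperVersionPair; the container type and the iterator-dereference lambda shown here are assumptions for the example.

  using CfSvList =
      autovector<ColumnFamilySuperVersionPair, MultiGetContext::MAX_BATCH_SIZE>;
  CfSvList cf_sv_pairs;  // one entry per column family involved in the read
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  bool sv_from_thread_local = false;
  Status s = MultiCFSnapshot(
      read_options, /*callback=*/nullptr,
      [](CfSvList::iterator& iter) { return &(*iter); }, &cf_sv_pairs,
      /*extra_sv_ref=*/false, &snapshot_seq, &sv_from_thread_local);
  // On success, each pair's super_version is consistent at snapshot_seq.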
2495
  // The actual implementation of the batching MultiGet. The caller is expected
2496
  // to have acquired the SuperVersion and pass in a snapshot sequence number
2497
  // in order to construct the LookupKeys. The start_key and num_keys specify
2498
  // the range of keys in the sorted_keys vector for a single column family.
2499
  Status MultiGetImpl(
2500
      const ReadOptions& read_options, size_t start_key, size_t num_keys,
2501
      autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
2502
      SuperVersion* sv, SequenceNumber snap_seqnum, ReadCallback* callback);
2503
2504
  void MultiGetWithCallbackImpl(
2505
      const ReadOptions& read_options, ColumnFamilyHandle* column_family,
2506
      ReadCallback* callback,
2507
      autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys);
2508
2509
  Status DisableFileDeletionsWithLock();
2510
2511
  Status IncreaseFullHistoryTsLowImpl(ColumnFamilyData* cfd,
2512
                                      std::string ts_low);
2513
2514
  bool ShouldReferenceSuperVersion(const MergeContext& merge_context);
2515
2516
  template <typename IterType, typename ImplType,
2517
            typename ErrorIteratorFuncType>
2518
  std::unique_ptr<IterType> NewMultiCfIterator(
2519
      const ReadOptions& _read_options,
2520
      const std::vector<ColumnFamilyHandle*>& column_families,
2521
      ErrorIteratorFuncType error_iterator_func);
2522
2523
  // Lock over the persistent DB state.  Non-nullptr iff successfully acquired.
2524
  FileLock* db_lock_;
2525
2526
  // Guards changes to DB and CF options to ensure consistency between
2527
  // * In-memory options objects
2528
  // * Settings in effect
2529
  // * Options file contents
2530
  // while allowing the DB mutex to be released during slow operations like
2531
  // persisting options file or modifying global periodic task timer.
2532
  // Always acquired *before* DB mutex when this one is applicable.
2533
  InstrumentedMutex options_mutex_;
2534
2535
  // Guards reads and writes to in-memory stats_history_.
2536
  InstrumentedMutex stats_history_mutex_;
2537
2538
  // In addition to mutex_, log_write_mutex_ protects writes to logs_ and
2539
  // logfile_number_. With two_write_queues it also protects alive_log_files_,
2540
  // and log_empty_. Refer to the definition of each variable below for more
2541
  // details.
2542
  // Note: to avoid deadlock, if both log_write_mutex_ and mutex_ need to be
2543
  // acquired, the order should be first mutex_ and then log_write_mutex_.
2544
  InstrumentedMutex log_write_mutex_;
2545
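As a hedged sketch of the lock ordering spelled out in the comment above (illustrative, not a quote from the implementation), a code path that needs both mutexes would acquire the DB mutex first:

  InstrumentedMutexLock db_lock(&mutex_);             // mutex_ first
  InstrumentedMutexLock wal_lock(&log_write_mutex_);  // then log_write_mutex_
  // ... safe to update logs_ / logfile_number_ here ...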
2546
  // If zero, manual compactions are allowed to proceed. If non-zero, manual
2547
  // compactions may still be running, but will quickly fail with
2548
  // `Status::Incomplete`. The value indicates how many threads have paused
2549
  // manual compactions. It is accessed in read mode outside the DB mutex in
2550
  // compaction code paths.
2551
  std::atomic<int> manual_compaction_paused_;
2552
2553
  // This condition variable is signaled on these conditions:
2554
  // * whenever bg_compaction_scheduled_ goes down to 0
2555
  // * if AnyManualCompaction, whenever a compaction finishes, even if it hasn't
2556
  // made any progress
2557
  // * whenever a compaction made any progress
2558
  // * whenever bg_flush_scheduled_ or bg_purge_scheduled_ value decreases
2559
  // (i.e. whenever a flush is done, even if it didn't make any progress)
2560
  // * whenever there is an error in background purge, flush or compaction
2561
  // * whenever num_running_ingest_file_ goes to 0.
2562
  // * whenever pending_purge_obsolete_files_ goes to 0.
2563
  // * whenever disable_delete_obsolete_files_ goes to 0.
2564
  // * whenever SetOptions successfully updates options.
2565
  // * whenever a column family is dropped.
2566
  InstrumentedCondVar bg_cv_;
2567
  // Writes are protected by locking both mutex_ and log_write_mutex_, and reads
2568
  // must be under either mutex_ or log_write_mutex_. Since after ::Open,
2569
  // logfile_number_ is currently updated only in write_thread_, it can be read
2570
  // from the same write_thread_ without any locks.
2571
  uint64_t logfile_number_;
2572
  // Log files that we can recycle. Must be protected by db mutex_.
2573
  std::deque<uint64_t> log_recycle_files_;
2574
  // The minimum log file number that can be recycled, if log recycling is
2575
  // enabled. This is used to ensure that log files created by previous
2576
  // instances of the database are not recycled, as we cannot be sure they
2577
  // were created in the recyclable format.
2578
  uint64_t min_log_number_to_recycle_;
2579
  // Protected by log_write_mutex_.
2580
  bool log_dir_synced_;
2581
  // Without two_write_queues, reads and writes to log_empty_ are protected by
2582
  // mutex_. Since it is currently updated/read only in write_thread_, it can be
2583
  // accessed from the same write_thread_ without any locks. With
2584
  // two_write_queues writes, where it can be updated in different threads,
2585
  // read and writes are protected by log_write_mutex_ instead. This is to avoid
2586
  // expensive mutex_ lock during WAL write, which updates log_empty_.
2587
  bool log_empty_;
2588
2589
  ColumnFamilyHandleImpl* persist_stats_cf_handle_;
2590
2591
  bool persistent_stats_cfd_exists_ = true;
2592
2593
  // The current WAL file and those that have not been found obsolete from
2594
  // memtable flushes. A WAL not on this list might still be pending writer
2595
  // flush and/or sync and close and might still be in logs_. alive_log_files_
2596
  // is protected by mutex_ and log_write_mutex_ with details as follows:
2597
  // 1. read by FindObsoleteFiles() which can be called in either application
2598
  //    thread or RocksDB bg threads, both mutex_ and log_write_mutex_ are
2599
  //    held.
2600
  // 2. pop_front() by FindObsoleteFiles(), both mutex_ and log_write_mutex_
2601
  //    are held.
2602
  // 3. push_back() by DBImpl::Open() and DBImpl::RestoreAliveLogFiles()
2603
  //    (actually called by Open()), only mutex_ is held because at this point,
2604
  //    the DB::Open() call has not returned success to application, and the
2605
  //    only other thread(s) that can conflict are bg threads calling
2606
  //    FindObsoleteFiles() which ensure that both mutex_ and log_write_mutex_
2607
  //    are held when accessing alive_log_files_.
2608
  // 4. read by DBImpl::Open() is protected by mutex_.
2609
  // 5. push_back() by SwitchMemtable(). Both mutex_ and log_write_mutex_ are
2610
  //    held. This is done by the write group leader. Note that in the case of
2611
  //    two-write-queues, another WAL-only write thread can be writing to the
2612
  //    WAL concurrently. See 9.
2613
  // 6. read by SwitchWAL() with both mutex_ and log_write_mutex_ held. This is
2614
  //    done by write group leader.
2615
  // 7. read by ConcurrentWriteToWAL() by the write group leader in the case of
2616
  //    two-write-queues. Only log_write_mutex_ is held to protect concurrent
2617
  //    pop_front() by FindObsoleteFiles().
2618
  // 8. read by PreprocessWrite() by the write group leader. log_write_mutex_
2619
  //    is held to protect the data structure from concurrent pop_front() by
2620
  //    FindObsoleteFiles().
2621
  // 9. read by ConcurrentWriteToWAL() by a WAL-only write thread in the case
2622
  //    of two-write-queues. Only log_write_mutex_ is held. This suffices to
2623
  //    protect the data structure from concurrent push_back() by current
2624
  //    write group leader as well as pop_front() by FindObsoleteFiles().
2625
  std::deque<LogFileNumberSize> alive_log_files_;
2626
2627
  // Log files that aren't fully synced, and the current log file.
2628
  // Synchronization:
2629
  // 1. read by FindObsoleteFiles() which can be called either in application
2630
  //    thread or RocksDB bg threads. log_write_mutex_ is always held, while
2631
  //    some reads are performed without mutex_.
2632
  // 2. pop_front() by FindObsoleteFiles() with only log_write_mutex_ held.
2633
  // 3. read by DBImpl::Open() with both mutex_ and log_write_mutex_.
2634
  // 4. emplace_back() by DBImpl::Open() with both mutex_ and log_write_mutex.
2635
  //    Note that at this point, DB::Open() has not returned success to
2636
  //    application, thus the only other thread(s) that can conflict are bg
2637
  //    threads calling FindObsoleteFiles(). See 1.
2638
  // 5. iteration and clear() from CloseHelper() always hold log_write_mutex
2639
  //    and mutex_.
2640
  // 6. back() called by APIs FlushWAL() and LockWAL() are protected by only
2641
  //    log_write_mutex_. These two can be called by application threads after
2642
  //    DB::Open() returns success to applications.
2643
  // 7. read by SyncWAL(), another API, protected by only log_write_mutex_.
2644
  // 8. read by MarkLogsNotSynced() and MarkLogsSynced() are protected by
2645
  //    log_write_mutex_.
2646
  // 9. erase() by MarkLogsSynced() protected by log_write_mutex_.
2647
  // 10. read by SyncClosedWals() protected by only log_write_mutex_. This can
2648
  //     happen in bg flush threads after DB::Open() returns success to
2649
  //     applications.
2650
  // 11. reads, e.g. front(), iteration, and back() called by PreprocessWrite()
2651
  //     holds only the log_write_mutex_. This is done by the write group
2652
  //     leader. A bg thread calling FindObsoleteFiles() or MarkLogsSynced()
2653
  //     can happen concurrently. This is fine because log_write_mutex_ is used
2654
  //     by all parties. See 2, 5, 9.
2655
  // 12. reads, empty(), back() called by SwitchMemtable() hold both mutex_ and
2656
  //     log_write_mutex_. This happens in the write group leader.
2657
  // 13. emplace_back() by SwitchMemtable() hold both mutex_ and
2658
  //     log_write_mutex_. This happens in the write group leader. Can conflict
2659
  //     with bg threads calling FindObsoleteFiles(), MarkLogsSynced(),
2660
  //     SyncClosedWals(), etc. as well as application threads calling
2661
  //     FlushWAL(), SyncWAL(), LockWAL(). This is fine because all parties
2662
  //     require at least log_write_mutex_.
2663
  // 14. iteration called in WriteToWAL(write_group) protected by
2664
  //     log_write_mutex_. This is done by write group leader when
2665
  //     two-write-queues is disabled and write needs to sync logs.
2666
  // 15. back() called in ConcurrentWriteToWAL() protected by log_write_mutex_.
2667
  //     This can be done by the write group leader if two-write-queues is
2668
  //     enabled. It can also be done by another WAL-only write thread.
2669
  //
2670
  // Other observations:
2671
  //  - back() and items with getting_synced=true are not popped,
2672
  //  - The same thread that sets getting_synced=true will reset it.
2673
  //  - it follows that the object referred by back() can be safely read from
2674
  //  the write_thread_ without using mutex. Note that calling back() without
2675
  //  mutex may be unsafe because different implementations of deque::back() may
2676
  //  access other member variables of deque, causing undefined behaviors.
2677
  //  Generally, do not access stl containers without proper synchronization.
2678
  //  - it follows that the items with getting_synced=true can be safely read
2679
  //  from the same thread that has set getting_synced=true
2680
  std::deque<LogWriterNumber> logs_;
2681
2682
  // Signaled when getting_synced becomes false for some of the logs_.
2683
  InstrumentedCondVar log_sync_cv_;
2684
  // This is the app-level state that is written to the WAL but will be used
2685
  // only during recovery. Using this feature enables not writing the state to
2686
  // memtable on normal writes, which improves throughput. Each new
2687
  // write of the state will replace the previous state entirely even if the
2688
  // keys in the two consecutive states do not overlap.
2689
  // It is protected by log_write_mutex_ when two_write_queues_ is enabled.
2690
  // Otherwise only the head of write_thread_ can access it.
2691
  WriteBatch cached_recoverable_state_;
2692
  std::atomic<bool> cached_recoverable_state_empty_ = {true};
2693
  std::atomic<uint64_t> total_log_size_;
2694
2695
  // If this is non-empty, we need to delete these log files in background
2696
  // threads. Protected by log_write_mutex_.
2697
  autovector<log::Writer*> logs_to_free_;
2698
2699
  bool is_snapshot_supported_;
2700
2701
  std::map<uint64_t, std::map<std::string, uint64_t>> stats_history_;
2702
2703
  std::map<std::string, uint64_t> stats_slice_;
2704
2705
  bool stats_slice_initialized_ = false;
2706
2707
  Directories directories_;
2708
2709
  WriteBufferManager* write_buffer_manager_;
2710
2711
  WriteThread write_thread_;
2712
  WriteBatch tmp_batch_;
2713
  // The write thread when the writers have no memtable write. This will be used
2714
  // in 2PC to batch the prepares separately from the serial commit.
2715
  WriteThread nonmem_write_thread_;
2716
2717
  WriteController write_controller_;
2718
2719
  // Size of the last batch group. In slowdown mode, next write needs to
2720
  // sleep if it uses up the quota.
2721
  // Note: This is to protect memtable and compaction. If the batch only writes
2722
  // to the WAL, its size need not be included in this.
2723
  uint64_t last_batch_group_size_;
2724
2725
  FlushScheduler flush_scheduler_;
2726
2727
  TrimHistoryScheduler trim_history_scheduler_;
2728
2729
  SnapshotList snapshots_;
2730
2731
  TimestampedSnapshotList timestamped_snapshots_;
2732
2733
  // For each background job, pending_outputs_ keeps the current file number at
2734
  // the time that background job started.
2735
  // FindObsoleteFiles()/PurgeObsoleteFiles() never deletes any file that has
2736
  // number bigger than any of the file number in pending_outputs_. Since file
2737
  // numbers grow monotonically, this also means that pending_outputs_ is always
2738
  // sorted. After a background job is done executing, its file number is
2739
  // deleted from pending_outputs_, which allows PurgeObsoleteFiles() to clean
2740
  // it up.
2741
  // State is protected with db mutex.
2742
  std::list<uint64_t> pending_outputs_;
2743
2744
  // flush_queue_ and compaction_queue_ hold column families that we need to
2745
  // flush and compact, respectively.
2746
  // A column family is inserted into flush_queue_ when it satisfies condition
2747
  // cfd->imm()->IsFlushPending()
2748
  // A column family is inserted into compaction_queue_ when it satisfies
2749
  // condition cfd->NeedsCompaction()
2750
  // Column families in this list are all Ref()-erenced
2751
  // TODO(icanadi) Provide some kind of ReferencedColumnFamily class that will
2752
  // do RAII on ColumnFamilyData
2753
  // Column families are in this queue when they need to be flushed or
2754
  // compacted. Consumers of these queues are flush and compaction threads. When
2755
  // a column family is put on this queue, we increase unscheduled_flushes_ and
2756
  // unscheduled_compactions_. When these variables are bigger than zero, that
2757
  // means we need to schedule background threads for flush and compaction.
2758
  // Once the background threads are scheduled, we decrease unscheduled_flushes_
2759
  // and unscheduled_compactions_. That way we keep track of number of
2760
  // compaction and flush threads we need to schedule. This scheduling is done
2761
  // in MaybeScheduleFlushOrCompaction()
2762
  // invariant(column family present in flush_queue_ <==>
2763
  // ColumnFamilyData::pending_flush_ == true)
2764
  std::deque<FlushRequest> flush_queue_;
2765
  // invariant(column family present in compaction_queue_ <==>
2766
  // ColumnFamilyData::pending_compaction_ == true)
2767
  std::deque<ColumnFamilyData*> compaction_queue_;
2768
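A hedged pseudocode sketch (illustrative only; BuildFlushRequest is a hypothetical helper, not part of this header) of the enqueue-and-schedule pattern described in the comment above:

  // Under the DB mutex, on the flush path (sketch):
  if (cfd->imm()->IsFlushPending()) {
    // only enqueue if not already present (invariant: pending_flush_ <=> queued)
    flush_queue_.push_back(BuildFlushRequest(cfd));  // hypothetical helper
    ++unscheduled_flushes_;
    MaybeScheduleFlushOrCompaction();
  }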
2769
  // A map to store file numbers and filenames of the files to be purged
2770
  std::unordered_map<uint64_t, PurgeFileInfo> purge_files_;
2771
2772
  // A set to store the file numbers that have been assigned to a certain
2773
  // JobContext. Current implementation tracks table and blob files only.
2774
  std::unordered_set<uint64_t> files_grabbed_for_purge_;
2775
2776
  // A queue to store log writers to close. Protected by db mutex_.
2777
  std::deque<log::Writer*> logs_to_free_queue_;
2778
2779
  std::deque<SuperVersion*> superversions_to_free_queue_;
2780
2781
  int unscheduled_flushes_;
2782
2783
  int unscheduled_compactions_;
2784
2785
  // count how many background compactions are running or have been scheduled in
2786
  // the BOTTOM pool
2787
  int bg_bottom_compaction_scheduled_;
2788
2789
  // count how many background compactions are running or have been scheduled
2790
  int bg_compaction_scheduled_;
2791
2792
  // stores the number of compactions that are currently running
2793
  int num_running_compactions_;
2794
2795
  // number of background memtable flush jobs, submitted to the HIGH pool
2796
  int bg_flush_scheduled_;
2797
2798
  // stores the number of flushes that are currently running
2799
  int num_running_flushes_;
2800
2801
  // number of background obsolete file purge jobs, submitted to the HIGH pool
2802
  int bg_purge_scheduled_;
2803
2804
  std::deque<ManualCompactionState*> manual_compaction_dequeue_;
2805
2806
  // shall we disable deletion of obsolete files
2807
  // if 0 the deletion is enabled.
2808
  // if non-zero, files will not be deleted
2809
  // This enables two different threads to call
2810
  // EnableFileDeletions() and DisableFileDeletions()
2811
  // without any synchronization
2812
  int disable_delete_obsolete_files_;
2813
2814
  // Number of times FindObsoleteFiles has found deletable files and the
2815
  // corresponding call to PurgeObsoleteFiles has not yet finished.
2816
  int pending_purge_obsolete_files_;
2817
2818
  // last time when DeleteObsoleteFiles with full scan was executed. Originally
2819
  // initialized with startup time.
2820
  uint64_t delete_obsolete_files_last_run_;
2821
2822
  // The thread that wants to switch memtable can wait on this cv until the
2823
  // pending writes to memtable finish.
2824
  std::condition_variable switch_cv_;
2825
  // The mutex used by switch_cv_. mutex_ should be acquired beforehand.
2826
  std::mutex switch_mutex_;
2827
  // Number of threads intending to write to memtable
2828
  std::atomic<size_t> pending_memtable_writes_ = {};
2829
2830
  // A flag indicating whether the current rocksdb database has any
2831
  // data that is not yet persisted into either WAL or SST file.
2832
  // Used when disableWAL is true.
2833
  std::atomic<bool> has_unpersisted_data_;
2834
2835
  // if an attempt was made to flush all column families that
2836
  // the oldest log depends on but uncommitted data in the oldest
2837
  // log prevents the log from being released.
2838
  // We must attempt to free the dependent memtables again
2839
  // at a later time after the transaction in the oldest
2840
  // log is fully committed.
2841
  bool unable_to_release_oldest_log_;
2842
2843
  // Number of running IngestExternalFile() or CreateColumnFamilyWithImport()
2844
  // calls.
2845
  // REQUIRES: mutex held
2846
  int num_running_ingest_file_;
2847
2848
  WalManager wal_manager_;
2849
2850
  // A value of > 0 temporarily disables scheduling of background work
2851
  int bg_work_paused_;
2852
2853
  // A value of > 0 temporarily disables scheduling of background compaction
2854
  int bg_compaction_paused_;
2855
2856
  // Guard against multiple concurrent refitting
2857
  bool refitting_level_;
2858
2859
  // Indicate DB was opened successfully
2860
  bool opened_successfully_;
2861
2862
  // The min threshold to trigger bottommost compaction for removing
2863
  // garbage, among all column families.
2864
  SequenceNumber bottommost_files_mark_threshold_ = kMaxSequenceNumber;
2865
2866
  LogsWithPrepTracker logs_with_prep_tracker_;
2867
2868
  // Callback for compaction to check if a key is visible to a snapshot.
2869
  // REQUIRES: mutex held
2870
  std::unique_ptr<SnapshotChecker> snapshot_checker_;
2871
2872
  // Callback for when the cached_recoverable_state_ is written to memtable
2873
  // Only to be set during initialization
2874
  std::unique_ptr<PreReleaseCallback> recoverable_state_pre_release_callback_;
2875
2876
  // Scheduler to run DumpStats(), PersistStats(), and FlushInfoLog().
2877
  // Currently, internally it has a global timer instance for running the tasks.
2878
  PeriodicTaskScheduler periodic_task_scheduler_;
2879
2880
  // It contains the implementations for each periodic task.
2881
  std::map<PeriodicTaskType, const PeriodicTaskFunc> periodic_task_functions_;
2882
2883
  // When set, we use a separate queue for writes that don't write to memtable.
2884
  // In 2PC these are the writes at Prepare phase.
2885
  const bool two_write_queues_;
2886
  const bool manual_wal_flush_;
2887
2888
  // LastSequence also indicates last published sequence visible to the
2889
  // readers. Otherwise LastPublishedSequence should be used.
2890
  const bool last_seq_same_as_publish_seq_;
2891
  // It indicates that a customized gc algorithm must be used for
2892
  // flush/compaction and if it is not provided via SnapshotChecker, we should
2893
  // disable gc to be safe.
2894
  const bool use_custom_gc_;
2895
  // Flag to indicate that the DB instance shutdown has been initiated. This
2896
  // is different from the shutting_down_ atomic in that it is set at the beginning
2897
  // of shutdown sequence, specifically in order to prevent any background
2898
  // error recovery from going on in parallel. The latter, shutting_down_,
2899
  // is set a little later during the shutdown after scheduling memtable
2900
  // flushes
2901
  std::atomic<bool> shutdown_initiated_;
2902
  // Flag to indicate whether sst_file_manager object was allocated in
2903
  // DB::Open() or passed to us
2904
  bool own_sfm_;
2905
2906
  // Flag to check whether Close() has been called on this DB
2907
  bool closed_;
2908
  // save the closing status, for re-calling the close()
2909
  Status closing_status_;
2910
  // mutex for DB::Close()
2911
  InstrumentedMutex closing_mutex_;
2912
2913
  // Conditional variable to coordinate installation of atomic flush results.
2914
  // With atomic flush, each bg thread installs the result of flushing multiple
2915
  // column families, and different threads can flush different column
2916
  // families. It's difficult to rely on one thread to perform batch
2917
  // installation for all threads. This is different from the non-atomic flush
2918
  // case.
2919
  // atomic_flush_install_cv_ makes sure that threads install atomic flush
2920
  // results sequentially. Flush results of memtables with lower IDs get
2921
  // installed to MANIFEST first.
2922
  InstrumentedCondVar atomic_flush_install_cv_;
2923
2924
  bool wal_in_db_path_;
2925
  std::atomic<uint64_t> max_total_wal_size_;
2926
2927
  BlobFileCompletionCallback blob_callback_;
2928
2929
  // Pointer to WriteBufferManager stalling interface.
2930
  std::unique_ptr<StallInterface> wbm_stall_;
2931
2932
  // seqno_to_time_mapping_ stores the sequence number to time mapping. It is
2933
  // not thread safe; both reads and writes require the db mutex to be held.
2934
  SeqnoToTimeMapping seqno_to_time_mapping_;
2935
2936
  // Stop write token that is acquired when first LockWAL() is called.
2937
  // Destroyed when last UnlockWAL() is called. Controlled by DB mutex.
2938
  // See lock_wal_count_
2939
  std::unique_ptr<WriteControllerToken> lock_wal_write_token_;
2940
2941
  // The number of LockWAL() calls without a matching UnlockWAL() call.
2942
  // See also lock_wal_write_token_
2943
  uint32_t lock_wal_count_;
2944
};
2945
2946
class GetWithTimestampReadCallback : public ReadCallback {
2947
 public:
2948
  explicit GetWithTimestampReadCallback(SequenceNumber seq)
2949
584
      : ReadCallback(seq) {}
2950
0
  bool IsVisibleFullCheck(SequenceNumber seq) override {
2951
0
    return seq <= max_visible_seq_;
2952
0
  }
2953
};
2954
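A tiny illustrative sketch (not from the source; snapshot_seq and entry_seq are assumed SequenceNumber values) of how this callback gates visibility by sequence number:

  GetWithTimestampReadCallback read_cb(snapshot_seq);
  // An entry is visible iff its sequence number is <= snapshot_seq.
  bool visible = read_cb.IsVisibleFullCheck(entry_seq);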
2955
Options SanitizeOptions(const std::string& db, const Options& src,
2956
                        bool read_only = false,
2957
                        Status* logger_creation_s = nullptr);
2958
2959
DBOptions SanitizeOptions(const std::string& db, const DBOptions& src,
2960
                          bool read_only = false,
2961
                          Status* logger_creation_s = nullptr);
2962
2963
CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions,
2964
                                    const MutableCFOptions& mutable_cf_options);
2965
2966
// Return the earliest log file to keep after the memtable flush is
2967
// finalized.
2968
// `cfd_to_flush` is the column family whose memtable (specified in
2969
// `memtables_to_flush`) will be flushed and thus will not depend on any WAL
2970
// file.
2971
// The function is only applicable to 2pc mode.
2972
uint64_t PrecomputeMinLogNumberToKeep2PC(
2973
    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
2974
    const autovector<VersionEdit*>& edit_list,
2975
    const autovector<MemTable*>& memtables_to_flush,
2976
    LogsWithPrepTracker* prep_tracker);
2977
// For atomic flush.
2978
uint64_t PrecomputeMinLogNumberToKeep2PC(
2979
    VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
2980
    const autovector<autovector<VersionEdit*>>& edit_lists,
2981
    const autovector<const autovector<MemTable*>*>& memtables_to_flush,
2982
    LogsWithPrepTracker* prep_tracker);
2983
2984
// In non-2PC mode, WALs with log number < the returned number can be
2985
// deleted after the cfd_to_flush column family is flushed successfully.
2986
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
2987
    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
2988
    const autovector<VersionEdit*>& edit_list);
2989
// For atomic flush.
2990
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
2991
    VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
2992
    const autovector<autovector<VersionEdit*>>& edit_lists);
2993
2994
// `cfd_to_flush` is the column family whose memtable will be flushed and thus
2995
// will not depend on any WAL file. nullptr means no memtable is being flushed.
2996
// The function is only applicable to 2pc mode.
2997
uint64_t FindMinPrepLogReferencedByMemTable(
2998
    VersionSet* vset, const autovector<MemTable*>& memtables_to_flush);
2999
// For atomic flush.
3000
uint64_t FindMinPrepLogReferencedByMemTable(
3001
    VersionSet* vset,
3002
    const autovector<const autovector<MemTable*>*>& memtables_to_flush);
3003
3004
// Fix user-supplied options to be reasonable
3005
template <class T, class V>
3006
28.3k
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
3007
28.3k
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
3008
28.3k
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
3009
28.3k
}
Unexecuted instantiation: db_impl_open.cc:void rocksdb::ClipToRange<int, int>(int*, int, int)
column_family.cc:void rocksdb::ClipToRange<unsigned long, unsigned long>(unsigned long*, unsigned long, unsigned long)
Line
Count
Source
3006
28.3k
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
3007
28.3k
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
3008
28.3k
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
3009
28.3k
}
3010
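A minimal usage sketch of ClipToRange (the values are illustrative): clamp a user-supplied size into a sane range.

  size_t write_buffer_size = 1;  // hypothetical too-small user value
  ClipToRange(&write_buffer_size, (size_t)64 << 10, (size_t)64 << 30);
  // write_buffer_size is now 64 KB, the lower bound of the range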
3011
inline Status DBImpl::FailIfCfHasTs(
3012
1.16M
    const ColumnFamilyHandle* column_family) const {
3013
1.16M
  if (!column_family) {
3014
0
    return Status::InvalidArgument("column family handle cannot be null");
3015
0
  }
3016
1.16M
  assert(column_family);
3017
1.16M
  const Comparator* const ucmp = column_family->GetComparator();
3018
1.16M
  assert(ucmp);
3019
1.16M
  if (ucmp->timestamp_size() > 0) {
3020
0
    std::ostringstream oss;
3021
0
    oss << "cannot call this method on column family "
3022
0
        << column_family->GetName() << " that enables timestamp";
3023
0
    return Status::InvalidArgument(oss.str());
3024
0
  }
3025
1.16M
  return Status::OK();
3026
1.16M
}
3027
3028
inline Status DBImpl::FailIfTsMismatchCf(ColumnFamilyHandle* column_family,
3029
0
                                         const Slice& ts) const {
3030
0
  if (!column_family) {
3031
0
    return Status::InvalidArgument("column family handle cannot be null");
3032
0
  }
3033
0
  assert(column_family);
3034
0
  const Comparator* const ucmp = column_family->GetComparator();
3035
0
  assert(ucmp);
3036
0
  if (0 == ucmp->timestamp_size()) {
3037
0
    std::stringstream oss;
3038
0
    oss << "cannot call this method on column family "
3039
0
        << column_family->GetName() << " that does not enable timestamp";
3040
0
    return Status::InvalidArgument(oss.str());
3041
0
  }
3042
0
  const size_t ts_sz = ts.size();
3043
0
  if (ts_sz != ucmp->timestamp_size()) {
3044
0
    std::stringstream oss;
3045
0
    oss << "Timestamp sizes mismatch: expect " << ucmp->timestamp_size() << ", "
3046
0
        << ts_sz << " given";
3047
0
    return Status::InvalidArgument(oss.str());
3048
0
  }
3049
0
  return Status::OK();
3050
0
}
3051
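A hedged sketch (illustrative; ts_buf and ts_width are assumed to hold an encoded timestamp of the column family's configured width) of how a timestamp-aware write path might use this guard:

  Slice write_ts(ts_buf, ts_width);
  Status s = FailIfTsMismatchCf(column_family, write_ts);
  if (!s.ok()) {
    return s;  // null handle, CF without timestamps, or size mismatch
  }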
3052
inline Status DBImpl::FailIfReadCollapsedHistory(const ColumnFamilyData* cfd,
3053
                                                 const SuperVersion* sv,
3054
0
                                                 const Slice& ts) const {
3055
  // Reaching to this point means the timestamp size matching sanity check in
3056
  // `DBImpl::FailIfTsMismatchCf` already passed. So we skip that and assume
3057
  // column family has the same user-defined timestamp format as `ts`.
3058
0
  const Comparator* const ucmp = cfd->user_comparator();
3059
0
  assert(ucmp);
3060
0
  const std::string& full_history_ts_low = sv->full_history_ts_low;
3061
0
  assert(full_history_ts_low.empty() ||
3062
0
         full_history_ts_low.size() == ts.size());
3063
0
  if (!full_history_ts_low.empty() &&
3064
0
      ucmp->CompareTimestamp(ts, full_history_ts_low) < 0) {
3065
0
    std::stringstream oss;
3066
0
    oss << "Read timestamp: " << ucmp->TimestampToString(ts)
3067
0
        << " is smaller than full_history_ts_low: "
3068
0
        << ucmp->TimestampToString(full_history_ts_low) << std::endl;
3069
0
    return Status::InvalidArgument(oss.str());
3070
0
  }
3071
0
  return Status::OK();
3072
0
}
3073
}  // namespace ROCKSDB_NAMESPACE