Coverage Report

Created: 2026-02-14 06:58

/src/rocksdb/db/flush_job.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/flush_job.h"
11
12
#include <algorithm>
13
#include <cinttypes>
14
#include <vector>
15
16
#include "db/builder.h"
17
#include "db/db_iter.h"
18
#include "db/dbformat.h"
19
#include "db/event_helpers.h"
20
#include "db/log_reader.h"
21
#include "db/log_writer.h"
22
#include "db/memtable.h"
23
#include "db/memtable_list.h"
24
#include "db/merge_context.h"
25
#include "db/range_tombstone_fragmenter.h"
26
#include "db/version_edit.h"
27
#include "db/version_set.h"
28
#include "file/file_util.h"
29
#include "file/filename.h"
30
#include "logging/event_logger.h"
31
#include "logging/log_buffer.h"
32
#include "logging/logging.h"
33
#include "monitoring/iostats_context_imp.h"
34
#include "monitoring/perf_context_imp.h"
35
#include "monitoring/thread_status_util.h"
36
#include "port/port.h"
37
#include "rocksdb/db.h"
38
#include "rocksdb/env.h"
39
#include "rocksdb/statistics.h"
40
#include "rocksdb/status.h"
41
#include "rocksdb/table.h"
42
#include "table/merging_iterator.h"
43
#include "table/table_builder.h"
44
#include "table/two_level_iterator.h"
45
#include "test_util/sync_point.h"
46
#include "util/coding.h"
47
#include "util/mutexlock.h"
48
#include "util/stop_watch.h"
49
50
namespace ROCKSDB_NAMESPACE {
51
52
1.14k
const char* GetFlushReasonString(FlushReason flush_reason) {
53
1.14k
  switch (flush_reason) {
54
0
    case FlushReason::kOthers:
55
0
      return "Other Reasons";
56
0
    case FlushReason::kGetLiveFiles:
57
0
      return "Get Live Files";
58
0
    case FlushReason::kShutDown:
59
0
      return "Shut down";
60
0
    case FlushReason::kExternalFileIngestion:
61
0
      return "External File Ingestion";
62
1.14k
    case FlushReason::kManualCompaction:
63
1.14k
      return "Manual Compaction";
64
0
    case FlushReason::kWriteBufferManager:
65
0
      return "Write Buffer Manager";
66
0
    case FlushReason::kWriteBufferFull:
67
0
      return "Write Buffer Full";
68
0
    case FlushReason::kTest:
69
0
      return "Test";
70
0
    case FlushReason::kDeleteFiles:
71
0
      return "Delete Files";
72
0
    case FlushReason::kAutoCompaction:
73
0
      return "Auto Compaction";
74
0
    case FlushReason::kManualFlush:
75
0
      return "Manual Flush";
76
0
    case FlushReason::kErrorRecovery:
77
0
      return "Error Recovery";
78
0
    case FlushReason::kErrorRecoveryRetryFlush:
79
0
      return "Error Recovery Retry Flush";
80
0
    case FlushReason::kWalFull:
81
0
      return "WAL Full";
82
0
    case FlushReason::kCatchUpAfterErrorRecovery:
83
0
      return "Catch Up After Error Recovery";
84
0
    default:
85
0
      return "Invalid";
86
1.14k
  }
87
1.14k
}
88
89
FlushJob::FlushJob(
90
    const std::string& dbname, ColumnFamilyData* cfd,
91
    const ImmutableDBOptions& db_options,
92
    const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
93
    const FileOptions& file_options, VersionSet* versions,
94
    InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
95
    JobContext* job_context, FlushReason flush_reason, LogBuffer* log_buffer,
96
    FSDirectory* db_directory, FSDirectory* output_file_directory,
97
    CompressionType output_compression, Statistics* stats,
98
    EventLogger* event_logger, bool measure_io_stats,
99
    const bool sync_output_directory, const bool write_manifest,
100
    Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
101
    std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping,
102
    const std::string& db_id, const std::string& db_session_id,
103
    std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback)
104
1.14k
    : dbname_(dbname),
105
1.14k
      db_id_(db_id),
106
1.14k
      db_session_id_(db_session_id),
107
1.14k
      cfd_(cfd),
108
1.14k
      db_options_(db_options),
109
1.14k
      mutable_cf_options_(mutable_cf_options),
110
1.14k
      max_memtable_id_(max_memtable_id),
111
1.14k
      file_options_(file_options),
112
1.14k
      versions_(versions),
113
1.14k
      db_mutex_(db_mutex),
114
1.14k
      shutting_down_(shutting_down),
115
1.14k
      earliest_snapshot_(job_context->GetEarliestSnapshotSequence()),
116
1.14k
      job_context_(job_context),
117
1.14k
      flush_reason_(flush_reason),
118
1.14k
      log_buffer_(log_buffer),
119
1.14k
      db_directory_(db_directory),
120
1.14k
      output_file_directory_(output_file_directory),
121
1.14k
      output_compression_(output_compression),
122
1.14k
      stats_(stats),
123
1.14k
      event_logger_(event_logger),
124
1.14k
      measure_io_stats_(measure_io_stats),
125
1.14k
      sync_output_directory_(sync_output_directory),
126
1.14k
      write_manifest_(write_manifest),
127
1.14k
      edit_(nullptr),
128
1.14k
      base_(nullptr),
129
1.14k
      pick_memtable_called(false),
130
1.14k
      thread_pri_(thread_pri),
131
1.14k
      io_tracer_(io_tracer),
132
1.14k
      clock_(db_options_.clock),
133
1.14k
      full_history_ts_low_(std::move(full_history_ts_low)),
134
1.14k
      blob_callback_(blob_callback),
135
1.14k
      seqno_to_time_mapping_(std::move(seqno_to_time_mapping)) {
136
1.14k
  assert(job_context->snapshot_context_initialized);
137
  // Update the thread status to indicate flush.
138
1.14k
  ReportStartedFlush();
139
1.14k
  TEST_SYNC_POINT("FlushJob::FlushJob()");
140
1.14k
}
141
142
1.14k
FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); }
143
144
1.14k
void FlushJob::ReportStartedFlush() {
145
1.14k
  ThreadStatusUtil::SetEnableTracking(db_options_.enable_thread_tracking);
146
1.14k
  ThreadStatusUtil::SetColumnFamily(cfd_);
147
1.14k
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
148
1.14k
  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
149
1.14k
                                               job_context_->job_id);
150
151
1.14k
  IOSTATS_RESET(bytes_written);
152
1.14k
}
153
154
1.14k
void FlushJob::ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems) {
155
1.14k
  uint64_t input_size = 0;
156
1.14k
  for (auto* mem : mems) {
157
1.14k
    input_size += mem->ApproximateMemoryUsage();
158
1.14k
  }
159
1.14k
  ThreadStatusUtil::IncreaseThreadOperationProperty(
160
1.14k
      ThreadStatus::FLUSH_BYTES_MEMTABLES, input_size);
161
1.14k
}
162
163
2.29k
void FlushJob::RecordFlushIOStats() {
164
2.29k
  RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
165
2.29k
  ThreadStatusUtil::IncreaseThreadOperationProperty(
166
2.29k
      ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
167
2.29k
  IOSTATS_RESET(bytes_written);
168
2.29k
}
169
1.14k
void FlushJob::PickMemTable() {
170
1.14k
  db_mutex_->AssertHeld();
171
1.14k
  assert(!pick_memtable_called);
172
1.14k
  pick_memtable_called = true;
173
174
  // Maximum "NextLogNumber" of the memtables to flush.
175
  // When the mempurge feature is turned off, this variable is useless
176
  // because the memtables are implicitly sorted by increasing order of creation
177
  // time. Therefore mems_->back()->GetNextLogNumber() is already equal to
178
  // max_next_log_number. However, when mempurge is on, the memtables are no
179
  // longer sorted by increasing order of creation time. Therefore this variable
180
  // becomes necessary because mems_->back()->GetNextLogNumber() is no longer
181
  // necessarily equal to max_next_log_number.
182
1.14k
  uint64_t max_next_log_number = 0;
183
184
  // Save the contents of the earliest memtable as a new Table
185
1.14k
  cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_,
186
1.14k
                                    &max_next_log_number);
187
1.14k
  if (mems_.empty()) {
188
0
    return;
189
0
  }
190
191
  // Track effective cutoff user-defined timestamp during flush if
192
  // user-defined timestamps can be stripped.
193
1.14k
  GetEffectiveCutoffUDTForPickedMemTables();
194
1.14k
  GetPrecludeLastLevelMinSeqno();
195
196
1.14k
  ReportFlushInputSize(mems_);
197
198
  // The memtables in mems_ are (implicitly) sorted in ascending order by their creation
199
  // time. We will use the first memtable's `edit` to keep the meta info for
200
  // this flush.
201
1.14k
  ReadOnlyMemTable* m = mems_[0];
202
1.14k
  edit_ = m->GetEdits();
203
1.14k
  edit_->SetPrevLogNumber(0);
204
  // SetLogNumber(log_num) indicates logs with number smaller than log_num
205
  // will no longer be picked up for recovery.
206
1.14k
  edit_->SetLogNumber(max_next_log_number);
207
1.14k
  edit_->SetColumnFamily(cfd_->GetID());
208
209
  // path 0 for level 0 file.
210
1.14k
  meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
211
1.14k
  meta_.epoch_number = cfd_->NewEpochNumber();
212
213
1.14k
  base_ = cfd_->current();
214
1.14k
  base_->Ref();  // it is likely that we do not need this reference
215
1.14k
}
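A minimal, standalone sketch (not part of flush_job.cc; the numbers are hypothetical) of the max_next_log_number bookkeeping described in the comments above: with mempurge enabled the picked memtables are not necessarily ordered by creation time, so the maximum NextLogNumber across them, rather than mems_.back()'s, is what ends up in edit_->SetLogNumber().

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  // Hypothetical NextLogNumber values of three picked memtables; with
  // mempurge they need not be sorted by creation time.
  std::vector<uint64_t> next_log_numbers = {5, 7, 6};

  // mems_.back() would report 6 here, but the edit records the maximum,
  // since logs numbered below it will no longer be picked up for recovery.
  uint64_t max_next_log_number =
      *std::max_element(next_log_numbers.begin(), next_log_numbers.end());
  std::printf("SetLogNumber(%llu)\n",
              static_cast<unsigned long long>(max_next_log_number));
  return 0;
}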
216
217
Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
218
                     bool* switched_to_mempurge, bool* skipped_since_bg_error,
219
1.14k
                     ErrorHandler* error_handler) {
220
1.14k
  TEST_SYNC_POINT("FlushJob::Start");
221
1.14k
  db_mutex_->AssertHeld();
222
1.14k
  assert(pick_memtable_called);
223
  // Mempurge threshold can be dynamically changed.
224
  // For the sake of consistency, mempurge_threshold is
225
  // saved locally and used throughout each
226
  // FlushJob::Run call.
227
1.14k
  double mempurge_threshold =
228
1.14k
      mutable_cf_options_.experimental_mempurge_threshold;
229
230
1.14k
  AutoThreadOperationStageUpdater stage_run(ThreadStatus::STAGE_FLUSH_RUN);
231
1.14k
  if (mems_.empty()) {
232
0
    ROCKS_LOG_BUFFER(log_buffer_, "[%s] No memtable to flush",
233
0
                     cfd_->GetName().c_str());
234
0
    return Status::OK();
235
0
  }
236
237
  // I/O measurement variables
238
1.14k
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
239
1.14k
  uint64_t prev_write_nanos = 0;
240
1.14k
  uint64_t prev_fsync_nanos = 0;
241
1.14k
  uint64_t prev_range_sync_nanos = 0;
242
1.14k
  uint64_t prev_prepare_write_nanos = 0;
243
1.14k
  uint64_t prev_cpu_write_nanos = 0;
244
1.14k
  uint64_t prev_cpu_read_nanos = 0;
245
1.14k
  if (measure_io_stats_) {
246
0
    prev_perf_level = GetPerfLevel();
247
0
    SetPerfLevel(PerfLevel::kEnableTime);
248
0
    prev_write_nanos = IOSTATS(write_nanos);
249
0
    prev_fsync_nanos = IOSTATS(fsync_nanos);
250
0
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
251
0
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
252
0
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
253
0
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
254
0
  }
255
1.14k
  Status mempurge_s = Status::NotFound("No MemPurge.");
256
1.14k
  if ((mempurge_threshold > 0.0) &&
257
0
      (flush_reason_ == FlushReason::kWriteBufferFull) && (!mems_.empty()) &&
258
0
      MemPurgeDecider(mempurge_threshold) && !(db_options_.atomic_flush)) {
259
0
    cfd_->SetMempurgeUsed();
260
0
    mempurge_s = MemPurge();
261
0
    if (!mempurge_s.ok()) {
262
      // Mempurge is typically aborted when the output
263
      // bytes cannot fit into a single output memtable.
264
0
      if (mempurge_s.IsAborted()) {
265
0
        ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n",
266
0
                       mempurge_s.ToString().c_str());
267
0
      } else {
268
        // However, the mempurge process can also fail for
269
        // other reasons (eg: new_mem->Add() fails).
270
0
        ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n",
271
0
                       mempurge_s.ToString().c_str());
272
0
      }
273
0
    } else {
274
0
      if (switched_to_mempurge) {
275
0
        *switched_to_mempurge = true;
276
0
      } else {
277
        // The mempurge process was successful, but no switched_to_mempurge
278
        // pointer was provided, so there is no way to propagate the state of the flush job.
279
0
        ROCKS_LOG_WARN(db_options_.info_log,
280
0
                       "Mempurge process succeeded"
281
0
                       "but no 'switched_to_mempurge' ptr provided.\n");
282
0
      }
283
0
    }
284
0
  }
285
1.14k
  Status s;
286
1.14k
  if (mempurge_s.ok()) {
287
0
    base_->Unref();
288
0
    s = Status::OK();
289
1.14k
  } else {
290
    // This will release and re-acquire the mutex.
291
1.14k
    s = WriteLevel0Table();
292
1.14k
  }
293
294
1.14k
  if (s.ok() && cfd_->IsDropped()) {
295
0
    s = Status::ColumnFamilyDropped("Column family dropped during compaction");
296
0
  }
297
1.14k
  if ((s.ok() || s.IsColumnFamilyDropped()) &&
298
1.14k
      shutting_down_->load(std::memory_order_acquire)) {
299
0
    s = Status::ShutdownInProgress("Database shutdown");
300
0
  }
301
302
1.14k
  if (s.ok()) {
303
1.14k
    s = MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT();
304
1.14k
  }
305
306
1.14k
  if (!s.ok()) {
307
0
    cfd_->imm()->RollbackMemtableFlush(
308
0
        mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush);
309
1.14k
  } else if (write_manifest_) {
310
1.14k
    assert(!db_options_.atomic_flush);
311
1.14k
    if (!db_options_.atomic_flush &&
312
1.14k
        flush_reason_ != FlushReason::kErrorRecovery &&
313
1.14k
        flush_reason_ != FlushReason::kErrorRecoveryRetryFlush &&
314
1.14k
        error_handler && !error_handler->GetBGError().ok() &&
315
0
        error_handler->IsBGWorkStopped()) {
316
0
      cfd_->imm()->RollbackMemtableFlush(
317
0
          mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush);
318
0
      s = error_handler->GetBGError();
319
0
      if (skipped_since_bg_error) {
320
0
        *skipped_since_bg_error = true;
321
0
      }
322
1.14k
    } else {
323
1.14k
      TEST_SYNC_POINT("FlushJob::InstallResults");
324
      // Replace immutable memtable with the generated Table
325
1.14k
      s = cfd_->imm()->TryInstallMemtableFlushResults(
326
1.14k
              cfd_, mems_, prep_tracker, versions_, db_mutex_,
327
1.14k
              meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
328
1.14k
              log_buffer_, &committed_flush_jobs_info_,
329
1.14k
              !(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted),
330
                              but 'false' if mempurge successful: no new min log number
331
1.14k
                              or new level 0 file path to write to manifest. */);
332
1.14k
    }
333
1.14k
  }
334
335
1.14k
  if (s.ok() && file_meta != nullptr) {
336
1.14k
    *file_meta = meta_;
337
1.14k
  }
338
1.14k
  RecordFlushIOStats();
339
340
  // When measure_io_stats_ is true, the default 512 bytes is not enough.
341
1.14k
  auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
342
1.14k
  stream << "job" << job_context_->job_id << "event" << "flush_finished";
343
1.14k
  stream << "output_compression"
344
1.14k
         << CompressionTypeToString(output_compression_);
345
1.14k
  stream << "lsm_state";
346
1.14k
  stream.StartArray();
347
1.14k
  auto vstorage = cfd_->current()->storage_info();
348
9.16k
  for (int level = 0; level < vstorage->num_levels(); ++level) {
349
8.02k
    stream << vstorage->NumLevelFiles(level);
350
8.02k
  }
351
1.14k
  stream.EndArray();
352
353
1.14k
  const auto& blob_files = vstorage->GetBlobFiles();
354
1.14k
  if (!blob_files.empty()) {
355
0
    assert(blob_files.front());
356
0
    stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();
357
358
0
    assert(blob_files.back());
359
0
    stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
360
0
  }
361
362
1.14k
  stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
363
364
1.14k
  if (measure_io_stats_) {
365
0
    if (prev_perf_level != PerfLevel::kEnableTime) {
366
0
      SetPerfLevel(prev_perf_level);
367
0
    }
368
0
    stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
369
0
    stream << "file_range_sync_nanos"
370
0
           << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
371
0
    stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
372
0
    stream << "file_prepare_write_nanos"
373
0
           << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
374
0
    stream << "file_cpu_write_nanos"
375
0
           << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
376
0
    stream << "file_cpu_read_nanos"
377
0
           << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
378
0
  }
379
380
1.14k
  TEST_SYNC_POINT("FlushJob::End");
381
1.14k
  return s;
382
1.14k
}
383
384
0
void FlushJob::Cancel() {
385
0
  db_mutex_->AssertHeld();
386
0
  assert(base_ != nullptr);
387
0
  base_->Unref();
388
0
}
389
390
0
Status FlushJob::MemPurge() {
391
0
  Status s;
392
0
  db_mutex_->AssertHeld();
393
0
  db_mutex_->Unlock();
394
0
  assert(!mems_.empty());
395
396
  // Measure purging time.
397
0
  const uint64_t start_micros = clock_->NowMicros();
398
0
  const uint64_t start_cpu_micros = clock_->CPUMicros();
399
400
0
  MemTable* new_mem = nullptr;
401
  // For performance/log investigation purposes:
402
  // look at how much useful payload we harvest in the new_mem.
403
  // This value is then printed to the DB log.
404
0
  double new_mem_capacity = 0.0;
405
406
  // Create two iterators, one for the memtable data (contains
407
  // info from puts + deletes), and one for the memtable
408
  // Range Tombstones (from DeleteRanges).
409
  // TODO: plumb Env::IOActivity, Env::IOPriority
410
0
  ReadOptions ro;
411
0
  ro.total_order_seek = true;
412
0
  Arena arena;
413
0
  std::vector<InternalIterator*> memtables;
414
0
  std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
415
0
      range_del_iters;
416
0
  for (ReadOnlyMemTable* m : mems_) {
417
0
    memtables.push_back(m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr,
418
0
                                       &arena, /*prefix_extractor=*/nullptr,
419
0
                                       /*for_flush=*/true));
420
0
    auto* range_del_iter = m->NewRangeTombstoneIterator(
421
0
        ro, kMaxSequenceNumber, true /* immutable_memtable */);
422
0
    if (range_del_iter != nullptr) {
423
0
      range_del_iters.emplace_back(range_del_iter);
424
0
    }
425
0
  }
426
427
0
  assert(!memtables.empty());
428
0
  SequenceNumber first_seqno = kMaxSequenceNumber;
429
0
  SequenceNumber earliest_seqno = kMaxSequenceNumber;
430
  // Pick first and earliest seqno as min of all first_seqno
431
  // and earliest_seqno of the mempurged memtables.
432
0
  for (const auto& mem : mems_) {
433
0
    first_seqno = mem->GetFirstSequenceNumber() < first_seqno
434
0
                      ? mem->GetFirstSequenceNumber()
435
0
                      : first_seqno;
436
0
    earliest_seqno = mem->GetEarliestSequenceNumber() < earliest_seqno
437
0
                         ? mem->GetEarliestSequenceNumber()
438
0
                         : earliest_seqno;
439
0
  }
440
441
0
  ScopedArenaPtr<InternalIterator> iter(
442
0
      NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(),
443
0
                         static_cast<int>(memtables.size()), &arena));
444
445
0
  const auto& ioptions = cfd_->ioptions();
446
447
  // Place iterator at the First (meaning most recent) key node.
448
0
  iter->SeekToFirst();
449
450
0
  const std::string* const full_history_ts_low = &(cfd_->GetFullHistoryTsLow());
451
0
  std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
452
0
      new CompactionRangeDelAggregator(&(cfd_->internal_comparator()),
453
0
                                       job_context_->snapshot_seqs,
454
0
                                       full_history_ts_low));
455
0
  for (auto& rd_iter : range_del_iters) {
456
0
    range_del_agg->AddTombstones(std::move(rd_iter));
457
0
  }
458
459
  // If there is valid data in the memtable,
460
  // or at least range tombstones, copy over the info
461
  // to the new memtable.
462
0
  if (iter->Valid() || !range_del_agg->IsEmpty()) {
463
    // MaxSize is the size of a memtable.
464
0
    size_t maxSize = mutable_cf_options_.write_buffer_size;
465
0
    std::unique_ptr<CompactionFilter> compaction_filter;
466
0
    if (ioptions.compaction_filter_factory != nullptr &&
467
0
        ioptions.compaction_filter_factory->ShouldFilterTableFileCreation(
468
0
            TableFileCreationReason::kFlush)) {
469
0
      CompactionFilter::Context ctx;
470
0
      ctx.is_full_compaction = false;
471
0
      ctx.is_manual_compaction = false;
472
0
      ctx.column_family_id = cfd_->GetID();
473
0
      ctx.reason = TableFileCreationReason::kFlush;
474
0
      compaction_filter =
475
0
          ioptions.compaction_filter_factory->CreateCompactionFilter(ctx);
476
0
      if (compaction_filter != nullptr &&
477
0
          !compaction_filter->IgnoreSnapshots()) {
478
0
        s = Status::NotSupported(
479
0
            "CompactionFilter::IgnoreSnapshots() = false is not supported "
480
0
            "anymore.");
481
0
        return s;
482
0
      }
483
0
    }
484
485
0
    new_mem = new MemTable(cfd_->internal_comparator(), cfd_->ioptions(),
486
0
                           mutable_cf_options_, cfd_->write_buffer_mgr(),
487
0
                           earliest_seqno, cfd_->GetID());
488
0
    assert(new_mem != nullptr);
489
490
0
    Env* env = db_options_.env;
491
0
    assert(env);
492
0
    MergeHelper merge(env, (cfd_->internal_comparator()).user_comparator(),
493
0
                      (ioptions.merge_operator).get(), compaction_filter.get(),
494
0
                      ioptions.logger,
495
0
                      true /* internal key corruption is not ok */,
496
0
                      job_context_->GetLatestSnapshotSequence(),
497
0
                      job_context_->snapshot_checker);
498
0
    assert(job_context_);
499
0
    const std::atomic<bool> kManualCompactionCanceledFalse{false};
500
0
    CompactionIterator c_iter(
501
0
        iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
502
0
        kMaxSequenceNumber, &job_context_->snapshot_seqs, earliest_snapshot_,
503
0
        job_context_->earliest_write_conflict_snapshot,
504
0
        job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker,
505
0
        env, ShouldReportDetailedTime(env, ioptions.stats), range_del_agg.get(),
506
0
        nullptr, ioptions.allow_data_in_errors,
507
0
        ioptions.enforce_single_del_contracts,
508
0
        /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
509
0
        false /* must_count_input_entries */,
510
0
        /*compaction=*/nullptr, compaction_filter.get(),
511
0
        /*shutting_down=*/nullptr, ioptions.info_log, full_history_ts_low);
512
513
    // Set earliest sequence number in the new memtable
514
    // to be equal to the earliest sequence number of the
515
    // memtable being flushed (See later if there is a need
516
    // to update this number!).
517
0
    new_mem->SetEarliestSequenceNumber(earliest_seqno);
518
    // Likewise for first seq number.
519
0
    new_mem->SetFirstSequenceNumber(first_seqno);
520
0
    SequenceNumber new_first_seqno = kMaxSequenceNumber;
521
522
0
    c_iter.SeekToFirst();
523
524
    // Key transfer
525
0
    for (; c_iter.Valid(); c_iter.Next()) {
526
0
      const ParsedInternalKey ikey = c_iter.ikey();
527
0
      const Slice value = c_iter.value();
528
0
      new_first_seqno =
529
0
          ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno;
530
531
      // Should we update "OldestKeyTime" ???? -> timestamp appear
532
      // to still be an "experimental" feature.
533
0
      s = new_mem->Add(
534
0
          ikey.sequence, ikey.type, ikey.user_key, value,
535
0
          nullptr,   // KV protection info set as nullptr since it
536
                     // should only be useful for the first add to
537
                     // the original memtable.
538
0
          false,     // : allow concurrent_memtable_writes_
539
                     // Not seen as necessary for now.
540
0
          nullptr,   // get_post_process_info(m) must be nullptr
541
                     // when concurrent_memtable_writes is switched off.
542
0
          nullptr);  // hint, only used when concurrent_memtable_writes_
543
                     // is switched on.
544
0
      if (!s.ok()) {
545
0
        break;
546
0
      }
547
548
      // If new_mem has size greater than maxSize,
549
      // then rollback to regular flush operation,
550
      // and destroy new_mem.
551
0
      if (new_mem->ApproximateMemoryUsage() > maxSize) {
552
0
        s = Status::Aborted("Mempurge filled more than one memtable.");
553
0
        new_mem_capacity = 1.0;
554
0
        break;
555
0
      }
556
0
    }
557
558
    // Check status and propagate
559
    // potential error status from c_iter
560
0
    if (!s.ok()) {
561
0
      c_iter.status().PermitUncheckedError();
562
0
    } else if (!c_iter.status().ok()) {
563
0
      s = c_iter.status();
564
0
    }
565
566
    // Range tombstone transfer.
567
0
    if (s.ok()) {
568
0
      auto range_del_it = range_del_agg->NewIterator();
569
0
      for (range_del_it->SeekToFirst(); range_del_it->Valid();
570
0
           range_del_it->Next()) {
571
0
        auto tombstone = range_del_it->Tombstone();
572
0
        new_first_seqno =
573
0
            tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno;
574
0
        s = new_mem->Add(
575
0
            tombstone.seq_,        // Sequence number
576
0
            kTypeRangeDeletion,    // KV type
577
0
            tombstone.start_key_,  // Key is start key.
578
0
            tombstone.end_key_,    // Value is end key.
579
0
            nullptr,               // KV protection info set as nullptr since it
580
                                   // should only be useful for the first add to
581
                                   // the original memtable.
582
0
            false,                 // : allow concurrent_memtable_writes_
583
                                   // Not seen as necessary for now.
584
0
            nullptr,               // get_post_process_info(m) must be nullptr
585
                      // when concurrent_memtable_writes is switched off.
586
0
            nullptr);  // hint, only used when concurrent_memtable_writes_
587
                       // is switched on.
588
589
0
        if (!s.ok()) {
590
0
          break;
591
0
        }
592
593
        // If new_mem has size greater than maxSize,
594
        // then rollback to regular flush operation,
595
        // and destroy new_mem.
596
0
        if (new_mem->ApproximateMemoryUsage() > maxSize) {
597
0
          s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
598
0
          new_mem_capacity = 1.0;
599
0
          break;
600
0
        }
601
0
      }
602
0
    }
603
604
    // If everything happened smoothly and new_mem contains valid data,
605
    // decide if it is flushed to storage or kept in the imm()
606
    // memtable list (memory).
607
0
    if (s.ok() && (new_first_seqno != kMaxSequenceNumber)) {
608
      // Rectify the first sequence number, which (unlike the earliest seq
609
      // number) needs to be present in the new memtable.
610
0
      new_mem->SetFirstSequenceNumber(new_first_seqno);
611
612
      // The new_mem is added to the list of immutable memtables
613
      // only if it is filled at less than 100% capacity and isn't flagged
614
      // as in need of being flushed.
615
0
      if (new_mem->ApproximateMemoryUsage() < maxSize &&
616
0
          !(new_mem->ShouldFlushNow())) {
617
        // Construct fragmented memtable range tombstones without mutex
618
0
        new_mem->ConstructFragmentedRangeTombstones();
619
0
        db_mutex_->Lock();
620
        // Take the newest id, so that memtables in MemtableList don't have
621
        // out-of-order memtable ids.
622
0
        uint64_t new_mem_id = mems_.back()->GetID();
623
624
0
        new_mem->SetID(new_mem_id);
625
        // Take the latest memtable's next log number.
626
0
        new_mem->SetNextLogNumber(mems_.back()->GetNextLogNumber());
627
628
        // This addition will not trigger another flush, because
629
        // we do not call EnqueuePendingFlush().
630
0
        cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
631
0
        new_mem->Ref();
632
        // Piggyback FlushJobInfo on the first flushed memtable.
633
0
        db_mutex_->AssertHeld();
634
0
        meta_.fd.file_size = 0;
635
0
        mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
636
0
        db_mutex_->Unlock();
637
0
      } else {
638
0
        s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
639
0
        new_mem_capacity = 1.0;
640
0
        if (new_mem) {
641
0
          job_context_->memtables_to_free.push_back(new_mem);
642
0
        }
643
0
      }
644
0
    } else {
645
      // In this case, the newly allocated new_mem is empty.
646
0
      assert(new_mem != nullptr);
647
0
      job_context_->memtables_to_free.push_back(new_mem);
648
0
    }
649
0
  }
650
651
  // Reacquire the mutex for WriteLevel0 function.
652
0
  db_mutex_->Lock();
653
654
  // If mempurge was successful, don't write input tables to level0,
655
  // but write any full output table to level0.
656
0
  if (s.ok()) {
657
0
    TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeSuccessful");
658
0
  } else {
659
0
    TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
660
0
  }
661
0
  const uint64_t micros = clock_->NowMicros() - start_micros;
662
0
  const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros;
663
0
  ROCKS_LOG_INFO(db_options_.info_log,
664
0
                 "[%s] [JOB %d] Mempurge lasted %" PRIu64
665
0
                 " microseconds, and %" PRIu64
666
0
                 " cpu "
667
0
                 "microseconds. Status is %s ok. Perc capacity: %f\n",
668
0
                 cfd_->GetName().c_str(), job_context_->job_id, micros,
669
0
                 cpu_micros, s.ok() ? "" : "not", new_mem_capacity);
670
671
0
  return s;
672
0
}
673
674
0
bool FlushJob::MemPurgeDecider(double threshold) {
675
  // Never trigger mempurge if threshold is not a strictly positive value.
676
0
  if (!(threshold > 0.0)) {
677
0
    return false;
678
0
  }
679
0
  if (threshold > (1.0 * mems_.size())) {
680
0
    return true;
681
0
  }
682
  // Payload and useful_payload (in bytes).
683
  // The useful payload ratio of a given MemTable
684
  // is estimated to be useful_payload/payload.
685
0
  uint64_t payload = 0, useful_payload = 0, entry_size = 0;
686
687
  // Local variables used repeatedly inside the for-loop
688
  // when iterating over the sampled entries.
689
0
  Slice key_slice, value_slice;
690
0
  ParsedInternalKey res;
691
0
  SnapshotImpl min_snapshot;
692
0
  std::string vget;
693
0
  Status mget_s, parse_s;
694
0
  MergeContext merge_context;
695
0
  SequenceNumber max_covering_tombstone_seq = 0, sqno = 0,
696
0
                 min_seqno_snapshot = 0;
697
0
  bool get_res, can_be_useful_payload, not_in_next_mems;
698
699
  // If estimated_useful_payload is > threshold,
700
  // then flush to storage, else MemPurge.
701
0
  double estimated_useful_payload = 0.0;
702
  // Cochran formula for determining sample size.
703
  // 95% confidence interval, 7% precision.
704
  //    n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0
705
0
  double n0 = 196.0;
706
  // TODO: plumb Env::IOActivity, Env::IOPriority
707
0
  ReadOptions ro;
708
0
  ro.total_order_seek = true;
709
710
  // Iterate over each memtable of the set.
711
0
  for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
712
0
       ++mem_iter) {
713
0
    ReadOnlyMemTable* mt = *mem_iter;
714
715
    // Sample from the table.
716
0
    uint64_t nentries = mt->NumEntries();
717
    // Corrected Cochran formula for small populations
718
    // (converges to n0 for large populations).
719
0
    uint64_t target_sample_size =
720
0
        static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
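For reference, a standalone sketch (not part of flush_job.cc) of the sample-size arithmetic used above: with n0 = 196 from Cochran's formula, the finite-population correction n0 / (1 + n0/N) yields roughly 67 samples for N = 100 entries, 164 for N = 1000, and approaches 196 for very large memtables.

#include <cmath>
#include <cstdint>
#include <cstdio>

// Corrected Cochran sample size for a memtable with nentries entries,
// mirroring the computation in MemPurgeDecider.
static uint64_t CochranSampleSize(uint64_t nentries) {
  const double n0 = 196.0;  // (1.96 * 1.96) * 0.25 / (0.07 * 0.07)
  return static_cast<uint64_t>(std::ceil(n0 / (1.0 + (n0 / nentries))));
}

int main() {
  std::printf("%llu %llu %llu\n",
              static_cast<unsigned long long>(CochranSampleSize(100)),      // 67
              static_cast<unsigned long long>(CochranSampleSize(1000)),     // 164
              static_cast<unsigned long long>(CochranSampleSize(1000000))); // 196
  return 0;
}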
721
0
    std::unordered_set<const char*> sentries = {};
722
    // Populate sample entries set.
723
0
    mt->UniqueRandomSample(target_sample_size, &sentries);
724
725
    // Estimate the garbage ratio by comparing if
726
    // each sample corresponds to a valid entry.
727
0
    for (const char* ss : sentries) {
728
0
      key_slice = GetLengthPrefixedSlice(ss);
729
0
      parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/);
730
0
      if (!parse_s.ok()) {
731
0
        ROCKS_LOG_WARN(db_options_.info_log,
732
0
                       "Memtable Decider: ParseInternalKey did not parse "
733
0
                       "key_slice %s successfully.",
734
0
                       key_slice.data());
735
0
      }
736
737
      // Size of the entry is "key size (+ value size if KV entry)"
738
0
      entry_size = key_slice.size();
739
0
      if (res.type == kTypeValue) {
740
0
        value_slice =
741
0
            GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
742
0
        entry_size += value_slice.size();
743
0
      }
744
745
      // Count entry bytes as payload.
746
0
      payload += entry_size;
747
748
0
      LookupKey lkey(res.user_key, kMaxSequenceNumber);
749
750
      // Paranoia: zero out these values just in case.
751
0
      max_covering_tombstone_seq = 0;
752
0
      sqno = 0;
753
754
      // Pick the oldest existing snapshot that is more recent
755
      // than the sequence number of the sampled entry.
756
0
      min_seqno_snapshot = kMaxSequenceNumber;
757
0
      for (SequenceNumber seq_num : job_context_->snapshot_seqs) {
758
0
        if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
759
0
          min_seqno_snapshot = seq_num;
760
0
        }
761
0
      }
762
0
      min_snapshot.number_ = min_seqno_snapshot;
763
0
      ro.snapshot =
764
0
          min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
765
766
      // Estimate if the sample entry is valid or not.
767
0
      get_res = mt->Get(lkey, &vget, /*columns=*/nullptr, /*timestamp=*/nullptr,
768
0
                        &mget_s, &merge_context, &max_covering_tombstone_seq,
769
0
                        &sqno, ro, true /* immutable_memtable */);
770
0
      if (!get_res) {
771
0
        ROCKS_LOG_WARN(
772
0
            db_options_.info_log,
773
0
            "Memtable Get returned false when Get(sampled entry). "
774
0
            "Yet each sample entry should exist somewhere in the memtable, "
775
0
            "unrelated to whether it has been deleted or not.");
776
0
      }
777
778
      // TODO(bjlemaire): evaluate typeMerge.
779
      // This is where the sampled entry is estimated to be
780
      // garbage or not. Note that this is a garbage *estimation*
781
      // because we do not include certain items such as
782
      // CompactionFilters triggered at flush, or if the same delete
783
      // has been inserted twice or more in the memtable.
784
785
      // Evaluate if the entry can be useful payload
786
      // Situation #1: entry is a KV entry, was found in the memtable mt
787
      //               and the sequence numbers match.
788
0
      can_be_useful_payload = (res.type == kTypeValue) && get_res &&
789
0
                              mget_s.ok() && (sqno == res.sequence);
790
791
      // Situation #2: entry is a delete entry, was found in the memtable mt
792
      //               (because get_res==true) and no valid KV entry is found.
793
      //               (note: duplicate delete entries are also taken into
794
      //               account here, because the sequence number 'sqno'
795
      //               in memtable->Get(&sqno) operation is set to be equal
796
      //               to the most recent delete entry as well).
797
0
      can_be_useful_payload |=
798
0
          ((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) &&
799
0
          mget_s.IsNotFound() && get_res && (sqno == res.sequence);
800
801
      // If there is a chance that the entry is useful payload,
802
      // verify that the entry does not appear in the following memtables
803
      // (memtables with greater memtable ID/larger sequence numbers).
804
0
      if (can_be_useful_payload) {
805
0
        not_in_next_mems = true;
806
0
        for (auto next_mem_iter = mem_iter + 1;
807
0
             next_mem_iter != std::end(mems_); next_mem_iter++) {
808
0
          if ((*next_mem_iter)
809
0
                  ->Get(lkey, &vget, /*columns=*/nullptr, /*timestamp=*/nullptr,
810
0
                        &mget_s, &merge_context, &max_covering_tombstone_seq,
811
0
                        &sqno, ro, true /* immutable_memtable */)) {
812
0
            not_in_next_mems = false;
813
0
            break;
814
0
          }
815
0
        }
816
0
        if (not_in_next_mems) {
817
0
          useful_payload += entry_size;
818
0
        }
819
0
      }
820
0
    }
821
0
    if (payload > 0) {
822
      // We use the estimated useful payload ratio to
823
      // evaluate how many of the memtable bytes are useful bytes.
824
0
      estimated_useful_payload +=
825
0
          (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
826
827
0
      ROCKS_LOG_INFO(db_options_.info_log,
828
0
                     "Mempurge sampling [CF %s] - found garbage ratio from "
829
0
                     "sampling: %f. Threshold is %f\n",
830
0
                     cfd_->GetName().c_str(),
831
0
                     (payload - useful_payload) * 1.0 / payload, threshold);
832
0
    } else {
833
0
      ROCKS_LOG_WARN(db_options_.info_log,
834
0
                     "Mempurge sampling: null payload measured, and collected "
835
0
                     "sample size is %zu\n.",
836
0
                     sentries.size());
837
0
    }
838
0
  }
839
  // We convert the total number of useful payload bytes
840
  // into the proportion of memtable necessary to store all these bytes.
841
  // We compare this proportion with the threshold value.
842
0
  return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
843
0
          threshold);
844
0
}
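To make the final comparison concrete, a small standalone sketch with made-up numbers (not taken from this report): if sampling attributes about 16 MB of useful payload to memtables backed by a 64 MB write_buffer_size, the useful proportion is 0.25, so an experimental_mempurge_threshold of 1.0 keeps mempurge while a threshold of 0.1 would fall back to a regular flush.

#include <cstdio>

int main() {
  // Hypothetical values illustrating MemPurgeDecider's return expression.
  const double estimated_useful_payload = 16.0 * 1024 * 1024;  // bytes judged useful by sampling
  const double write_buffer_size = 64.0 * 1024 * 1024;         // mutable_cf_options_.write_buffer_size
  const double threshold = 1.0;                                 // experimental_mempurge_threshold

  const bool do_mempurge =
      (estimated_useful_payload / write_buffer_size) < threshold;  // 0.25 < 1.0 -> true
  std::printf("decision: %s\n", do_mempurge ? "mempurge" : "regular flush");
  return 0;
}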
845
846
1.14k
Status FlushJob::WriteLevel0Table() {
847
1.14k
  AutoThreadOperationStageUpdater stage_updater(
848
1.14k
      ThreadStatus::STAGE_FLUSH_WRITE_L0);
849
1.14k
  db_mutex_->AssertHeld();
850
1.14k
  const uint64_t start_micros = clock_->NowMicros();
851
1.14k
  const uint64_t start_cpu_micros = clock_->CPUMicros();
852
1.14k
  Status s;
853
854
1.14k
  meta_.temperature = mutable_cf_options_.default_write_temperature;
855
1.14k
  file_options_.temperature = meta_.temperature;
856
857
1.14k
  const auto* ucmp = cfd_->internal_comparator().user_comparator();
858
1.14k
  assert(ucmp);
859
1.14k
  const size_t ts_sz = ucmp->timestamp_size();
860
1.14k
  const bool logical_strip_timestamp =
861
1.14k
      ts_sz > 0 && !cfd_->ioptions().persist_user_defined_timestamps;
862
863
1.14k
  std::vector<BlobFileAddition> blob_file_additions;
864
  // Note that here we treat flush as a level-0 compaction in internal stats.
865
1.14k
  InternalStats::CompactionStats flush_stats(CompactionReason::kFlush,
866
1.14k
                                             1 /* count */);
867
1.14k
  {
868
1.14k
    auto write_hint = base_->storage_info()->CalculateSSTWriteHint(
869
1.14k
        /*level=*/0, db_options_.calculate_sst_write_lifetime_hint_set);
870
1.14k
    Env::IOPriority io_priority = GetRateLimiterPriority();
871
1.14k
    db_mutex_->Unlock();
872
1.14k
    if (log_buffer_) {
873
1.14k
      log_buffer_->FlushBufferToLog();
874
1.14k
    }
875
    // memtables and range_del_iters store internal iterators over each data
876
    // memtable and its associated range deletion memtable, respectively, at
877
    // corresponding indexes.
878
1.14k
    std::vector<InternalIterator*> memtables;
879
1.14k
    std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
880
1.14k
        range_del_iters;
881
1.14k
    ReadOptions ro;
882
1.14k
    ro.total_order_seek = true;
883
1.14k
    ro.io_activity = Env::IOActivity::kFlush;
884
1.14k
    Arena arena;
885
1.14k
    uint64_t total_num_input_entries = 0, total_num_deletes = 0;
886
1.14k
    uint64_t total_data_size = 0;
887
1.14k
    size_t total_memory_usage = 0;
888
1.14k
    uint64_t total_num_range_deletes = 0;
889
    // Used for testing:
890
1.14k
    uint64_t mems_size = mems_.size();
891
1.14k
    (void)mems_size;  // avoids unused variable error when
892
                      // TEST_SYNC_POINT_CALLBACK not used.
893
1.14k
    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables",
894
1.14k
                             &mems_size);
895
1.14k
    assert(job_context_);
896
1.14k
    for (ReadOnlyMemTable* m : mems_) {
897
1.14k
      ROCKS_LOG_INFO(db_options_.info_log,
898
1.14k
                     "[%s] [JOB %d] Flushing memtable id %" PRIu64
899
1.14k
                     " with next log file: %" PRIu64 ", marked_for_flush: %d\n",
900
1.14k
                     cfd_->GetName().c_str(), job_context_->job_id, m->GetID(),
901
1.14k
                     m->GetNextLogNumber(), m->IsMarkedForFlush());
902
1.14k
      if (logical_strip_timestamp) {
903
0
        memtables.push_back(m->NewTimestampStrippingIterator(
904
0
            ro, /*seqno_to_time_mapping=*/nullptr, &arena,
905
0
            /*prefix_extractor=*/nullptr, ts_sz));
906
1.14k
      } else {
907
1.14k
        memtables.push_back(
908
1.14k
            m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
909
1.14k
                           /*prefix_extractor=*/nullptr, /*for_flush=*/true));
910
1.14k
      }
911
1.14k
      auto* range_del_iter =
912
1.14k
          logical_strip_timestamp
913
1.14k
              ? m->NewTimestampStrippingRangeTombstoneIterator(
914
0
                    ro, kMaxSequenceNumber, ts_sz)
915
1.14k
              : m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
916
1.14k
                                             true /* immutable_memtable */);
917
1.14k
      if (range_del_iter != nullptr) {
918
0
        range_del_iters.emplace_back(range_del_iter);
919
0
      }
920
1.14k
      total_num_input_entries += m->NumEntries();
921
1.14k
      total_num_deletes += m->NumDeletion();
922
1.14k
      total_data_size += m->GetDataSize();
923
1.14k
      total_memory_usage += m->ApproximateMemoryUsage();
924
1.14k
      total_num_range_deletes += m->NumRangeDeletion();
925
1.14k
    }
926
927
    // TODO(cbi): when memtable is flushed due to number of range deletions
928
    //  hitting the limit memtable_max_range_deletions, flush_reason_ is still
929
    //  "Write Buffer Full", should make update flush_reason_ accordingly.
930
1.14k
    event_logger_->Log() << "job" << job_context_->job_id << "event"
931
1.14k
                         << "flush_started" << "num_memtables" << mems_.size()
932
1.14k
                         << "total_num_input_entries" << total_num_input_entries
933
1.14k
                         << "num_deletes" << total_num_deletes
934
1.14k
                         << "total_data_size" << total_data_size
935
1.14k
                         << "memory_usage" << total_memory_usage
936
1.14k
                         << "num_range_deletes" << total_num_range_deletes
937
1.14k
                         << "flush_reason"
938
1.14k
                         << GetFlushReasonString(flush_reason_);
939
940
1.14k
    {
941
1.14k
      ScopedArenaPtr<InternalIterator> iter(
942
1.14k
          NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
943
1.14k
                             static_cast<int>(memtables.size()), &arena));
944
1.14k
      ROCKS_LOG_INFO(db_options_.info_log,
945
1.14k
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
946
1.14k
                     cfd_->GetName().c_str(), job_context_->job_id,
947
1.14k
                     meta_.fd.GetNumber());
948
949
1.14k
      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
950
1.14k
                               &output_compression_);
951
1.14k
      int64_t _current_time = 0;
952
1.14k
      auto status = clock_->GetCurrentTime(&_current_time);
953
      // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
954
1.14k
      if (!status.ok()) {
955
0
        ROCKS_LOG_WARN(
956
0
            db_options_.info_log,
957
0
            "Failed to get current time to populate creation_time property. "
958
0
            "Status: %s",
959
0
            status.ToString().c_str());
960
0
      }
961
1.14k
      const uint64_t current_time = static_cast<uint64_t>(_current_time);
962
963
1.14k
      uint64_t oldest_key_time = mems_.front()->ApproximateOldestKeyTime();
964
965
      // It's not clear whether oldest_key_time is always available. In case
966
      // it is not available, use current_time.
967
1.14k
      uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time);
968
969
1.14k
      TEST_SYNC_POINT_CALLBACK(
970
1.14k
          "FlushJob::WriteLevel0Table:oldest_ancester_time",
971
1.14k
          &oldest_ancester_time);
972
1.14k
      meta_.oldest_ancester_time = oldest_ancester_time;
973
1.14k
      meta_.file_creation_time = current_time;
974
975
1.14k
      uint64_t memtable_payload_bytes = 0;
976
1.14k
      uint64_t memtable_garbage_bytes = 0;
977
1.14k
      IOStatus io_s;
978
979
1.14k
      const std::string* const full_history_ts_low =
980
1.14k
          (full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_;
981
1.14k
      ReadOptions read_options(Env::IOActivity::kFlush);
982
1.14k
      read_options.rate_limiter_priority = io_priority;
983
1.14k
      const WriteOptions write_options(io_priority, Env::IOActivity::kFlush);
984
1.14k
      TableBuilderOptions tboptions(
985
1.14k
          cfd_->ioptions(), mutable_cf_options_, read_options, write_options,
986
1.14k
          cfd_->internal_comparator(), cfd_->internal_tbl_prop_coll_factories(),
987
1.14k
          output_compression_, mutable_cf_options_.compression_opts,
988
1.14k
          cfd_->GetID(), cfd_->GetName(), 0 /* level */,
989
1.14k
          current_time /* newest_key_time */, false /* is_bottommost */,
990
1.14k
          TableFileCreationReason::kFlush, oldest_key_time, current_time,
991
1.14k
          db_id_, db_session_id_, 0 /* target_file_size */,
992
1.14k
          meta_.fd.GetNumber(),
993
1.14k
          preclude_last_level_min_seqno_ == kMaxSequenceNumber
994
1.14k
              ? preclude_last_level_min_seqno_
995
1.14k
              : std::min(earliest_snapshot_, preclude_last_level_min_seqno_));
996
1.14k
      s = BuildTable(
997
1.14k
          dbname_, versions_, db_options_, tboptions, file_options_,
998
1.14k
          cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
999
1.14k
          &blob_file_additions, job_context_->snapshot_seqs, earliest_snapshot_,
1000
1.14k
          job_context_->earliest_write_conflict_snapshot,
1001
1.14k
          job_context_->GetJobSnapshotSequence(),
1002
1.14k
          job_context_->snapshot_checker,
1003
1.14k
          mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
1004
1.14k
          &io_s, io_tracer_, BlobFileCreationReason::kFlush,
1005
1.14k
          seqno_to_time_mapping_.get(), event_logger_, job_context_->job_id,
1006
1.14k
          &table_properties_, write_hint, full_history_ts_low, blob_callback_,
1007
1.14k
          base_, &memtable_payload_bytes, &memtable_garbage_bytes,
1008
1.14k
          &flush_stats);
1009
1.14k
      TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:s", &s);
1010
      // TODO: Cleanup io_status in BuildTable and table builders
1011
1.14k
      assert(!s.ok() || io_s.ok());
1012
1.14k
      io_s.PermitUncheckedError();
1013
1.14k
      if (s.ok() && total_num_input_entries != flush_stats.num_input_records) {
1014
0
        std::string msg = "Expected " +
1015
0
                          std::to_string(total_num_input_entries) +
1016
0
                          " entries in memtables, but read " +
1017
0
                          std::to_string(flush_stats.num_input_records);
1018
0
        ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Level-0 flush %s",
1019
0
                       cfd_->GetName().c_str(), job_context_->job_id,
1020
0
                       msg.c_str());
1021
0
        if (db_options_.flush_verify_memtable_count) {
1022
0
          s = Status::Corruption(msg);
1023
0
        }
1024
0
      }
1025
1026
      // Only verify on table formats that collect table properties.
1027
1.14k
      if (s.ok() &&
1028
1.14k
          (mutable_cf_options_.table_factory->IsInstanceOf(
1029
1.14k
               TableFactory::kBlockBasedTableName()) ||
1030
0
           mutable_cf_options_.table_factory->IsInstanceOf(
1031
0
               TableFactory::kPlainTableName())) &&
1032
1.14k
          flush_stats.num_output_records != table_properties_.num_entries) {
1033
0
        std::string msg =
1034
0
            "Number of keys in flush output SST files does not match "
1035
0
            "number of keys added to the table. Expected " +
1036
0
            std::to_string(flush_stats.num_output_records) + " but there are " +
1037
0
            std::to_string(table_properties_.num_entries) +
1038
0
            " in output SST files";
1039
0
        ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Level-0 flush %s",
1040
0
                       cfd_->GetName().c_str(), job_context_->job_id,
1041
0
                       msg.c_str());
1042
0
        if (db_options_.flush_verify_memtable_count) {
1043
0
          s = Status::Corruption(msg);
1044
0
        }
1045
0
      }
1046
1.14k
      if (tboptions.reason == TableFileCreationReason::kFlush) {
1047
1.14k
        TEST_SYNC_POINT("DBImpl::FlushJob:Flush");
1048
1.14k
        RecordTick(stats_, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH,
1049
1.14k
                   memtable_payload_bytes);
1050
1.14k
        RecordTick(stats_, MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
1051
1.14k
                   memtable_garbage_bytes);
1052
1.14k
      }
1053
1.14k
      LogFlush(db_options_.info_log);
1054
1.14k
    }
1055
1.14k
    ROCKS_LOG_BUFFER(log_buffer_,
1056
1.14k
                     "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
1057
1.14k
                     " bytes %s"
1058
1.14k
                     " %s"
1059
1.14k
                     " %s",
1060
1.14k
                     cfd_->GetName().c_str(), job_context_->job_id,
1061
1.14k
                     meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
1062
1.14k
                     s.ToString().c_str(),
1063
1.14k
                     s.ok() && meta_.fd.GetFileSize() == 0
1064
1.14k
                         ? "It's an empty SST file from a successful flush so "
1065
1.14k
                           "won't be kept in the DB"
1066
1.14k
                         : "",
1067
1.14k
                     meta_.marked_for_compaction ? " (needs compaction)" : "");
1068
1069
1.14k
    if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
1070
1.14k
      s = output_file_directory_->FsyncWithDirOptions(
1071
1.14k
          IOOptions(), nullptr,
1072
1.14k
          DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
1073
1.14k
    }
1074
1.14k
    TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
1075
1.14k
    db_mutex_->Lock();
1076
1.14k
  }
1077
1.14k
  base_->Unref();
1078
1079
  // Note that if file_size is zero, the file has been deleted and
1080
  // should not be added to the manifest.
1081
1.14k
  const bool has_output = meta_.fd.GetFileSize() > 0;
1082
1083
1.14k
  if (s.ok() && has_output) {
1084
1.14k
    TEST_SYNC_POINT("DBImpl::FlushJob:SSTFileCreated");
1085
    // if we have more than 1 background thread, then we cannot
1086
    // insert files directly into higher levels because some other
1087
    // threads could be concurrently producing compacted files for
1088
    // that key range.
1089
    // Add file to L0
1090
1.14k
    edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
1091
1.14k
                   meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
1092
1.14k
                   meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
1093
1.14k
                   meta_.marked_for_compaction, meta_.temperature,
1094
1.14k
                   meta_.oldest_blob_file_number, meta_.oldest_ancester_time,
1095
1.14k
                   meta_.file_creation_time, meta_.epoch_number,
1096
1.14k
                   meta_.file_checksum, meta_.file_checksum_func_name,
1097
1.14k
                   meta_.unique_id, meta_.compensated_range_deletion_size,
1098
1.14k
                   meta_.tail_size, meta_.user_defined_timestamps_persisted,
1099
1.14k
                   meta_.min_timestamp, meta_.max_timestamp);
1100
1.14k
    edit_->SetBlobFileAdditions(std::move(blob_file_additions));
1101
1.14k
  }
1102
  // Piggyback FlushJobInfo on the first flushed memtable.
1103
1.14k
  mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
1104
1105
1.14k
  const uint64_t micros = clock_->NowMicros() - start_micros;
1106
1.14k
  const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros;
1107
1.14k
  flush_stats.micros = micros;
1108
1.14k
  flush_stats.cpu_micros += cpu_micros;
1109
1110
1.14k
  ROCKS_LOG_INFO(db_options_.info_log,
1111
1.14k
                 "[%s] [JOB %d] Flush lasted %" PRIu64
1112
1.14k
                 " microseconds, and %" PRIu64 " cpu microseconds.\n",
1113
1.14k
                 cfd_->GetName().c_str(), job_context_->job_id, micros,
1114
1.14k
                 flush_stats.cpu_micros);
1115
1116
1.14k
  if (has_output) {
1117
1.14k
    flush_stats.bytes_written = meta_.fd.GetFileSize();
1118
1.14k
    flush_stats.num_output_files = 1;
1119
1.14k
  }
1120
1121
1.14k
  const auto& blobs = edit_->GetBlobFileAdditions();
1122
1.14k
  for (const auto& blob : blobs) {
1123
0
    flush_stats.bytes_written_blob += blob.GetTotalBlobBytes();
1124
0
  }
1125
1126
1.14k
  flush_stats.num_output_files_blob = static_cast<int>(blobs.size());
1127
1128
1.14k
  RecordTimeToHistogram(stats_, FLUSH_TIME, flush_stats.micros);
1129
1.14k
  cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_,
1130
1.14k
                                             flush_stats);
1131
1.14k
  cfd_->internal_stats()->AddCFStats(
1132
1.14k
      InternalStats::BYTES_FLUSHED,
1133
1.14k
      flush_stats.bytes_written + flush_stats.bytes_written_blob);
1134
1.14k
  RecordFlushIOStats();
1135
1136
1.14k
  return s;
1137
1.14k
}
1138
1139
1.14k
Env::IOPriority FlushJob::GetRateLimiterPriority() {
1140
1.14k
  if (versions_ && versions_->GetColumnFamilySet() &&
1141
1.14k
      versions_->GetColumnFamilySet()->write_controller()) {
1142
1.14k
    WriteController* write_controller =
1143
1.14k
        versions_->GetColumnFamilySet()->write_controller();
1144
1.14k
    if (write_controller->IsStopped() || write_controller->NeedsDelay()) {
1145
0
      return Env::IO_USER;
1146
0
    }
1147
1.14k
  }
1148
1149
1.14k
  return Env::IO_HIGH;
1150
1.14k
}
1151
1152
1.14k
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
1153
1.14k
  db_mutex_->AssertHeld();
1154
1.14k
  std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
1155
1.14k
  info->cf_id = cfd_->GetID();
1156
1.14k
  info->cf_name = cfd_->GetName();
1157
1158
1.14k
  const uint64_t file_number = meta_.fd.GetNumber();
1159
1.14k
  info->file_path =
1160
1.14k
      MakeTableFileName(cfd_->ioptions().cf_paths[0].path, file_number);
1161
1.14k
  info->file_number = file_number;
1162
1.14k
  info->oldest_blob_file_number = meta_.oldest_blob_file_number;
1163
1.14k
  info->thread_id = db_options_.env->GetThreadID();
1164
1.14k
  info->job_id = job_context_->job_id;
1165
1.14k
  info->smallest_seqno = meta_.fd.smallest_seqno;
1166
1.14k
  info->largest_seqno = meta_.fd.largest_seqno;
1167
1.14k
  info->table_properties = table_properties_;
1168
1.14k
  info->flush_reason = flush_reason_;
1169
1.14k
  info->blob_compression_type = mutable_cf_options_.blob_compression_type;
1170
1171
  // Update BlobFilesInfo.
1172
1.14k
  for (const auto& blob_file : edit_->GetBlobFileAdditions()) {
1173
0
    BlobFileAdditionInfo blob_file_addition_info(
1174
0
        BlobFileName(cfd_->ioptions().cf_paths.front().path,
1175
0
                     blob_file.GetBlobFileNumber()) /*blob_file_path*/,
1176
0
        blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
1177
0
        blob_file.GetTotalBlobBytes());
1178
0
    info->blob_file_addition_infos.emplace_back(
1179
0
        std::move(blob_file_addition_info));
1180
0
  }
1181
1.14k
  return info;
1182
1.14k
}
1183
1184
1.14k
void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() {
1185
1.14k
  db_mutex_->AssertHeld();
1186
1.14k
  assert(pick_memtable_called);
1187
1.14k
  const auto* ucmp = cfd_->internal_comparator().user_comparator();
1188
1.14k
  assert(ucmp);
1189
1.14k
  const size_t ts_sz = ucmp->timestamp_size();
1190
1.14k
  if (db_options_.atomic_flush || ts_sz == 0 ||
1191
1.14k
      cfd_->ioptions().persist_user_defined_timestamps) {
1192
1.14k
    return;
1193
1.14k
  }
1194
  // Find the newest user-defined timestamp across all the flushed memtables.
1195
0
  for (const ReadOnlyMemTable* m : mems_) {
1196
0
    Slice table_newest_udt = m->GetNewestUDT();
1197
    // Empty memtables can be legitimately created and flushed, for example
1198
    // by error recovery flush attempts.
1199
0
    if (table_newest_udt.empty()) {
1200
0
      continue;
1201
0
    }
1202
0
    if (cutoff_udt_.empty() ||
1203
0
        ucmp->CompareTimestamp(table_newest_udt, cutoff_udt_) > 0) {
1204
0
      if (!cutoff_udt_.empty()) {
1205
0
        assert(table_newest_udt.size() == cutoff_udt_.size());
1206
0
      }
1207
0
      cutoff_udt_.assign(table_newest_udt.data(), table_newest_udt.size());
1208
0
    }
1209
0
  }
1210
0
}
1211
1212
1.14k
void FlushJob::GetPrecludeLastLevelMinSeqno() {
1213
1.14k
  if (mutable_cf_options_.preclude_last_level_data_seconds == 0) {
1214
1.14k
    return;
1215
1.14k
  }
1216
  // SuperVersion should guarantee this
1217
1.14k
  assert(seqno_to_time_mapping_);
1218
0
  assert(!seqno_to_time_mapping_->Empty());
1219
0
  int64_t current_time = 0;
1220
0
  Status s = db_options_.clock->GetCurrentTime(&current_time);
1221
0
  if (!s.ok()) {
1222
0
    ROCKS_LOG_WARN(db_options_.info_log,
1223
0
                   "Failed to get current time in Flush: Status: %s",
1224
0
                   s.ToString().c_str());
1225
0
  } else {
1226
0
    SequenceNumber preserve_time_min_seqno;
1227
0
    seqno_to_time_mapping_->GetCurrentTieringCutoffSeqnos(
1228
0
        static_cast<uint64_t>(current_time),
1229
0
        mutable_cf_options_.preserve_internal_time_seconds,
1230
0
        mutable_cf_options_.preclude_last_level_data_seconds,
1231
0
        &preserve_time_min_seqno, &preclude_last_level_min_seqno_);
1232
0
  }
1233
0
}
1234
1235
1.14k
Status FlushJob::MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT() {
1236
1.14k
  db_mutex_->AssertHeld();
1237
1.14k
  const auto* ucmp = cfd_->user_comparator();
1238
1.14k
  assert(ucmp);
1239
1.14k
  const std::string& full_history_ts_low = cfd_->GetFullHistoryTsLow();
1240
  // Update full_history_ts_low to right above cutoff udt only if that would
1241
  // increase it.
1242
1.14k
  if (cutoff_udt_.empty() ||
1243
0
      (!full_history_ts_low.empty() &&
1244
1.14k
       ucmp->CompareTimestamp(cutoff_udt_, full_history_ts_low) < 0)) {
1245
1.14k
    return Status::OK();
1246
1.14k
  }
1247
0
  std::string new_full_history_ts_low;
1248
0
  Slice cutoff_udt_slice = cutoff_udt_;
1249
  // TODO(yuzhangyu): Add a member to AdvancedColumnFamilyOptions for an
1250
  //  operation to get the next immediately larger user-defined timestamp to
1251
  //  expand this feature to other user-defined timestamp formats.
1252
0
  GetFullHistoryTsLowFromU64CutoffTs(&cutoff_udt_slice,
1253
0
                                     &new_full_history_ts_low);
1254
0
  VersionEdit edit;
1255
0
  edit.SetColumnFamily(cfd_->GetID());
1256
0
  edit.SetFullHistoryTsLow(new_full_history_ts_low);
1257
0
  return versions_->LogAndApply(cfd_, ReadOptions(Env::IOActivity::kFlush),
1258
0
                                WriteOptions(Env::IOActivity::kFlush), &edit,
1259
0
                                db_mutex_, output_file_directory_);
1260
1.14k
}
1261
1262
}  // namespace ROCKSDB_NAMESPACE