/src/rocksdb/db/flush_job.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "db/flush_job.h" |
11 | | |
12 | | #include <algorithm> |
13 | | #include <cinttypes> |
14 | | #include <vector> |
15 | | |
16 | | #include "db/builder.h" |
17 | | #include "db/db_iter.h" |
18 | | #include "db/dbformat.h" |
19 | | #include "db/event_helpers.h" |
20 | | #include "db/log_reader.h" |
21 | | #include "db/log_writer.h" |
22 | | #include "db/memtable.h" |
23 | | #include "db/memtable_list.h" |
24 | | #include "db/merge_context.h" |
25 | | #include "db/range_tombstone_fragmenter.h" |
26 | | #include "db/version_edit.h" |
27 | | #include "db/version_set.h" |
28 | | #include "file/file_util.h" |
29 | | #include "file/filename.h" |
30 | | #include "logging/event_logger.h" |
31 | | #include "logging/log_buffer.h" |
32 | | #include "logging/logging.h" |
33 | | #include "monitoring/iostats_context_imp.h" |
34 | | #include "monitoring/perf_context_imp.h" |
35 | | #include "monitoring/thread_status_util.h" |
36 | | #include "port/port.h" |
37 | | #include "rocksdb/db.h" |
38 | | #include "rocksdb/env.h" |
39 | | #include "rocksdb/statistics.h" |
40 | | #include "rocksdb/status.h" |
41 | | #include "rocksdb/table.h" |
42 | | #include "table/merging_iterator.h" |
43 | | #include "table/table_builder.h" |
44 | | #include "table/two_level_iterator.h" |
45 | | #include "test_util/sync_point.h" |
46 | | #include "util/coding.h" |
47 | | #include "util/mutexlock.h" |
48 | | #include "util/stop_watch.h" |
49 | | |
50 | | namespace ROCKSDB_NAMESPACE { |
51 | | |
52 | 1.14k | const char* GetFlushReasonString(FlushReason flush_reason) { |
53 | 1.14k | switch (flush_reason) { |
54 | 0 | case FlushReason::kOthers: |
55 | 0 | return "Other Reasons"; |
56 | 0 | case FlushReason::kGetLiveFiles: |
57 | 0 | return "Get Live Files"; |
58 | 0 | case FlushReason::kShutDown: |
59 | 0 | return "Shut down"; |
60 | 0 | case FlushReason::kExternalFileIngestion: |
61 | 0 | return "External File Ingestion"; |
62 | 1.14k | case FlushReason::kManualCompaction: |
63 | 1.14k | return "Manual Compaction"; |
64 | 0 | case FlushReason::kWriteBufferManager: |
65 | 0 | return "Write Buffer Manager"; |
66 | 0 | case FlushReason::kWriteBufferFull: |
67 | 0 | return "Write Buffer Full"; |
68 | 0 | case FlushReason::kTest: |
69 | 0 | return "Test"; |
70 | 0 | case FlushReason::kDeleteFiles: |
71 | 0 | return "Delete Files"; |
72 | 0 | case FlushReason::kAutoCompaction: |
73 | 0 | return "Auto Compaction"; |
74 | 0 | case FlushReason::kManualFlush: |
75 | 0 | return "Manual Flush"; |
76 | 0 | case FlushReason::kErrorRecovery: |
77 | 0 | return "Error Recovery"; |
78 | 0 | case FlushReason::kErrorRecoveryRetryFlush: |
79 | 0 | return "Error Recovery Retry Flush"; |
80 | 0 | case FlushReason::kWalFull: |
81 | 0 | return "WAL Full"; |
82 | 0 | case FlushReason::kCatchUpAfterErrorRecovery: |
83 | 0 | return "Catch Up After Error Recovery"; |
84 | 0 | default: |
85 | 0 | return "Invalid"; |
86 | 1.14k | } |
87 | 1.14k | } |
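A minimal usage sketch for the helper above. It only demonstrates the enum-to-label mapping shown in the switch; the include and the standalone driver are assumptions about what a small test program would need, not code from this file:

  #include <cstdio>
  #include "db/flush_job.h"  // assumed to make GetFlushReasonString and FlushReason visible

  int main() {
    // Matches the "Write Buffer Full" case in the switch above.
    std::printf("%s\n", ROCKSDB_NAMESPACE::GetFlushReasonString(
                            ROCKSDB_NAMESPACE::FlushReason::kWriteBufferFull));
    return 0;
  }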
88 | | |
89 | | FlushJob::FlushJob( |
90 | | const std::string& dbname, ColumnFamilyData* cfd, |
91 | | const ImmutableDBOptions& db_options, |
92 | | const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id, |
93 | | const FileOptions& file_options, VersionSet* versions, |
94 | | InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down, |
95 | | JobContext* job_context, FlushReason flush_reason, LogBuffer* log_buffer, |
96 | | FSDirectory* db_directory, FSDirectory* output_file_directory, |
97 | | CompressionType output_compression, Statistics* stats, |
98 | | EventLogger* event_logger, bool measure_io_stats, |
99 | | const bool sync_output_directory, const bool write_manifest, |
100 | | Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer, |
101 | | std::shared_ptr<const SeqnoToTimeMapping> seqno_to_time_mapping, |
102 | | const std::string& db_id, const std::string& db_session_id, |
103 | | std::string full_history_ts_low, BlobFileCompletionCallback* blob_callback) |
104 | 1.14k | : dbname_(dbname), |
105 | 1.14k | db_id_(db_id), |
106 | 1.14k | db_session_id_(db_session_id), |
107 | 1.14k | cfd_(cfd), |
108 | 1.14k | db_options_(db_options), |
109 | 1.14k | mutable_cf_options_(mutable_cf_options), |
110 | 1.14k | max_memtable_id_(max_memtable_id), |
111 | 1.14k | file_options_(file_options), |
112 | 1.14k | versions_(versions), |
113 | 1.14k | db_mutex_(db_mutex), |
114 | 1.14k | shutting_down_(shutting_down), |
115 | 1.14k | earliest_snapshot_(job_context->GetEarliestSnapshotSequence()), |
116 | 1.14k | job_context_(job_context), |
117 | 1.14k | flush_reason_(flush_reason), |
118 | 1.14k | log_buffer_(log_buffer), |
119 | 1.14k | db_directory_(db_directory), |
120 | 1.14k | output_file_directory_(output_file_directory), |
121 | 1.14k | output_compression_(output_compression), |
122 | 1.14k | stats_(stats), |
123 | 1.14k | event_logger_(event_logger), |
124 | 1.14k | measure_io_stats_(measure_io_stats), |
125 | 1.14k | sync_output_directory_(sync_output_directory), |
126 | 1.14k | write_manifest_(write_manifest), |
127 | 1.14k | edit_(nullptr), |
128 | 1.14k | base_(nullptr), |
129 | 1.14k | pick_memtable_called(false), |
130 | 1.14k | thread_pri_(thread_pri), |
131 | 1.14k | io_tracer_(io_tracer), |
132 | 1.14k | clock_(db_options_.clock), |
133 | 1.14k | full_history_ts_low_(std::move(full_history_ts_low)), |
134 | 1.14k | blob_callback_(blob_callback), |
135 | 1.14k | seqno_to_time_mapping_(std::move(seqno_to_time_mapping)) { |
136 | 1.14k | assert(job_context->snapshot_context_initialized); |
137 | | // Update the thread status to indicate flush. |
138 | 1.14k | ReportStartedFlush(); |
139 | 1.14k | TEST_SYNC_POINT("FlushJob::FlushJob()"); |
140 | 1.14k | } |
141 | | |
142 | 1.14k | FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); } |
143 | | |
144 | 1.14k | void FlushJob::ReportStartedFlush() { |
145 | 1.14k | ThreadStatusUtil::SetEnableTracking(db_options_.enable_thread_tracking); |
146 | 1.14k | ThreadStatusUtil::SetColumnFamily(cfd_); |
147 | 1.14k | ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH); |
148 | 1.14k | ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID, |
149 | 1.14k | job_context_->job_id); |
150 | | |
151 | 1.14k | IOSTATS_RESET(bytes_written); |
152 | 1.14k | } |
153 | | |
154 | 1.14k | void FlushJob::ReportFlushInputSize(const autovector<ReadOnlyMemTable*>& mems) { |
155 | 1.14k | uint64_t input_size = 0; |
156 | 1.14k | for (auto* mem : mems) { |
157 | 1.14k | input_size += mem->ApproximateMemoryUsage(); |
158 | 1.14k | } |
159 | 1.14k | ThreadStatusUtil::IncreaseThreadOperationProperty( |
160 | 1.14k | ThreadStatus::FLUSH_BYTES_MEMTABLES, input_size); |
161 | 1.14k | } |
162 | | |
163 | 2.29k | void FlushJob::RecordFlushIOStats() { |
164 | 2.29k | RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written)); |
165 | 2.29k | ThreadStatusUtil::IncreaseThreadOperationProperty( |
166 | 2.29k | ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); |
167 | 2.29k | IOSTATS_RESET(bytes_written); |
168 | 2.29k | } |
169 | 1.14k | void FlushJob::PickMemTable() { |
170 | 1.14k | db_mutex_->AssertHeld(); |
171 | 1.14k | assert(!pick_memtable_called); |
172 | 1.14k | pick_memtable_called = true; |
173 | | |
174 | | // Maximum "NextLogNumber" of the memtables to flush. |
175 | | // When the mempurge feature is turned off, this variable is not needed |
176 | | // because the memtables are implicitly sorted by increasing order of creation |
177 | | // time. Therefore mems_->back()->GetNextLogNumber() is already equal to |
178 | | // max_next_log_number. However, when mempurge is on, the memtables are no |
179 | | // longer sorted by increasing order of creation time. Therefore this variable |
180 | | // becomes necessary because mems_->back()->GetNextLogNumber() is no longer |
181 | | // necessarily equal to max_next_log_number. |
182 | 1.14k | uint64_t max_next_log_number = 0; |
183 | | |
184 | | // Save the contents of the earliest memtable as a new Table |
185 | 1.14k | cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_, |
186 | 1.14k | &max_next_log_number); |
187 | 1.14k | if (mems_.empty()) { |
188 | 0 | return; |
189 | 0 | } |
190 | | |
191 | | // Track effective cutoff user-defined timestamp during flush if |
192 | | // user-defined timestamps can be stripped. |
193 | 1.14k | GetEffectiveCutoffUDTForPickedMemTables(); |
194 | 1.14k | GetPrecludeLastLevelMinSeqno(); |
195 | | |
196 | 1.14k | ReportFlushInputSize(mems_); |
197 | | |
198 | | // The memtables in mems_ are (implicitly) sorted in ascending order by their |
199 | | // creation time. We will use the first memtable's `edit` to keep the meta info |
200 | | // for this flush. |
201 | 1.14k | ReadOnlyMemTable* m = mems_[0]; |
202 | 1.14k | edit_ = m->GetEdits(); |
203 | 1.14k | edit_->SetPrevLogNumber(0); |
204 | | // SetLogNumber(log_num) indicates logs with number smaller than log_num |
205 | | // will no longer be picked up for recovery. |
206 | 1.14k | edit_->SetLogNumber(max_next_log_number); |
207 | 1.14k | edit_->SetColumnFamily(cfd_->GetID()); |
208 | | |
209 | | // path 0 for level 0 file. |
210 | 1.14k | meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); |
211 | 1.14k | meta_.epoch_number = cfd_->NewEpochNumber(); |
212 | | |
213 | 1.14k | base_ = cfd_->current(); |
214 | 1.14k | base_->Ref(); // it is likely that we do not need this reference |
215 | 1.14k | } |
216 | | |
217 | | Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta, |
218 | | bool* switched_to_mempurge, bool* skipped_since_bg_error, |
219 | 1.14k | ErrorHandler* error_handler) { |
220 | 1.14k | TEST_SYNC_POINT("FlushJob::Start"); |
221 | 1.14k | db_mutex_->AssertHeld(); |
222 | 1.14k | assert(pick_memtable_called); |
223 | | // The mempurge threshold can be changed dynamically. |
224 | | // mempurge_threshold is saved locally so that its |
225 | | // value stays consistent throughout this |
226 | | // FlushJob::Run call. |
227 | 1.14k | double mempurge_threshold = |
228 | 1.14k | mutable_cf_options_.experimental_mempurge_threshold; |
229 | | |
230 | 1.14k | AutoThreadOperationStageUpdater stage_run(ThreadStatus::STAGE_FLUSH_RUN); |
231 | 1.14k | if (mems_.empty()) { |
232 | 0 | ROCKS_LOG_BUFFER(log_buffer_, "[%s] No memtable to flush", |
233 | 0 | cfd_->GetName().c_str()); |
234 | 0 | return Status::OK(); |
235 | 0 | } |
236 | | |
237 | | // I/O measurement variables |
238 | 1.14k | PerfLevel prev_perf_level = PerfLevel::kEnableTime; |
239 | 1.14k | uint64_t prev_write_nanos = 0; |
240 | 1.14k | uint64_t prev_fsync_nanos = 0; |
241 | 1.14k | uint64_t prev_range_sync_nanos = 0; |
242 | 1.14k | uint64_t prev_prepare_write_nanos = 0; |
243 | 1.14k | uint64_t prev_cpu_write_nanos = 0; |
244 | 1.14k | uint64_t prev_cpu_read_nanos = 0; |
245 | 1.14k | if (measure_io_stats_) { |
246 | 0 | prev_perf_level = GetPerfLevel(); |
247 | 0 | SetPerfLevel(PerfLevel::kEnableTime); |
248 | 0 | prev_write_nanos = IOSTATS(write_nanos); |
249 | 0 | prev_fsync_nanos = IOSTATS(fsync_nanos); |
250 | 0 | prev_range_sync_nanos = IOSTATS(range_sync_nanos); |
251 | 0 | prev_prepare_write_nanos = IOSTATS(prepare_write_nanos); |
252 | 0 | prev_cpu_write_nanos = IOSTATS(cpu_write_nanos); |
253 | 0 | prev_cpu_read_nanos = IOSTATS(cpu_read_nanos); |
254 | 0 | } |
255 | 1.14k | Status mempurge_s = Status::NotFound("No MemPurge."); |
256 | 1.14k | if ((mempurge_threshold > 0.0) && |
257 | 0 | (flush_reason_ == FlushReason::kWriteBufferFull) && (!mems_.empty()) && |
258 | 0 | MemPurgeDecider(mempurge_threshold) && !(db_options_.atomic_flush)) { |
259 | 0 | cfd_->SetMempurgeUsed(); |
260 | 0 | mempurge_s = MemPurge(); |
261 | 0 | if (!mempurge_s.ok()) { |
262 | | // Mempurge is typically aborted when the output |
263 | | // bytes cannot be contained in a single output memtable. |
264 | 0 | if (mempurge_s.IsAborted()) { |
265 | 0 | ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n", |
266 | 0 | mempurge_s.ToString().c_str()); |
267 | 0 | } else { |
268 | | // However the mempurge process can also fail for |
269 | | // other reasons (eg: new_mem->Add() fails). |
270 | 0 | ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n", |
271 | 0 | mempurge_s.ToString().c_str()); |
272 | 0 | } |
273 | 0 | } else { |
274 | 0 | if (switched_to_mempurge) { |
275 | 0 | *switched_to_mempurge = true; |
276 | 0 | } else { |
277 | | // The mempurge process was successful, but no 'switched_to_mempurge' |
278 | | // pointer was provided, so there is no way to propagate the flush job's state. |
279 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
280 | 0 | "Mempurge process succeeded " |
281 | 0 | "but no 'switched_to_mempurge' ptr provided.\n"); |
282 | 0 | } |
283 | 0 | } |
284 | 0 | } |
285 | 1.14k | Status s; |
286 | 1.14k | if (mempurge_s.ok()) { |
287 | 0 | base_->Unref(); |
288 | 0 | s = Status::OK(); |
289 | 1.14k | } else { |
290 | | // This will release and re-acquire the mutex. |
291 | 1.14k | s = WriteLevel0Table(); |
292 | 1.14k | } |
293 | | |
294 | 1.14k | if (s.ok() && cfd_->IsDropped()) { |
295 | 0 | s = Status::ColumnFamilyDropped("Column family dropped during compaction"); |
296 | 0 | } |
297 | 1.14k | if ((s.ok() || s.IsColumnFamilyDropped()) && |
298 | 1.14k | shutting_down_->load(std::memory_order_acquire)) { |
299 | 0 | s = Status::ShutdownInProgress("Database shutdown"); |
300 | 0 | } |
301 | | |
302 | 1.14k | if (s.ok()) { |
303 | 1.14k | s = MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT(); |
304 | 1.14k | } |
305 | | |
306 | 1.14k | if (!s.ok()) { |
307 | 0 | cfd_->imm()->RollbackMemtableFlush( |
308 | 0 | mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush); |
309 | 1.14k | } else if (write_manifest_) { |
310 | 1.14k | assert(!db_options_.atomic_flush); |
311 | 1.14k | if (!db_options_.atomic_flush && |
312 | 1.14k | flush_reason_ != FlushReason::kErrorRecovery && |
313 | 1.14k | flush_reason_ != FlushReason::kErrorRecoveryRetryFlush && |
314 | 1.14k | error_handler && !error_handler->GetBGError().ok() && |
315 | 0 | error_handler->IsBGWorkStopped()) { |
316 | 0 | cfd_->imm()->RollbackMemtableFlush( |
317 | 0 | mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush); |
318 | 0 | s = error_handler->GetBGError(); |
319 | 0 | if (skipped_since_bg_error) { |
320 | 0 | *skipped_since_bg_error = true; |
321 | 0 | } |
322 | 1.14k | } else { |
323 | 1.14k | TEST_SYNC_POINT("FlushJob::InstallResults"); |
324 | | // Replace immutable memtable with the generated Table |
325 | 1.14k | s = cfd_->imm()->TryInstallMemtableFlushResults( |
326 | 1.14k | cfd_, mems_, prep_tracker, versions_, db_mutex_, |
327 | 1.14k | meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, |
328 | 1.14k | log_buffer_, &committed_flush_jobs_info_, |
329 | 1.14k | !(mempurge_s.ok()) /* write_edit : true if no mempurge happened (or if aborted), |
330 | | but 'false' if mempurge successful: no new min log number |
331 | 1.14k | or new level 0 file path to write to manifest. */); |
332 | 1.14k | } |
333 | 1.14k | } |
334 | | |
335 | 1.14k | if (s.ok() && file_meta != nullptr) { |
336 | 1.14k | *file_meta = meta_; |
337 | 1.14k | } |
338 | 1.14k | RecordFlushIOStats(); |
339 | | |
340 | | // When measure_io_stats_ is true, the default 512 bytes is not enough. |
341 | 1.14k | auto stream = event_logger_->LogToBuffer(log_buffer_, 1024); |
342 | 1.14k | stream << "job" << job_context_->job_id << "event" << "flush_finished"; |
343 | 1.14k | stream << "output_compression" |
344 | 1.14k | << CompressionTypeToString(output_compression_); |
345 | 1.14k | stream << "lsm_state"; |
346 | 1.14k | stream.StartArray(); |
347 | 1.14k | auto vstorage = cfd_->current()->storage_info(); |
348 | 9.16k | for (int level = 0; level < vstorage->num_levels(); ++level) { |
349 | 8.02k | stream << vstorage->NumLevelFiles(level); |
350 | 8.02k | } |
351 | 1.14k | stream.EndArray(); |
352 | | |
353 | 1.14k | const auto& blob_files = vstorage->GetBlobFiles(); |
354 | 1.14k | if (!blob_files.empty()) { |
355 | 0 | assert(blob_files.front()); |
356 | 0 | stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber(); |
357 | | 
358 | 0 | assert(blob_files.back()); |
359 | 0 | stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber(); |
360 | 0 | } |
361 | | |
362 | 1.14k | stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed(); |
363 | | |
364 | 1.14k | if (measure_io_stats_) { |
365 | 0 | if (prev_perf_level != PerfLevel::kEnableTime) { |
366 | 0 | SetPerfLevel(prev_perf_level); |
367 | 0 | } |
368 | 0 | stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos); |
369 | 0 | stream << "file_range_sync_nanos" |
370 | 0 | << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos); |
371 | 0 | stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos); |
372 | 0 | stream << "file_prepare_write_nanos" |
373 | 0 | << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos); |
374 | 0 | stream << "file_cpu_write_nanos" |
375 | 0 | << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos); |
376 | 0 | stream << "file_cpu_read_nanos" |
377 | 0 | << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos); |
378 | 0 | } |
379 | | |
380 | 1.14k | TEST_SYNC_POINT("FlushJob::End"); |
381 | 1.14k | return s; |
382 | 1.14k | } |
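A condensed sketch of how a caller drives FlushJob, based only on the constructor and the PickMemTable()/Run() contracts visible above (both methods assert that the DB mutex is held, and Run() asserts PickMemTable() was called first). The `job`, `db_mutex`, and `prep_tracker` objects are assumed to be set up by the caller; this is not the actual DBImpl call site:

  // Illustrative fragment only.
  db_mutex->Lock();
  job.PickMemTable();  // selects mems_, prepares edit_ and meta_, refs the current Version
  FileMetaData file_meta;
  Status s = job.Run(&prep_tracker, &file_meta,
                     /*switched_to_mempurge=*/nullptr,
                     /*skipped_since_bg_error=*/nullptr,
                     /*error_handler=*/nullptr);
  // On failure, Run() has already rolled back the memtable flush and left
  // file_meta untouched; a job that picked memtables but never runs should
  // call job.Cancel() instead to release the Version reference.
  db_mutex->Unlock();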
383 | | |
384 | 0 | void FlushJob::Cancel() { |
385 | 0 | db_mutex_->AssertHeld(); |
386 | 0 | assert(base_ != nullptr); |
387 | 0 | base_->Unref(); |
388 | 0 | } |
389 | | |
390 | 0 | Status FlushJob::MemPurge() { |
391 | 0 | Status s; |
392 | 0 | db_mutex_->AssertHeld(); |
393 | 0 | db_mutex_->Unlock(); |
394 | 0 | assert(!mems_.empty()); |
395 | | |
396 | | // Measure purging time. |
397 | 0 | const uint64_t start_micros = clock_->NowMicros(); |
398 | 0 | const uint64_t start_cpu_micros = clock_->CPUMicros(); |
399 | | 
400 | 0 | MemTable* new_mem = nullptr; |
401 | | // For performance/log investigation purposes: |
402 | | // look at how much useful payload we harvest in the new_mem. |
403 | | // This value is then printed to the DB log. |
404 | 0 | double new_mem_capacity = 0.0; |
405 | | |
406 | | // Create two iterators, one for the memtable data (contains |
407 | | // info from puts + deletes), and one for the memtable |
408 | | // Range Tombstones (from DeleteRanges). |
409 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
410 | 0 | ReadOptions ro; |
411 | 0 | ro.total_order_seek = true; |
412 | 0 | Arena arena; |
413 | 0 | std::vector<InternalIterator*> memtables; |
414 | 0 | std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> |
415 | 0 | range_del_iters; |
416 | 0 | for (ReadOnlyMemTable* m : mems_) { |
417 | 0 | memtables.push_back(m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, |
418 | 0 | &arena, /*prefix_extractor=*/nullptr, |
419 | 0 | /*for_flush=*/true)); |
420 | 0 | auto* range_del_iter = m->NewRangeTombstoneIterator( |
421 | 0 | ro, kMaxSequenceNumber, true /* immutable_memtable */); |
422 | 0 | if (range_del_iter != nullptr) { |
423 | 0 | range_del_iters.emplace_back(range_del_iter); |
424 | 0 | } |
425 | 0 | } |
426 | | 
427 | 0 | assert(!memtables.empty()); |
428 | 0 | SequenceNumber first_seqno = kMaxSequenceNumber; |
429 | 0 | SequenceNumber earliest_seqno = kMaxSequenceNumber; |
430 | | // Pick first and earliest seqno as min of all first_seqno |
431 | | // and earliest_seqno of the mempurged memtables. |
432 | 0 | for (const auto& mem : mems_) { |
433 | 0 | first_seqno = mem->GetFirstSequenceNumber() < first_seqno |
434 | 0 | ? mem->GetFirstSequenceNumber() |
435 | 0 | : first_seqno; |
436 | 0 | earliest_seqno = mem->GetEarliestSequenceNumber() < earliest_seqno |
437 | 0 | ? mem->GetEarliestSequenceNumber() |
438 | 0 | : earliest_seqno; |
439 | 0 | } |
440 | | 
441 | 0 | ScopedArenaPtr<InternalIterator> iter( |
442 | 0 | NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(), |
443 | 0 | static_cast<int>(memtables.size()), &arena)); |
444 | | 
445 | 0 | const auto& ioptions = cfd_->ioptions(); |
446 | | |
447 | | // Place iterator at the First (meaning most recent) key node. |
448 | 0 | iter->SeekToFirst(); |
449 | | 
450 | 0 | const std::string* const full_history_ts_low = &(cfd_->GetFullHistoryTsLow()); |
451 | 0 | std::unique_ptr<CompactionRangeDelAggregator> range_del_agg( |
452 | 0 | new CompactionRangeDelAggregator(&(cfd_->internal_comparator()), |
453 | 0 | job_context_->snapshot_seqs, |
454 | 0 | full_history_ts_low)); |
455 | 0 | for (auto& rd_iter : range_del_iters) { |
456 | 0 | range_del_agg->AddTombstones(std::move(rd_iter)); |
457 | 0 | } |
458 | | |
459 | | // If there is valid data in the memtable, |
460 | | // or at least range tombstones, copy over the info |
461 | | // to the new memtable. |
462 | 0 | if (iter->Valid() || !range_del_agg->IsEmpty()) { |
463 | | // MaxSize is the size of a memtable. |
464 | 0 | size_t maxSize = mutable_cf_options_.write_buffer_size; |
465 | 0 | std::unique_ptr<CompactionFilter> compaction_filter; |
466 | 0 | if (ioptions.compaction_filter_factory != nullptr && |
467 | 0 | ioptions.compaction_filter_factory->ShouldFilterTableFileCreation( |
468 | 0 | TableFileCreationReason::kFlush)) { |
469 | 0 | CompactionFilter::Context ctx; |
470 | 0 | ctx.is_full_compaction = false; |
471 | 0 | ctx.is_manual_compaction = false; |
472 | 0 | ctx.column_family_id = cfd_->GetID(); |
473 | 0 | ctx.reason = TableFileCreationReason::kFlush; |
474 | 0 | compaction_filter = |
475 | 0 | ioptions.compaction_filter_factory->CreateCompactionFilter(ctx); |
476 | 0 | if (compaction_filter != nullptr && |
477 | 0 | !compaction_filter->IgnoreSnapshots()) { |
478 | 0 | s = Status::NotSupported( |
479 | 0 | "CompactionFilter::IgnoreSnapshots() = false is not supported " |
480 | 0 | "anymore."); |
481 | 0 | return s; |
482 | 0 | } |
483 | 0 | } |
484 | | |
485 | 0 | new_mem = new MemTable(cfd_->internal_comparator(), cfd_->ioptions(), |
486 | 0 | mutable_cf_options_, cfd_->write_buffer_mgr(), |
487 | 0 | earliest_seqno, cfd_->GetID()); |
488 | 0 | assert(new_mem != nullptr); |
489 | | 
490 | 0 | Env* env = db_options_.env; |
491 | 0 | assert(env); |
492 | 0 | MergeHelper merge(env, (cfd_->internal_comparator()).user_comparator(), |
493 | 0 | (ioptions.merge_operator).get(), compaction_filter.get(), |
494 | 0 | ioptions.logger, |
495 | 0 | true /* internal key corruption is not ok */, |
496 | 0 | job_context_->GetLatestSnapshotSequence(), |
497 | 0 | job_context_->snapshot_checker); |
498 | 0 | assert(job_context_); |
499 | 0 | const std::atomic<bool> kManualCompactionCanceledFalse{false}; |
500 | 0 | CompactionIterator c_iter( |
501 | 0 | iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge, |
502 | 0 | kMaxSequenceNumber, &job_context_->snapshot_seqs, earliest_snapshot_, |
503 | 0 | job_context_->earliest_write_conflict_snapshot, |
504 | 0 | job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker, |
505 | 0 | env, ShouldReportDetailedTime(env, ioptions.stats), range_del_agg.get(), |
506 | 0 | nullptr, ioptions.allow_data_in_errors, |
507 | 0 | ioptions.enforce_single_del_contracts, |
508 | 0 | /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, |
509 | 0 | false /* must_count_input_entries */, |
510 | 0 | /*compaction=*/nullptr, compaction_filter.get(), |
511 | 0 | /*shutting_down=*/nullptr, ioptions.info_log, full_history_ts_low); |
512 | | |
513 | | // Set earliest sequence number in the new memtable |
514 | | // to be equal to the earliest sequence number of the |
515 | | // memtable being flushed (See later if there is a need |
516 | | // to update this number!). |
517 | 0 | new_mem->SetEarliestSequenceNumber(earliest_seqno); |
518 | | // Likewise for first seq number. |
519 | 0 | new_mem->SetFirstSequenceNumber(first_seqno); |
520 | 0 | SequenceNumber new_first_seqno = kMaxSequenceNumber; |
521 | | 
522 | 0 | c_iter.SeekToFirst(); |
523 | | |
524 | | // Key transfer |
525 | 0 | for (; c_iter.Valid(); c_iter.Next()) { |
526 | 0 | const ParsedInternalKey ikey = c_iter.ikey(); |
527 | 0 | const Slice value = c_iter.value(); |
528 | 0 | new_first_seqno = |
529 | 0 | ikey.sequence < new_first_seqno ? ikey.sequence : new_first_seqno; |
530 | | |
531 | | // Should we update "OldestKeyTime" ???? -> timestamps appear |
532 | | // to still be an "experimental" feature. |
533 | 0 | s = new_mem->Add( |
534 | 0 | ikey.sequence, ikey.type, ikey.user_key, value, |
535 | 0 | nullptr, // KV protection info set as nullptr since it |
536 | | // should only be useful for the first add to |
537 | | // the original memtable. |
538 | 0 | false, // : allow concurrent_memtable_writes_ |
539 | | // Not seen as necessary for now. |
540 | 0 | nullptr, // get_post_process_info(m) must be nullptr |
541 | | // when concurrent_memtable_writes is switched off. |
542 | 0 | nullptr); // hint, only used when concurrent_memtable_writes_ |
543 | | // is switched on. |
544 | 0 | if (!s.ok()) { |
545 | 0 | break; |
546 | 0 | } |
547 | | |
548 | | // If new_mem has size greater than maxSize, |
549 | | // then rollback to regular flush operation, |
550 | | // and destroy new_mem. |
551 | 0 | if (new_mem->ApproximateMemoryUsage() > maxSize) { |
552 | 0 | s = Status::Aborted("Mempurge filled more than one memtable."); |
553 | 0 | new_mem_capacity = 1.0; |
554 | 0 | break; |
555 | 0 | } |
556 | 0 | } |
557 | | |
558 | | // Check status and propagate |
559 | | // potential error status from c_iter |
560 | 0 | if (!s.ok()) { |
561 | 0 | c_iter.status().PermitUncheckedError(); |
562 | 0 | } else if (!c_iter.status().ok()) { |
563 | 0 | s = c_iter.status(); |
564 | 0 | } |
565 | | |
566 | | // Range tombstone transfer. |
567 | 0 | if (s.ok()) { |
568 | 0 | auto range_del_it = range_del_agg->NewIterator(); |
569 | 0 | for (range_del_it->SeekToFirst(); range_del_it->Valid(); |
570 | 0 | range_del_it->Next()) { |
571 | 0 | auto tombstone = range_del_it->Tombstone(); |
572 | 0 | new_first_seqno = |
573 | 0 | tombstone.seq_ < new_first_seqno ? tombstone.seq_ : new_first_seqno; |
574 | 0 | s = new_mem->Add( |
575 | 0 | tombstone.seq_, // Sequence number |
576 | 0 | kTypeRangeDeletion, // KV type |
577 | 0 | tombstone.start_key_, // Key is start key. |
578 | 0 | tombstone.end_key_, // Value is end key. |
579 | 0 | nullptr, // KV protection info set as nullptr since it |
580 | | // should only be useful for the first add to |
581 | | // the original memtable. |
582 | 0 | false, // : allow concurrent_memtable_writes_ |
583 | | // Not seen as necessary for now. |
584 | 0 | nullptr, // get_post_process_info(m) must be nullptr |
585 | | // when concurrent_memtable_writes is switched off. |
586 | 0 | nullptr); // hint, only used when concurrent_memtable_writes_ |
587 | | // is switched on. |
588 | | 
589 | 0 | if (!s.ok()) { |
590 | 0 | break; |
591 | 0 | } |
592 | | |
593 | | // If new_mem has size greater than maxSize, |
594 | | // then rollback to regular flush operation, |
595 | | // and destroy new_mem. |
596 | 0 | if (new_mem->ApproximateMemoryUsage() > maxSize) { |
597 | 0 | s = Status::Aborted(Slice("Mempurge filled more than one memtable.")); |
598 | 0 | new_mem_capacity = 1.0; |
599 | 0 | break; |
600 | 0 | } |
601 | 0 | } |
602 | 0 | } |
603 | | |
604 | | // If everything happened smoothly and new_mem contains valid data, |
605 | | // decide if it is flushed to storage or kept in the imm() |
606 | | // memtable list (memory). |
607 | 0 | if (s.ok() && (new_first_seqno != kMaxSequenceNumber)) { |
608 | | // Rectify the first sequence number, which (unlike the earliest seq |
609 | | // number) needs to be present in the new memtable. |
610 | 0 | new_mem->SetFirstSequenceNumber(new_first_seqno); |
611 | | |
612 | | // The new_mem is added to the list of immutable memtables |
613 | | // only if it is filled to less than 100% capacity and isn't flagged |
614 | | // as in need of being flushed. |
615 | 0 | if (new_mem->ApproximateMemoryUsage() < maxSize && |
616 | 0 | !(new_mem->ShouldFlushNow())) { |
617 | | // Construct fragmented memtable range tombstones without mutex |
618 | 0 | new_mem->ConstructFragmentedRangeTombstones(); |
619 | 0 | db_mutex_->Lock(); |
620 | | // Take the newest id, so that memtables in MemtableList don't have |
621 | | // out-of-order memtable ids. |
622 | 0 | uint64_t new_mem_id = mems_.back()->GetID(); |
623 | | 
624 | 0 | new_mem->SetID(new_mem_id); |
625 | | // Take the latest memtable's next log number. |
626 | 0 | new_mem->SetNextLogNumber(mems_.back()->GetNextLogNumber()); |
627 | | |
628 | | // This addition will not trigger another flush, because |
629 | | // we do not call EnqueuePendingFlush(). |
630 | 0 | cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free); |
631 | 0 | new_mem->Ref(); |
632 | | // Piggyback FlushJobInfo on the first flushed memtable. |
633 | 0 | db_mutex_->AssertHeld(); |
634 | 0 | meta_.fd.file_size = 0; |
635 | 0 | mems_[0]->SetFlushJobInfo(GetFlushJobInfo()); |
636 | 0 | db_mutex_->Unlock(); |
637 | 0 | } else { |
638 | 0 | s = Status::Aborted(Slice("Mempurge filled more than one memtable.")); |
639 | 0 | new_mem_capacity = 1.0; |
640 | 0 | if (new_mem) { |
641 | 0 | job_context_->memtables_to_free.push_back(new_mem); |
642 | 0 | } |
643 | 0 | } |
644 | 0 | } else { |
645 | | // In this case, the newly allocated new_mem is empty. |
646 | 0 | assert(new_mem != nullptr); |
647 | 0 | job_context_->memtables_to_free.push_back(new_mem); |
648 | 0 | } |
649 | 0 | } |
650 | | |
651 | | // Reacquire the mutex for WriteLevel0 function. |
652 | 0 | db_mutex_->Lock(); |
653 | | |
654 | | // If mempurge successful, don't write input tables to level0, |
655 | | // but write any full output table to level0. |
656 | 0 | if (s.ok()) { |
657 | 0 | TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeSuccessful"); |
658 | 0 | } else { |
659 | 0 | TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful"); |
660 | 0 | } |
661 | 0 | const uint64_t micros = clock_->NowMicros() - start_micros; |
662 | 0 | const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros; |
663 | 0 | ROCKS_LOG_INFO(db_options_.info_log, |
664 | 0 | "[%s] [JOB %d] Mempurge lasted %" PRIu64 |
665 | 0 | " microseconds, and %" PRIu64 |
666 | 0 | " cpu " |
667 | 0 | "microseconds. Status is %s ok. Perc capacity: %f\n", |
668 | 0 | cfd_->GetName().c_str(), job_context_->job_id, micros, |
669 | 0 | cpu_micros, s.ok() ? "" : "not", new_mem_capacity); |
670 | | 
671 | 0 | return s; |
672 | 0 | } |
673 | | |
674 | 0 | bool FlushJob::MemPurgeDecider(double threshold) { |
675 | | // Never trigger mempurge if threshold is not a strictly positive value. |
676 | 0 | if (!(threshold > 0.0)) { |
677 | 0 | return false; |
678 | 0 | } |
679 | 0 | if (threshold > (1.0 * mems_.size())) { |
680 | 0 | return true; |
681 | 0 | } |
682 | | // Payload and useful_payload (in bytes). |
683 | | // The useful payload ratio of a given MemTable |
684 | | // is estimated to be useful_payload/payload. |
685 | 0 | uint64_t payload = 0, useful_payload = 0, entry_size = 0; |
686 | | |
687 | | // Local variables used repeatedly inside the for-loop |
688 | | // when iterating over the sampled entries. |
689 | 0 | Slice key_slice, value_slice; |
690 | 0 | ParsedInternalKey res; |
691 | 0 | SnapshotImpl min_snapshot; |
692 | 0 | std::string vget; |
693 | 0 | Status mget_s, parse_s; |
694 | 0 | MergeContext merge_context; |
695 | 0 | SequenceNumber max_covering_tombstone_seq = 0, sqno = 0, |
696 | 0 | min_seqno_snapshot = 0; |
697 | 0 | bool get_res, can_be_useful_payload, not_in_next_mems; |
698 | | |
699 | | // If estimated_useful_payload is > threshold, |
700 | | // then flush to storage, else MemPurge. |
701 | 0 | double estimated_useful_payload = 0.0; |
702 | | // Cochran formula for determining sample size. |
703 | | // 95% confidence interval, 7% precision. |
704 | | // n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0 |
705 | 0 | double n0 = 196.0; |
706 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
707 | 0 | ReadOptions ro; |
708 | 0 | ro.total_order_seek = true; |
709 | | |
710 | | // Iterate over each memtable of the set. |
711 | 0 | for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_); |
712 | 0 | ++mem_iter) { |
713 | 0 | ReadOnlyMemTable* mt = *mem_iter; |
714 | | |
715 | | // Sample entries from the table. |
716 | 0 | uint64_t nentries = mt->NumEntries(); |
717 | | // Corrected Cochran formula for small populations |
718 | | // (converges to n0 for large populations). |
719 | 0 | uint64_t target_sample_size = |
720 | 0 | static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries)))); |
721 | 0 | std::unordered_set<const char*> sentries = {}; |
722 | | // Populate sample entries set. |
723 | 0 | mt->UniqueRandomSample(target_sample_size, &sentries); |
724 | | |
725 | | // Estimate the garbage ratio by comparing if |
726 | | // each sample corresponds to a valid entry. |
727 | 0 | for (const char* ss : sentries) { |
728 | 0 | key_slice = GetLengthPrefixedSlice(ss); |
729 | 0 | parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/); |
730 | 0 | if (!parse_s.ok()) { |
731 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
732 | 0 | "Memtable Decider: ParseInternalKey did not parse " |
733 | 0 | "key_slice %s successfully.", |
734 | 0 | key_slice.data()); |
735 | 0 | } |
736 | | |
737 | | // Size of the entry is "key size (+ value size if KV entry)" |
738 | 0 | entry_size = key_slice.size(); |
739 | 0 | if (res.type == kTypeValue) { |
740 | 0 | value_slice = |
741 | 0 | GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); |
742 | 0 | entry_size += value_slice.size(); |
743 | 0 | } |
744 | | |
745 | | // Count entry bytes as payload. |
746 | 0 | payload += entry_size; |
747 | | 
748 | 0 | LookupKey lkey(res.user_key, kMaxSequenceNumber); |
749 | | |
750 | | // Paranoia: zero out these values just in case. |
751 | 0 | max_covering_tombstone_seq = 0; |
752 | 0 | sqno = 0; |
753 | | |
754 | | // Pick the oldest existing snapshot that is more recent |
755 | | // than the sequence number of the sampled entry. |
756 | 0 | min_seqno_snapshot = kMaxSequenceNumber; |
757 | 0 | for (SequenceNumber seq_num : job_context_->snapshot_seqs) { |
758 | 0 | if (seq_num > res.sequence && seq_num < min_seqno_snapshot) { |
759 | 0 | min_seqno_snapshot = seq_num; |
760 | 0 | } |
761 | 0 | } |
762 | 0 | min_snapshot.number_ = min_seqno_snapshot; |
763 | 0 | ro.snapshot = |
764 | 0 | min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr; |
765 | | |
766 | | // Estimate if the sample entry is valid or not. |
767 | 0 | get_res = mt->Get(lkey, &vget, /*columns=*/nullptr, /*timestamp=*/nullptr, |
768 | 0 | &mget_s, &merge_context, &max_covering_tombstone_seq, |
769 | 0 | &sqno, ro, true /* immutable_memtable */); |
770 | 0 | if (!get_res) { |
771 | 0 | ROCKS_LOG_WARN( |
772 | 0 | db_options_.info_log, |
773 | 0 | "Memtable Get returned false when Get(sampled entry). " |
774 | 0 | "Yet each sample entry should exist somewhere in the memtable, " |
775 | 0 | "unrelated to whether it has been deleted or not."); |
776 | 0 | } |
777 | | |
778 | | // TODO(bjlemaire): evaluate typeMerge. |
779 | | // This is where the sampled entry is estimated to be |
780 | | // garbage or not. Note that this is a garbage *estimation* |
781 | | // because we do not include certain items such as |
782 | | // CompactionFilters triggered at flush, or if the same delete |
783 | | // has been inserted twice or more in the memtable. |
784 | | |
785 | | // Evaluate if the entry can be useful payload |
786 | | // Situation #1: entry is a KV entry, was found in the memtable mt |
787 | | // and the sequence numbers match. |
788 | 0 | can_be_useful_payload = (res.type == kTypeValue) && get_res && |
789 | 0 | mget_s.ok() && (sqno == res.sequence); |
790 | | |
791 | | // Situation #2: entry is a delete entry, was found in the memtable mt |
792 | | // (because get_res==true) and no valid KV entry is found. |
793 | | // (note: duplicate delete entries are also taken into |
794 | | // account here, because the sequence number 'sqno' |
795 | | // in memtable->Get(&sqno) operation is set to be equal |
796 | | // to the most recent delete entry as well). |
797 | 0 | can_be_useful_payload |= |
798 | 0 | ((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) && |
799 | 0 | mget_s.IsNotFound() && get_res && (sqno == res.sequence); |
800 | | |
801 | | // If there is a chance that the entry is useful payload, |
802 | | // verify that the entry does not appear in the following memtables |
803 | | // (memtables with greater memtable ID/larger sequence numbers). |
804 | 0 | if (can_be_useful_payload) { |
805 | 0 | not_in_next_mems = true; |
806 | 0 | for (auto next_mem_iter = mem_iter + 1; |
807 | 0 | next_mem_iter != std::end(mems_); next_mem_iter++) { |
808 | 0 | if ((*next_mem_iter) |
809 | 0 | ->Get(lkey, &vget, /*columns=*/nullptr, /*timestamp=*/nullptr, |
810 | 0 | &mget_s, &merge_context, &max_covering_tombstone_seq, |
811 | 0 | &sqno, ro, true /* immutable_memtable */)) { |
812 | 0 | not_in_next_mems = false; |
813 | 0 | break; |
814 | 0 | } |
815 | 0 | } |
816 | 0 | if (not_in_next_mems) { |
817 | 0 | useful_payload += entry_size; |
818 | 0 | } |
819 | 0 | } |
820 | 0 | } |
821 | 0 | if (payload > 0) { |
822 | | // We use the estimated useful payload ratio to |
823 | | // evaluate how many of the memtable bytes are useful bytes. |
824 | 0 | estimated_useful_payload += |
825 | 0 | (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload); |
826 | | 
827 | 0 | ROCKS_LOG_INFO(db_options_.info_log, |
828 | 0 | "Mempurge sampling [CF %s] - found garbage ratio from " |
829 | 0 | "sampling: %f. Threshold is %f\n", |
830 | 0 | cfd_->GetName().c_str(), |
831 | 0 | (payload - useful_payload) * 1.0 / payload, threshold); |
832 | 0 | } else { |
833 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
834 | 0 | "Mempurge sampling: null payload measured, and collected " |
835 | 0 | "sample size is %zu\n.", |
836 | 0 | sentries.size()); |
837 | 0 | } |
838 | 0 | } |
839 | | // We convert the total number of useful payload bytes |
840 | | // into the proportion of memtable necessary to store all these bytes. |
841 | | // We compare this proportion with the threshold value. |
842 | 0 | return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) < |
843 | 0 | threshold); |
844 | 0 | } |
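A worked instance of the sampling math in MemPurgeDecider(), reduced to a single memtable; the entry count, byte figures, and threshold below are assumed example values, not defaults:

  #include <cmath>
  #include <cstdint>

  int main() {
    // Cochran sample size for a 95% confidence interval and 7% precision,
    // exactly as in the function above.
    const double n0 = (1.96 * 1.96) * 0.25 / (0.07 * 0.07);  // ~196.0
    // Assumed memtable population.
    const uint64_t nentries = 1000;
    // Finite-population correction; converges to n0 for large nentries.
    const uint64_t target_sample_size =
        static_cast<uint64_t>(std::ceil(n0 / (1.0 + (n0 / nentries))));  // 164
    // Decision rule: suppose sampling estimated 16 MB of useful payload for a
    // 64 MB write buffer. The useful ratio is 0.25, so a mempurge is chosen
    // only when the configured threshold exceeds 0.25.
    const double estimated_useful_payload = 16.0 * 1024 * 1024;
    const double write_buffer_size = 64.0 * 1024 * 1024;
    const double threshold = 0.5;  // assumed experimental_mempurge_threshold
    const bool do_mempurge =
        (estimated_useful_payload / write_buffer_size) < threshold;  // true
    return (target_sample_size == 164 && do_mempurge) ? 0 : 1;
  }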
845 | | |
846 | 1.14k | Status FlushJob::WriteLevel0Table() { |
847 | 1.14k | AutoThreadOperationStageUpdater stage_updater( |
848 | 1.14k | ThreadStatus::STAGE_FLUSH_WRITE_L0); |
849 | 1.14k | db_mutex_->AssertHeld(); |
850 | 1.14k | const uint64_t start_micros = clock_->NowMicros(); |
851 | 1.14k | const uint64_t start_cpu_micros = clock_->CPUMicros(); |
852 | 1.14k | Status s; |
853 | | |
854 | 1.14k | meta_.temperature = mutable_cf_options_.default_write_temperature; |
855 | 1.14k | file_options_.temperature = meta_.temperature; |
856 | | |
857 | 1.14k | const auto* ucmp = cfd_->internal_comparator().user_comparator(); |
858 | 1.14k | assert(ucmp); |
859 | 1.14k | const size_t ts_sz = ucmp->timestamp_size(); |
860 | 1.14k | const bool logical_strip_timestamp = |
861 | 1.14k | ts_sz > 0 && !cfd_->ioptions().persist_user_defined_timestamps; |
862 | | |
863 | 1.14k | std::vector<BlobFileAddition> blob_file_additions; |
864 | | // Note that here we treat flush as level 0 compaction in internal stats |
865 | 1.14k | InternalStats::CompactionStats flush_stats(CompactionReason::kFlush, |
866 | 1.14k | 1 /* count**/); |
867 | 1.14k | { |
868 | 1.14k | auto write_hint = base_->storage_info()->CalculateSSTWriteHint( |
869 | 1.14k | /*level=*/0, db_options_.calculate_sst_write_lifetime_hint_set); |
870 | 1.14k | Env::IOPriority io_priority = GetRateLimiterPriority(); |
871 | 1.14k | db_mutex_->Unlock(); |
872 | 1.14k | if (log_buffer_) { |
873 | 1.14k | log_buffer_->FlushBufferToLog(); |
874 | 1.14k | } |
875 | | // memtables and range_del_iters store internal iterators over each data |
876 | | // memtable and its associated range deletion memtable, respectively, at |
877 | | // corresponding indexes. |
878 | 1.14k | std::vector<InternalIterator*> memtables; |
879 | 1.14k | std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> |
880 | 1.14k | range_del_iters; |
881 | 1.14k | ReadOptions ro; |
882 | 1.14k | ro.total_order_seek = true; |
883 | 1.14k | ro.io_activity = Env::IOActivity::kFlush; |
884 | 1.14k | Arena arena; |
885 | 1.14k | uint64_t total_num_input_entries = 0, total_num_deletes = 0; |
886 | 1.14k | uint64_t total_data_size = 0; |
887 | 1.14k | size_t total_memory_usage = 0; |
888 | 1.14k | uint64_t total_num_range_deletes = 0; |
889 | | // Used for testing: |
890 | 1.14k | uint64_t mems_size = mems_.size(); |
891 | 1.14k | (void)mems_size; // avoids unused variable error when |
892 | | // TEST_SYNC_POINT_CALLBACK not used. |
893 | 1.14k | TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables", |
894 | 1.14k | &mems_size); |
895 | 1.14k | assert(job_context_); |
896 | 1.14k | for (ReadOnlyMemTable* m : mems_) { |
897 | 1.14k | ROCKS_LOG_INFO(db_options_.info_log, |
898 | 1.14k | "[%s] [JOB %d] Flushing memtable id %" PRIu64 |
899 | 1.14k | " with next log file: %" PRIu64 ", marked_for_flush: %d\n", |
900 | 1.14k | cfd_->GetName().c_str(), job_context_->job_id, m->GetID(), |
901 | 1.14k | m->GetNextLogNumber(), m->IsMarkedForFlush()); |
902 | 1.14k | if (logical_strip_timestamp) { |
903 | 0 | memtables.push_back(m->NewTimestampStrippingIterator( |
904 | 0 | ro, /*seqno_to_time_mapping=*/nullptr, &arena, |
905 | 0 | /*prefix_extractor=*/nullptr, ts_sz)); |
906 | 1.14k | } else { |
907 | 1.14k | memtables.push_back( |
908 | 1.14k | m->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena, |
909 | 1.14k | /*prefix_extractor=*/nullptr, /*for_flush=*/true)); |
910 | 1.14k | } |
911 | 1.14k | auto* range_del_iter = |
912 | 1.14k | logical_strip_timestamp |
913 | 1.14k | ? m->NewTimestampStrippingRangeTombstoneIterator( |
914 | 0 | ro, kMaxSequenceNumber, ts_sz) |
915 | 1.14k | : m->NewRangeTombstoneIterator(ro, kMaxSequenceNumber, |
916 | 1.14k | true /* immutable_memtable */); |
917 | 1.14k | if (range_del_iter != nullptr) { |
918 | 0 | range_del_iters.emplace_back(range_del_iter); |
919 | 0 | } |
920 | 1.14k | total_num_input_entries += m->NumEntries(); |
921 | 1.14k | total_num_deletes += m->NumDeletion(); |
922 | 1.14k | total_data_size += m->GetDataSize(); |
923 | 1.14k | total_memory_usage += m->ApproximateMemoryUsage(); |
924 | 1.14k | total_num_range_deletes += m->NumRangeDeletion(); |
925 | 1.14k | } |
926 | | |
927 | | // TODO(cbi): when a memtable is flushed because the number of range deletions |
928 | | // hits the limit memtable_max_range_deletions, flush_reason_ is still |
929 | | // "Write Buffer Full"; we should update flush_reason_ accordingly. |
930 | 1.14k | event_logger_->Log() << "job" << job_context_->job_id << "event" |
931 | 1.14k | << "flush_started" << "num_memtables" << mems_.size() |
932 | 1.14k | << "total_num_input_entries" << total_num_input_entries |
933 | 1.14k | << "num_deletes" << total_num_deletes |
934 | 1.14k | << "total_data_size" << total_data_size |
935 | 1.14k | << "memory_usage" << total_memory_usage |
936 | 1.14k | << "num_range_deletes" << total_num_range_deletes |
937 | 1.14k | << "flush_reason" |
938 | 1.14k | << GetFlushReasonString(flush_reason_); |
939 | | |
940 | 1.14k | { |
941 | 1.14k | ScopedArenaPtr<InternalIterator> iter( |
942 | 1.14k | NewMergingIterator(&cfd_->internal_comparator(), memtables.data(), |
943 | 1.14k | static_cast<int>(memtables.size()), &arena)); |
944 | 1.14k | ROCKS_LOG_INFO(db_options_.info_log, |
945 | 1.14k | "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started", |
946 | 1.14k | cfd_->GetName().c_str(), job_context_->job_id, |
947 | 1.14k | meta_.fd.GetNumber()); |
948 | | |
949 | 1.14k | TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", |
950 | 1.14k | &output_compression_); |
951 | 1.14k | int64_t _current_time = 0; |
952 | 1.14k | auto status = clock_->GetCurrentTime(&_current_time); |
953 | | // Safe to proceed even if GetCurrentTime fails. So, log and proceed. |
954 | 1.14k | if (!status.ok()) { |
955 | 0 | ROCKS_LOG_WARN( |
956 | 0 | db_options_.info_log, |
957 | 0 | "Failed to get current time to populate creation_time property. " |
958 | 0 | "Status: %s", |
959 | 0 | status.ToString().c_str()); |
960 | 0 | } |
961 | 1.14k | const uint64_t current_time = static_cast<uint64_t>(_current_time); |
962 | | |
963 | 1.14k | uint64_t oldest_key_time = mems_.front()->ApproximateOldestKeyTime(); |
964 | | |
965 | | // It's not clear whether oldest_key_time is always available. In case |
966 | | // it is not available, use current_time. |
967 | 1.14k | uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time); |
968 | | |
969 | 1.14k | TEST_SYNC_POINT_CALLBACK( |
970 | 1.14k | "FlushJob::WriteLevel0Table:oldest_ancester_time", |
971 | 1.14k | &oldest_ancester_time); |
972 | 1.14k | meta_.oldest_ancester_time = oldest_ancester_time; |
973 | 1.14k | meta_.file_creation_time = current_time; |
974 | | |
975 | 1.14k | uint64_t memtable_payload_bytes = 0; |
976 | 1.14k | uint64_t memtable_garbage_bytes = 0; |
977 | 1.14k | IOStatus io_s; |
978 | | |
979 | 1.14k | const std::string* const full_history_ts_low = |
980 | 1.14k | (full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_; |
981 | 1.14k | ReadOptions read_options(Env::IOActivity::kFlush); |
982 | 1.14k | read_options.rate_limiter_priority = io_priority; |
983 | 1.14k | const WriteOptions write_options(io_priority, Env::IOActivity::kFlush); |
984 | 1.14k | TableBuilderOptions tboptions( |
985 | 1.14k | cfd_->ioptions(), mutable_cf_options_, read_options, write_options, |
986 | 1.14k | cfd_->internal_comparator(), cfd_->internal_tbl_prop_coll_factories(), |
987 | 1.14k | output_compression_, mutable_cf_options_.compression_opts, |
988 | 1.14k | cfd_->GetID(), cfd_->GetName(), 0 /* level */, |
989 | 1.14k | current_time /* newest_key_time */, false /* is_bottommost */, |
990 | 1.14k | TableFileCreationReason::kFlush, oldest_key_time, current_time, |
991 | 1.14k | db_id_, db_session_id_, 0 /* target_file_size */, |
992 | 1.14k | meta_.fd.GetNumber(), |
993 | 1.14k | preclude_last_level_min_seqno_ == kMaxSequenceNumber |
994 | 1.14k | ? preclude_last_level_min_seqno_ |
995 | 1.14k | : std::min(earliest_snapshot_, preclude_last_level_min_seqno_)); |
996 | 1.14k | s = BuildTable( |
997 | 1.14k | dbname_, versions_, db_options_, tboptions, file_options_, |
998 | 1.14k | cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_, |
999 | 1.14k | &blob_file_additions, job_context_->snapshot_seqs, earliest_snapshot_, |
1000 | 1.14k | job_context_->earliest_write_conflict_snapshot, |
1001 | 1.14k | job_context_->GetJobSnapshotSequence(), |
1002 | 1.14k | job_context_->snapshot_checker, |
1003 | 1.14k | mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), |
1004 | 1.14k | &io_s, io_tracer_, BlobFileCreationReason::kFlush, |
1005 | 1.14k | seqno_to_time_mapping_.get(), event_logger_, job_context_->job_id, |
1006 | 1.14k | &table_properties_, write_hint, full_history_ts_low, blob_callback_, |
1007 | 1.14k | base_, &memtable_payload_bytes, &memtable_garbage_bytes, |
1008 | 1.14k | &flush_stats); |
1009 | 1.14k | TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:s", &s); |
1010 | | // TODO: Cleanup io_status in BuildTable and table builders |
1011 | 1.14k | assert(!s.ok() || io_s.ok()); |
1012 | 1.14k | io_s.PermitUncheckedError(); |
1013 | 1.14k | if (s.ok() && total_num_input_entries != flush_stats.num_input_records) { |
1014 | 0 | std::string msg = "Expected " + |
1015 | 0 | std::to_string(total_num_input_entries) + |
1016 | 0 | " entries in memtables, but read " + |
1017 | 0 | std::to_string(flush_stats.num_input_records); |
1018 | 0 | ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Level-0 flush %s", |
1019 | 0 | cfd_->GetName().c_str(), job_context_->job_id, |
1020 | 0 | msg.c_str()); |
1021 | 0 | if (db_options_.flush_verify_memtable_count) { |
1022 | 0 | s = Status::Corruption(msg); |
1023 | 0 | } |
1024 | 0 | } |
1025 | | |
1026 | | // Only verify on table formats that collect table properties. |
1027 | 1.14k | if (s.ok() && |
1028 | 1.14k | (mutable_cf_options_.table_factory->IsInstanceOf( |
1029 | 1.14k | TableFactory::kBlockBasedTableName()) || |
1030 | 0 | mutable_cf_options_.table_factory->IsInstanceOf( |
1031 | 0 | TableFactory::kPlainTableName())) && |
1032 | 1.14k | flush_stats.num_output_records != table_properties_.num_entries) { |
1033 | 0 | std::string msg = |
1034 | 0 | "Number of keys in flush output SST files does not match " |
1035 | 0 | "number of keys added to the table. Expected " + |
1036 | 0 | std::to_string(flush_stats.num_output_records) + " but there are " + |
1037 | 0 | std::to_string(table_properties_.num_entries) + |
1038 | 0 | " in output SST files"; |
1039 | 0 | ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Level-0 flush %s", |
1040 | 0 | cfd_->GetName().c_str(), job_context_->job_id, |
1041 | 0 | msg.c_str()); |
1042 | 0 | if (db_options_.flush_verify_memtable_count) { |
1043 | 0 | s = Status::Corruption(msg); |
1044 | 0 | } |
1045 | 0 | } |
1046 | 1.14k | if (tboptions.reason == TableFileCreationReason::kFlush) { |
1047 | 1.14k | TEST_SYNC_POINT("DBImpl::FlushJob:Flush"); |
1048 | 1.14k | RecordTick(stats_, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH, |
1049 | 1.14k | memtable_payload_bytes); |
1050 | 1.14k | RecordTick(stats_, MEMTABLE_GARBAGE_BYTES_AT_FLUSH, |
1051 | 1.14k | memtable_garbage_bytes); |
1052 | 1.14k | } |
1053 | 1.14k | LogFlush(db_options_.info_log); |
1054 | 1.14k | } |
1055 | 1.14k | ROCKS_LOG_BUFFER(log_buffer_, |
1056 | 1.14k | "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64 |
1057 | 1.14k | " bytes %s" |
1058 | 1.14k | " %s" |
1059 | 1.14k | " %s", |
1060 | 1.14k | cfd_->GetName().c_str(), job_context_->job_id, |
1061 | 1.14k | meta_.fd.GetNumber(), meta_.fd.GetFileSize(), |
1062 | 1.14k | s.ToString().c_str(), |
1063 | 1.14k | s.ok() && meta_.fd.GetFileSize() == 0 |
1064 | 1.14k | ? "It's an empty SST file from a successful flush so " |
1065 | 1.14k | "won't be kept in the DB" |
1066 | 1.14k | : "", |
1067 | 1.14k | meta_.marked_for_compaction ? " (needs compaction)" : ""); |
1068 | | |
1069 | 1.14k | if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) { |
1070 | 1.14k | s = output_file_directory_->FsyncWithDirOptions( |
1071 | 1.14k | IOOptions(), nullptr, |
1072 | 1.14k | DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); |
1073 | 1.14k | } |
1074 | 1.14k | TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_); |
1075 | 1.14k | db_mutex_->Lock(); |
1076 | 1.14k | } |
1077 | 1.14k | base_->Unref(); |
1078 | | |
1079 | | // Note that if file_size is zero, the file has been deleted and |
1080 | | // should not be added to the manifest. |
1081 | 1.14k | const bool has_output = meta_.fd.GetFileSize() > 0; |
1082 | | |
1083 | 1.14k | if (s.ok() && has_output) { |
1084 | 1.14k | TEST_SYNC_POINT("DBImpl::FlushJob:SSTFileCreated"); |
1085 | | // if we have more than 1 background thread, then we cannot |
1086 | | // insert files directly into higher levels because some other |
1087 | | // threads could be concurrently producing compacted files for |
1088 | | // that key range. |
1089 | | // Add file to L0 |
1090 | 1.14k | edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(), |
1091 | 1.14k | meta_.fd.GetFileSize(), meta_.smallest, meta_.largest, |
1092 | 1.14k | meta_.fd.smallest_seqno, meta_.fd.largest_seqno, |
1093 | 1.14k | meta_.marked_for_compaction, meta_.temperature, |
1094 | 1.14k | meta_.oldest_blob_file_number, meta_.oldest_ancester_time, |
1095 | 1.14k | meta_.file_creation_time, meta_.epoch_number, |
1096 | 1.14k | meta_.file_checksum, meta_.file_checksum_func_name, |
1097 | 1.14k | meta_.unique_id, meta_.compensated_range_deletion_size, |
1098 | 1.14k | meta_.tail_size, meta_.user_defined_timestamps_persisted, |
1099 | 1.14k | meta_.min_timestamp, meta_.max_timestamp); |
1100 | 1.14k | edit_->SetBlobFileAdditions(std::move(blob_file_additions)); |
1101 | 1.14k | } |
1102 | | // Piggyback FlushJobInfo on the first flushed memtable. |
1103 | 1.14k | mems_[0]->SetFlushJobInfo(GetFlushJobInfo()); |
1104 | | |
1105 | 1.14k | const uint64_t micros = clock_->NowMicros() - start_micros; |
1106 | 1.14k | const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros; |
1107 | 1.14k | flush_stats.micros = micros; |
1108 | 1.14k | flush_stats.cpu_micros += cpu_micros; |
1109 | | |
1110 | 1.14k | ROCKS_LOG_INFO(db_options_.info_log, |
1111 | 1.14k | "[%s] [JOB %d] Flush lasted %" PRIu64 |
1112 | 1.14k | " microseconds, and %" PRIu64 " cpu microseconds.\n", |
1113 | 1.14k | cfd_->GetName().c_str(), job_context_->job_id, micros, |
1114 | 1.14k | flush_stats.cpu_micros); |
1115 | | |
1116 | 1.14k | if (has_output) { |
1117 | 1.14k | flush_stats.bytes_written = meta_.fd.GetFileSize(); |
1118 | 1.14k | flush_stats.num_output_files = 1; |
1119 | 1.14k | } |
1120 | | |
1121 | 1.14k | const auto& blobs = edit_->GetBlobFileAdditions(); |
1122 | 1.14k | for (const auto& blob : blobs) { |
1123 | 0 | flush_stats.bytes_written_blob += blob.GetTotalBlobBytes(); |
1124 | 0 | } |
1125 | | |
1126 | 1.14k | flush_stats.num_output_files_blob = static_cast<int>(blobs.size()); |
1127 | | |
1128 | 1.14k | RecordTimeToHistogram(stats_, FLUSH_TIME, flush_stats.micros); |
1129 | 1.14k | cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, |
1130 | 1.14k | flush_stats); |
1131 | 1.14k | cfd_->internal_stats()->AddCFStats( |
1132 | 1.14k | InternalStats::BYTES_FLUSHED, |
1133 | 1.14k | flush_stats.bytes_written + flush_stats.bytes_written_blob); |
1134 | 1.14k | RecordFlushIOStats(); |
1135 | | |
1136 | 1.14k | return s; |
1137 | 1.14k | } |
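A small numeric restatement of the creation-time bookkeeping inside WriteLevel0Table(); the epoch values are assumed for illustration only:

  #include <algorithm>
  #include <cstdint>

  int main() {
    // Assumed example values (Unix seconds).
    const uint64_t current_time = 1700000000;     // from clock_->GetCurrentTime()
    const uint64_t oldest_key_time = 1699990000;  // from ApproximateOldestKeyTime()
    // Per the comment in the function, oldest_key_time may not be available;
    // taking the min is what makes current_time the fallback in that case,
    // and otherwise keeps the older of the two timestamps.
    const uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time);
    const uint64_t file_creation_time = current_time;
    return (oldest_ancester_time == 1699990000 &&
            file_creation_time == 1700000000) ? 0 : 1;
  }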
1138 | | |
1139 | 1.14k | Env::IOPriority FlushJob::GetRateLimiterPriority() { |
1140 | 1.14k | if (versions_ && versions_->GetColumnFamilySet() && |
1141 | 1.14k | versions_->GetColumnFamilySet()->write_controller()) { |
1142 | 1.14k | WriteController* write_controller = |
1143 | 1.14k | versions_->GetColumnFamilySet()->write_controller(); |
1144 | 1.14k | if (write_controller->IsStopped() || write_controller->NeedsDelay()) { |
1145 | 0 | return Env::IO_USER; |
1146 | 0 | } |
1147 | 1.14k | } |
1148 | | |
1149 | 1.14k | return Env::IO_HIGH; |
1150 | 1.14k | } |
1151 | | |
1152 | 1.14k | std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const { |
1153 | 1.14k | db_mutex_->AssertHeld(); |
1154 | 1.14k | std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{}); |
1155 | 1.14k | info->cf_id = cfd_->GetID(); |
1156 | 1.14k | info->cf_name = cfd_->GetName(); |
1157 | | |
1158 | 1.14k | const uint64_t file_number = meta_.fd.GetNumber(); |
1159 | 1.14k | info->file_path = |
1160 | 1.14k | MakeTableFileName(cfd_->ioptions().cf_paths[0].path, file_number); |
1161 | 1.14k | info->file_number = file_number; |
1162 | 1.14k | info->oldest_blob_file_number = meta_.oldest_blob_file_number; |
1163 | 1.14k | info->thread_id = db_options_.env->GetThreadID(); |
1164 | 1.14k | info->job_id = job_context_->job_id; |
1165 | 1.14k | info->smallest_seqno = meta_.fd.smallest_seqno; |
1166 | 1.14k | info->largest_seqno = meta_.fd.largest_seqno; |
1167 | 1.14k | info->table_properties = table_properties_; |
1168 | 1.14k | info->flush_reason = flush_reason_; |
1169 | 1.14k | info->blob_compression_type = mutable_cf_options_.blob_compression_type; |
1170 | | |
1171 | | // Update BlobFilesInfo. |
1172 | 1.14k | for (const auto& blob_file : edit_->GetBlobFileAdditions()) { |
1173 | 0 | BlobFileAdditionInfo blob_file_addition_info( |
1174 | 0 | BlobFileName(cfd_->ioptions().cf_paths.front().path, |
1175 | 0 | blob_file.GetBlobFileNumber()) /*blob_file_path*/, |
1176 | 0 | blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(), |
1177 | 0 | blob_file.GetTotalBlobBytes()); |
1178 | 0 | info->blob_file_addition_infos.emplace_back( |
1179 | 0 | std::move(blob_file_addition_info)); |
1180 | 0 | } |
1181 | 1.14k | return info; |
1182 | 1.14k | } |
1183 | | |
1184 | 1.14k | void FlushJob::GetEffectiveCutoffUDTForPickedMemTables() { |
1185 | 1.14k | db_mutex_->AssertHeld(); |
1186 | 1.14k | assert(pick_memtable_called); |
1187 | 1.14k | const auto* ucmp = cfd_->internal_comparator().user_comparator(); |
1188 | 1.14k | assert(ucmp); |
1189 | 1.14k | const size_t ts_sz = ucmp->timestamp_size(); |
1190 | 1.14k | if (db_options_.atomic_flush || ts_sz == 0 || |
1191 | 1.14k | cfd_->ioptions().persist_user_defined_timestamps) { |
1192 | 1.14k | return; |
1193 | 1.14k | } |
1194 | | // Find the newest user-defined timestamp among all the flushed memtables. |
1195 | 0 | for (const ReadOnlyMemTable* m : mems_) { |
1196 | 0 | Slice table_newest_udt = m->GetNewestUDT(); |
1197 | | // Empty memtables can be legitimately created and flushed, for example |
1198 | | // by error recovery flush attempts. |
1199 | 0 | if (table_newest_udt.empty()) { |
1200 | 0 | continue; |
1201 | 0 | } |
1202 | 0 | if (cutoff_udt_.empty() || |
1203 | 0 | ucmp->CompareTimestamp(table_newest_udt, cutoff_udt_) > 0) { |
1204 | 0 | if (!cutoff_udt_.empty()) { |
1205 | 0 | assert(table_newest_udt.size() == cutoff_udt_.size()); |
1206 | 0 | } |
1207 | 0 | cutoff_udt_.assign(table_newest_udt.data(), table_newest_udt.size()); |
1208 | 0 | } |
1209 | 0 | } |
1210 | 0 | } |
1211 | | |
1212 | 1.14k | void FlushJob::GetPrecludeLastLevelMinSeqno() { |
1213 | 1.14k | if (mutable_cf_options_.preclude_last_level_data_seconds == 0) { |
1214 | 1.14k | return; |
1215 | 1.14k | } |
1216 | | // SuperVersion should guarantee this |
1217 | 1.14k | assert(seqno_to_time_mapping_); |
1218 | 0 | assert(!seqno_to_time_mapping_->Empty()); |
1219 | 0 | int64_t current_time = 0; |
1220 | 0 | Status s = db_options_.clock->GetCurrentTime(¤t_time); |
1221 | 0 | if (!s.ok()) { |
1222 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
1223 | 0 | "Failed to get current time in Flush: Status: %s", |
1224 | 0 | s.ToString().c_str()); |
1225 | 0 | } else { |
1226 | 0 | SequenceNumber preserve_time_min_seqno; |
1227 | 0 | seqno_to_time_mapping_->GetCurrentTieringCutoffSeqnos( |
1228 | 0 | static_cast<uint64_t>(current_time), |
1229 | 0 | mutable_cf_options_.preserve_internal_time_seconds, |
1230 | 0 | mutable_cf_options_.preclude_last_level_data_seconds, |
1231 | 0 | &preserve_time_min_seqno, &preclude_last_level_min_seqno_); |
1232 | 0 | } |
1233 | 0 | } |
1234 | | |
1235 | 1.14k | Status FlushJob::MaybeIncreaseFullHistoryTsLowToAboveCutoffUDT() { |
1236 | 1.14k | db_mutex_->AssertHeld(); |
1237 | 1.14k | const auto* ucmp = cfd_->user_comparator(); |
1238 | 1.14k | assert(ucmp); |
1239 | 1.14k | const std::string& full_history_ts_low = cfd_->GetFullHistoryTsLow(); |
1240 | | // Update full_history_ts_low to right above cutoff udt only if that would |
1241 | | // increase it. |
1242 | 1.14k | if (cutoff_udt_.empty() || |
1243 | 0 | (!full_history_ts_low.empty() && |
1244 | 1.14k | ucmp->CompareTimestamp(cutoff_udt_, full_history_ts_low) < 0)) { |
1245 | 1.14k | return Status::OK(); |
1246 | 1.14k | } |
1247 | 0 | std::string new_full_history_ts_low; |
1248 | 0 | Slice cutoff_udt_slice = cutoff_udt_; |
1249 | | // TODO(yuzhangyu): Add a member to AdvancedColumnFamilyOptions for an |
1250 | | // operation to get the next immediately larger user-defined timestamp to |
1251 | | // expand this feature to other user-defined timestamp formats. |
1252 | 0 | GetFullHistoryTsLowFromU64CutoffTs(&cutoff_udt_slice, |
1253 | 0 | &new_full_history_ts_low); |
1254 | 0 | VersionEdit edit; |
1255 | 0 | edit.SetColumnFamily(cfd_->GetID()); |
1256 | 0 | edit.SetFullHistoryTsLow(new_full_history_ts_low); |
1257 | 0 | return versions_->LogAndApply(cfd_, ReadOptions(Env::IOActivity::kFlush), |
1258 | 0 | WriteOptions(Env::IOActivity::kFlush), &edit, |
1259 | 0 | db_mutex_, output_file_directory_); |
1260 | 1.14k | } |
1261 | | |
1262 | | } // namespace ROCKSDB_NAMESPACE |