/src/rocksdb/db/builder.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "db/builder.h" |
11 | | |
12 | | #include <algorithm> |
13 | | #include <deque> |
14 | | #include <vector> |
15 | | |
16 | | #include "db/blob/blob_file_builder.h" |
17 | | #include "db/compaction/compaction_iterator.h" |
18 | | #include "db/dbformat.h" |
19 | | #include "db/event_helpers.h" |
20 | | #include "db/internal_stats.h" |
21 | | #include "db/merge_helper.h" |
22 | | #include "db/output_validator.h" |
23 | | #include "db/range_del_aggregator.h" |
24 | | #include "db/table_cache.h" |
25 | | #include "db/version_edit.h" |
26 | | #include "file/file_util.h" |
27 | | #include "file/filename.h" |
28 | | #include "file/read_write_util.h" |
29 | | #include "file/writable_file_writer.h" |
30 | | #include "monitoring/iostats_context_imp.h" |
31 | | #include "monitoring/thread_status_util.h" |
32 | | #include "options/options_helper.h" |
33 | | #include "rocksdb/db.h" |
34 | | #include "rocksdb/env.h" |
35 | | #include "rocksdb/file_system.h" |
36 | | #include "rocksdb/iterator.h" |
37 | | #include "rocksdb/options.h" |
38 | | #include "rocksdb/table.h" |
39 | | #include "seqno_to_time_mapping.h" |
40 | | #include "table/block_based/block_based_table_builder.h" |
41 | | #include "table/format.h" |
42 | | #include "table/internal_iterator.h" |
43 | | #include "table/unique_id_impl.h" |
44 | | #include "test_util/sync_point.h" |
45 | | #include "util/stop_watch.h" |
46 | | |
47 | | namespace ROCKSDB_NAMESPACE { |
48 | | |
49 | | class TableFactory; |
50 | | |
51 | | TableBuilder* NewTableBuilder(const TableBuilderOptions& tboptions, |
52 | 8.06k | WritableFileWriter* file) { |
53 | 8.06k | assert((tboptions.column_family_id == |
54 | 8.06k | TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == |
55 | 8.06k | tboptions.column_family_name.empty()); |
56 | 8.06k | return tboptions.moptions.table_factory->NewTableBuilder(tboptions, file); |
57 | 8.06k | } |
58 | | |
59 | | void ExtractTimestampFromTableProperties(const TableProperties& tp, |
60 | 7.93k | FileMetaData* meta) { |
61 | 7.93k | auto min_ts_iter = tp.user_collected_properties.find("rocksdb.timestamp_min"); |
62 | 7.93k | if (min_ts_iter != tp.user_collected_properties.end()) { |
63 | 0 | meta->min_timestamp = min_ts_iter->second; |
64 | 0 | } |
65 | 7.93k | auto max_ts_iter = tp.user_collected_properties.find("rocksdb.timestamp_max"); |
66 | 7.93k | if (max_ts_iter != tp.user_collected_properties.end()) { |
67 | 0 | meta->max_timestamp = max_ts_iter->second; |
68 | 0 | } |
69 | 7.93k | } |
70 | | |
71 | | Status BuildTable( |
72 | | const std::string& dbname, VersionSet* versions, |
73 | | const ImmutableDBOptions& db_options, const TableBuilderOptions& tboptions, |
74 | | const FileOptions& file_options, TableCache* table_cache, |
75 | | InternalIterator* iter, |
76 | | std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>> |
77 | | range_del_iters, |
78 | | FileMetaData* meta, std::vector<BlobFileAddition>* blob_file_additions, |
79 | | std::vector<SequenceNumber> snapshots, SequenceNumber earliest_snapshot, |
80 | | SequenceNumber earliest_write_conflict_snapshot, |
81 | | SequenceNumber job_snapshot, SnapshotChecker* snapshot_checker, |
82 | | bool paranoid_file_checks, InternalStats* internal_stats, |
83 | | IOStatus* io_status, const std::shared_ptr<IOTracer>& io_tracer, |
84 | | BlobFileCreationReason blob_creation_reason, |
85 | | UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, |
86 | | EventLogger* event_logger, int job_id, TableProperties* table_properties, |
87 | | Env::WriteLifeTimeHint write_hint, const std::string* full_history_ts_low, |
88 | | BlobFileCompletionCallback* blob_callback, Version* version, |
89 | | uint64_t* memtable_payload_bytes, uint64_t* memtable_garbage_bytes, |
90 | 7.25k | InternalStats::CompactionStats* flush_stats) { |
91 | 7.25k | assert((tboptions.column_family_id == |
92 | 7.25k | TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == |
93 | 7.25k | tboptions.column_family_name.empty()); |
94 | 7.25k | auto& mutable_cf_options = tboptions.moptions; |
95 | 7.25k | auto& ioptions = tboptions.ioptions; |
96 | | // Reports the IOStats for flush for every following bytes. |
97 | 7.25k | const size_t kReportFlushIOStatsEvery = 1048576; |
98 | 7.25k | OutputValidator output_validator(tboptions.internal_comparator, |
99 | 7.25k | /*enable_hash=*/paranoid_file_checks); |
100 | 7.25k | Status s; |
101 | 7.25k | meta->fd.file_size = 0; |
102 | 7.25k | iter->SeekToFirst(); |
103 | 7.25k | std::unique_ptr<CompactionRangeDelAggregator> range_del_agg( |
104 | 7.25k | new CompactionRangeDelAggregator(&tboptions.internal_comparator, |
105 | 7.25k | snapshots, full_history_ts_low)); |
106 | 7.25k | uint64_t num_unfragmented_tombstones = 0; |
107 | 7.25k | uint64_t total_tombstone_payload_bytes = 0; |
108 | 7.25k | for (auto& range_del_iter : range_del_iters) { |
109 | 6 | num_unfragmented_tombstones += |
110 | 6 | range_del_iter->num_unfragmented_tombstones(); |
111 | 6 | total_tombstone_payload_bytes += |
112 | 6 | range_del_iter->total_tombstone_payload_bytes(); |
113 | 6 | range_del_agg->AddTombstones(std::move(range_del_iter)); |
114 | 6 | } |
115 | | |
116 | 7.25k | std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(), |
117 | 7.25k | meta->fd.GetPathId()); |
118 | 7.25k | std::vector<std::string> blob_file_paths; |
119 | 7.25k | std::string file_checksum = kUnknownFileChecksum; |
120 | 7.25k | std::string file_checksum_func_name = kUnknownFileChecksumFuncName; |
121 | 7.25k | EventHelpers::NotifyTableFileCreationStarted(ioptions.listeners, dbname, |
122 | 7.25k | tboptions.column_family_name, |
123 | 7.25k | fname, job_id, tboptions.reason); |
124 | 7.25k | Env* env = db_options.env; |
125 | 7.25k | assert(env); |
126 | 7.25k | FileSystem* fs = db_options.fs.get(); |
127 | 7.25k | assert(fs); |
128 | | |
129 | 7.25k | TableProperties tp; |
130 | 7.25k | bool table_file_created = false; |
131 | 7.25k | if (iter->Valid() || !range_del_agg->IsEmpty()) { |
132 | 7.25k | std::unique_ptr<CompactionFilter> compaction_filter; |
133 | 7.25k | if (ioptions.compaction_filter_factory != nullptr && |
134 | 0 | ioptions.compaction_filter_factory->ShouldFilterTableFileCreation( |
135 | 0 | tboptions.reason)) { |
136 | 0 | CompactionFilter::Context context; |
137 | 0 | context.is_full_compaction = false; |
138 | 0 | context.is_manual_compaction = false; |
139 | 0 | context.column_family_id = tboptions.column_family_id; |
140 | 0 | context.reason = tboptions.reason; |
141 | 0 | compaction_filter = |
142 | 0 | ioptions.compaction_filter_factory->CreateCompactionFilter(context); |
143 | 0 | if (compaction_filter != nullptr && |
144 | 0 | !compaction_filter->IgnoreSnapshots()) { |
145 | 0 | s.PermitUncheckedError(); |
146 | 0 | return Status::NotSupported( |
147 | 0 | "CompactionFilter::IgnoreSnapshots() = false is not supported " |
148 | 0 | "anymore."); |
149 | 0 | } |
150 | 0 | } |
151 | | |
152 | 7.25k | TableBuilder* builder; |
153 | 7.25k | std::unique_ptr<WritableFileWriter> file_writer; |
154 | 7.25k | { |
155 | 7.25k | std::unique_ptr<FSWritableFile> file; |
156 | | #ifndef NDEBUG |
157 | | bool use_direct_writes = file_options.use_direct_writes; |
158 | | TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes); |
159 | | #endif // !NDEBUG |
160 | 7.25k | FileOptions fo_copy = file_options; |
161 | 7.25k | fo_copy.write_hint = write_hint; |
162 | 7.25k | IOStatus io_s = NewWritableFile(fs, fname, &file, fo_copy); |
163 | 7.25k | assert(s.ok()); |
164 | 7.25k | s = io_s; |
165 | 7.25k | if (io_status->ok()) { |
166 | 7.25k | *io_status = io_s; |
167 | 7.25k | } |
168 | 7.25k | if (!s.ok()) { |
169 | 0 | EventHelpers::LogAndNotifyTableFileCreationFinished( |
170 | 0 | event_logger, ioptions.listeners, dbname, |
171 | 0 | tboptions.column_family_name, fname, job_id, meta->fd, |
172 | 0 | kInvalidBlobFileNumber, tp, tboptions.reason, s, file_checksum, |
173 | 0 | file_checksum_func_name); |
174 | 0 | return s; |
175 | 0 | } |
176 | | |
177 | 7.25k | table_file_created = true; |
178 | 7.25k | FileTypeSet tmp_set = ioptions.checksum_handoff_file_types; |
179 | 7.25k | file->SetIOPriority(tboptions.write_options.rate_limiter_priority); |
180 | | // Subsequent attempts to override the hint via SetWriteLifeTimeHint |
181 | | // with the very same value will be ignored by the fs. |
182 | 7.25k | file->SetWriteLifeTimeHint(fo_copy.write_hint); |
183 | 7.25k | file_writer.reset(new WritableFileWriter( |
184 | 7.25k | std::move(file), fname, file_options, ioptions.clock, io_tracer, |
185 | 7.25k | ioptions.stats, Histograms::SST_WRITE_MICROS, ioptions.listeners, |
186 | 7.25k | ioptions.file_checksum_gen_factory.get(), |
187 | 7.25k | tmp_set.Contains(FileType::kTableFile), false)); |
188 | | |
189 | 7.25k | builder = NewTableBuilder(tboptions, file_writer.get()); |
190 | 7.25k | } |
191 | | |
192 | 0 | auto ucmp = tboptions.internal_comparator.user_comparator(); |
193 | 7.25k | MergeHelper merge( |
194 | 7.25k | env, ucmp, ioptions.merge_operator.get(), compaction_filter.get(), |
195 | 7.25k | ioptions.logger, true /* internal key corruption is not ok */, |
196 | 7.25k | snapshots.empty() ? 0 : snapshots.back(), snapshot_checker); |
197 | | |
198 | 7.25k | std::unique_ptr<BlobFileBuilder> blob_file_builder( |
199 | 7.25k | (mutable_cf_options.enable_blob_files && |
200 | 0 | tboptions.level_at_creation >= |
201 | 0 | mutable_cf_options.blob_file_starting_level && |
202 | 0 | blob_file_additions) |
203 | 7.25k | ? new BlobFileBuilder( |
204 | 0 | versions, fs, &ioptions, &mutable_cf_options, &file_options, |
205 | 0 | &(tboptions.write_options), tboptions.db_id, |
206 | 0 | tboptions.db_session_id, job_id, tboptions.column_family_id, |
207 | 0 | tboptions.column_family_name, write_hint, io_tracer, |
208 | 0 | blob_callback, blob_creation_reason, &blob_file_paths, |
209 | 0 | blob_file_additions) |
210 | 7.25k | : nullptr); |
211 | | |
212 | 7.25k | const std::atomic<bool> kManualCompactionCanceledFalse{false}; |
213 | 7.25k | CompactionIterator c_iter( |
214 | 7.25k | iter, ucmp, &merge, kMaxSequenceNumber, &snapshots, earliest_snapshot, |
215 | 7.25k | earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, |
216 | 7.25k | ShouldReportDetailedTime(env, ioptions.stats), range_del_agg.get(), |
217 | 7.25k | blob_file_builder.get(), ioptions.allow_data_in_errors, |
218 | 7.25k | ioptions.enforce_single_del_contracts, |
219 | 7.25k | /*manual_compaction_canceled=*/kManualCompactionCanceledFalse, |
220 | 7.25k | true /* must_count_input_entries */, |
221 | 7.25k | /*compaction=*/nullptr, compaction_filter.get(), |
222 | 7.25k | /*shutting_down=*/nullptr, db_options.info_log, full_history_ts_low); |
223 | | |
224 | 7.25k | SequenceNumber smallest_preferred_seqno = kMaxSequenceNumber; |
225 | 7.25k | std::string key_after_flush_buf; |
226 | 7.25k | std::string value_buf; |
227 | 7.25k | c_iter.SeekToFirst(); |
228 | 15.2k | for (; c_iter.Valid(); c_iter.Next()) { |
229 | 7.97k | const Slice& key = c_iter.key(); |
230 | 7.97k | const Slice& value = c_iter.value(); |
231 | 7.97k | ParsedInternalKey ikey = c_iter.ikey(); |
232 | 7.97k | Slice key_after_flush = key; |
233 | 7.97k | Slice value_after_flush = value; |
234 | | |
235 | 7.97k | if (ikey.type == kTypeValuePreferredSeqno) { |
236 | 0 | auto [unpacked_value, unix_write_time] = |
237 | 0 | ParsePackedValueWithWriteTime(value); |
238 | 0 | SequenceNumber preferred_seqno = |
239 | 0 | seqno_to_time_mapping |
240 | 0 | ? seqno_to_time_mapping->GetProximalSeqnoBeforeTime( |
241 | 0 | unix_write_time) |
242 | 0 | : kMaxSequenceNumber; |
243 | 0 | if (preferred_seqno < ikey.sequence) { |
244 | 0 | value_after_flush = |
245 | 0 | PackValueAndSeqno(unpacked_value, preferred_seqno, &value_buf); |
246 | 0 | smallest_preferred_seqno = |
247 | 0 | std::min(smallest_preferred_seqno, preferred_seqno); |
248 | 0 | } else { |
249 | | // Cannot get a useful preferred seqno, convert it to a kTypeValue. |
250 | 0 | key_after_flush_buf.assign(key.data(), key.size()); |
251 | 0 | UpdateInternalKey(&key_after_flush_buf, ikey.sequence, kTypeValue); |
252 | 0 | ikey = ParsedInternalKey(ikey.user_key, ikey.sequence, kTypeValue); |
253 | 0 | key_after_flush = key_after_flush_buf; |
254 | 0 | value_after_flush = ParsePackedValueForValue(value); |
255 | 0 | } |
256 | 0 | } |
257 | | |
258 | | // Generate a rolling 64-bit hash of the key and values |
259 | | // Note : |
260 | | // Here "key" integrates 'sequence_number'+'kType'+'user key'. |
261 | 7.97k | s = output_validator.Add(key_after_flush, value_after_flush); |
262 | 7.97k | if (!s.ok()) { |
263 | 0 | break; |
264 | 0 | } |
265 | 7.97k | builder->Add(key_after_flush, value_after_flush); |
266 | | |
267 | 7.97k | if (flush_stats) { |
268 | 7.97k | flush_stats->num_output_records++; |
269 | 7.97k | } |
270 | | |
271 | 7.97k | s = meta->UpdateBoundaries(key_after_flush, value_after_flush, |
272 | 7.97k | ikey.sequence, ikey.type); |
273 | 7.97k | if (!s.ok()) { |
274 | 0 | break; |
275 | 0 | } |
276 | | |
277 | | // TODO(noetzli): Update stats after flush, too. |
278 | | // TODO(hx235): Replace `rate_limiter_priority` with `io_activity` for |
279 | | // flush IO in repair when we have an `Env::IOActivity` enum for it |
280 | 7.97k | if ((tboptions.write_options.io_activity == Env::IOActivity::kFlush || |
281 | 6.64k | tboptions.write_options.io_activity == Env::IOActivity::kDBOpen || |
282 | 0 | tboptions.write_options.rate_limiter_priority == Env::IO_HIGH) && |
283 | 7.97k | IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) { |
284 | 6.59k | ThreadStatusUtil::SetThreadOperationProperty( |
285 | 6.59k | ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written)); |
286 | 6.59k | } |
287 | 7.97k | } |
288 | 7.25k | if (!s.ok()) { |
289 | 0 | c_iter.status().PermitUncheckedError(); |
290 | 7.25k | } else if (!c_iter.status().ok()) { |
291 | 0 | s = c_iter.status(); |
292 | 0 | } |
293 | | |
294 | 7.25k | if (s.ok()) { |
295 | 7.25k | auto range_del_it = range_del_agg->NewIterator(); |
296 | 7.25k | Slice last_tombstone_start_user_key{}; |
297 | 7.29k | for (range_del_it->SeekToFirst(); range_del_it->Valid(); |
298 | 7.25k | range_del_it->Next()) { |
299 | 47 | auto tombstone = range_del_it->Tombstone(); |
300 | 47 | std::pair<InternalKey, Slice> kv = tombstone.Serialize(); |
301 | 47 | builder->Add(kv.first.Encode(), kv.second); |
302 | 47 | if (flush_stats) { |
303 | 47 | flush_stats->num_output_records++; |
304 | 47 | } |
305 | 47 | InternalKey tombstone_end = tombstone.SerializeEndKey(); |
306 | 47 | meta->UpdateBoundariesForRange(kv.first, tombstone_end, tombstone.seq_, |
307 | 47 | tboptions.internal_comparator); |
308 | 47 | if (version) { |
309 | 47 | if (last_tombstone_start_user_key.empty() || |
310 | 38 | ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key, |
311 | 47 | range_del_it->start_key()) < 0) { |
312 | 47 | SizeApproximationOptions approx_opts; |
313 | 47 | approx_opts.files_size_error_margin = 0.1; |
314 | 47 | meta->compensated_range_deletion_size += versions->ApproximateSize( |
315 | 47 | approx_opts, tboptions.read_options, version, kv.first.Encode(), |
316 | 47 | tombstone_end.Encode(), 0 /* start_level */, -1 /* end_level */, |
317 | 47 | TableReaderCaller::kFlush); |
318 | 47 | } |
319 | 47 | last_tombstone_start_user_key = range_del_it->start_key(); |
320 | 47 | } |
321 | 47 | } |
322 | 7.25k | } |
323 | | |
324 | 7.25k | TEST_SYNC_POINT("BuildTable:BeforeFinishBuildTable"); |
325 | 7.25k | const bool empty = builder->IsEmpty(); |
326 | 7.25k | if (flush_stats) { |
327 | 7.25k | assert(c_iter.HasNumInputEntryScanned()); |
328 | 7.25k | flush_stats->num_input_records = |
329 | 7.25k | c_iter.NumInputEntryScanned() + num_unfragmented_tombstones; |
330 | 7.25k | } |
331 | 7.25k | if (!s.ok() || empty) { |
332 | 0 | builder->Abandon(); |
333 | 7.25k | } else { |
334 | 7.25k | SeqnoToTimeMapping relevant_mapping; |
335 | 7.25k | if (seqno_to_time_mapping) { |
336 | 0 | relevant_mapping.CopyFromSeqnoRange( |
337 | 0 | *seqno_to_time_mapping, |
338 | 0 | std::min(meta->fd.smallest_seqno, smallest_preferred_seqno), |
339 | 0 | meta->fd.largest_seqno); |
340 | 0 | relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST); |
341 | 0 | relevant_mapping.Enforce(tboptions.file_creation_time); |
342 | 0 | } |
343 | 7.25k | builder->SetSeqnoTimeTableProperties( |
344 | 7.25k | relevant_mapping, |
345 | 7.25k | ioptions.compaction_style == CompactionStyle::kCompactionStyleFIFO |
346 | 7.25k | ? meta->file_creation_time |
347 | 7.25k | : meta->oldest_ancester_time); |
348 | 7.25k | s = builder->Finish(); |
349 | 7.25k | } |
350 | 7.25k | if (io_status->ok()) { |
351 | 7.25k | *io_status = builder->io_status(); |
352 | 7.25k | } |
353 | | |
354 | 7.25k | if (s.ok() && !empty) { |
355 | 7.25k | if (flush_stats) { |
356 | 7.25k | flush_stats->bytes_written_pre_comp = builder->PreCompressionSize(); |
357 | | // Add worker CPU micros here. Caller needs to add CPU micros from |
358 | | // calling thread. |
359 | 7.25k | flush_stats->cpu_micros += builder->GetWorkerCPUMicros(); |
360 | 7.25k | } |
361 | 7.25k | uint64_t file_size = builder->FileSize(); |
362 | 7.25k | meta->fd.file_size = file_size; |
363 | 7.25k | meta->tail_size = builder->GetTailSize(); |
364 | 7.25k | meta->marked_for_compaction = builder->NeedCompact(); |
365 | 7.25k | meta->user_defined_timestamps_persisted = |
366 | 7.25k | ioptions.persist_user_defined_timestamps; |
367 | 7.25k | assert(meta->fd.GetFileSize() > 0); |
368 | 7.25k | tp = builder |
369 | 7.25k | ->GetTableProperties(); // refresh now that builder is finished |
370 | 7.25k | ExtractTimestampFromTableProperties(tp, meta); |
371 | 7.25k | if (memtable_payload_bytes != nullptr && |
372 | 1.14k | memtable_garbage_bytes != nullptr) { |
373 | 1.14k | const CompactionIterationStats& ci_stats = c_iter.iter_stats(); |
374 | 1.14k | uint64_t total_payload_bytes = ci_stats.total_input_raw_key_bytes + |
375 | 1.14k | ci_stats.total_input_raw_value_bytes + |
376 | 1.14k | total_tombstone_payload_bytes; |
377 | 1.14k | uint64_t total_payload_bytes_written = |
378 | 1.14k | (tp.raw_key_size + tp.raw_value_size); |
379 | | // Prevent underflow, which may still happen at this point |
380 | | // since we only support inserts, deletes, and deleteRanges. |
381 | 1.14k | if (total_payload_bytes_written <= total_payload_bytes) { |
382 | 1.14k | *memtable_payload_bytes = total_payload_bytes; |
383 | 1.14k | *memtable_garbage_bytes = |
384 | 1.14k | total_payload_bytes - total_payload_bytes_written; |
385 | 1.14k | } else { |
386 | 0 | *memtable_payload_bytes = 0; |
387 | 0 | *memtable_garbage_bytes = 0; |
388 | 0 | } |
389 | 1.14k | } |
390 | 7.25k | if (table_properties) { |
391 | 7.25k | *table_properties = tp; |
392 | 7.25k | } |
393 | 7.25k | } |
394 | 7.25k | delete builder; |
395 | | |
396 | | // Finish and check for file errors |
397 | 7.25k | TEST_SYNC_POINT("BuildTable:BeforeSyncTable"); |
398 | 7.25k | IOOptions opts; |
399 | 7.25k | *io_status = |
400 | 7.25k | WritableFileWriter::PrepareIOOptions(tboptions.write_options, opts); |
401 | 7.25k | if (s.ok() && io_status->ok() && !empty) { |
402 | 7.25k | StopWatch sw(ioptions.clock, ioptions.stats, TABLE_SYNC_MICROS); |
403 | 7.25k | *io_status = file_writer->Sync(opts, ioptions.use_fsync); |
404 | 7.25k | } |
405 | 7.25k | TEST_SYNC_POINT("BuildTable:BeforeCloseTableFile"); |
406 | 7.25k | if (s.ok() && io_status->ok() && !empty) { |
407 | 7.25k | *io_status = file_writer->Close(opts); |
408 | 7.25k | } |
409 | 7.25k | if (s.ok() && io_status->ok() && !empty) { |
410 | | // Add the checksum information to file metadata. |
411 | 7.25k | meta->file_checksum = file_writer->GetFileChecksum(); |
412 | 7.25k | meta->file_checksum_func_name = file_writer->GetFileChecksumFuncName(); |
413 | 7.25k | file_checksum = meta->file_checksum; |
414 | 7.25k | file_checksum_func_name = meta->file_checksum_func_name; |
415 | | // Set unique_id only if db_id and db_session_id exist |
416 | 7.25k | if (!tboptions.db_id.empty() && !tboptions.db_session_id.empty()) { |
417 | 7.25k | if (!GetSstInternalUniqueId(tboptions.db_id, tboptions.db_session_id, |
418 | 7.25k | meta->fd.GetNumber(), &(meta->unique_id)) |
419 | 7.25k | .ok()) { |
420 | | // if failed to get unique id, just set it Null |
421 | 0 | meta->unique_id = kNullUniqueId64x2; |
422 | 0 | } |
423 | 7.25k | } |
424 | 7.25k | } |
425 | | |
426 | 7.25k | if (s.ok()) { |
427 | 7.25k | s = *io_status; |
428 | 7.25k | } |
429 | | |
430 | | // TODO(yuzhangyu): handle the key copy in the blob when ts should be |
431 | | // stripped. |
432 | 7.25k | if (blob_file_builder) { |
433 | 0 | if (s.ok()) { |
434 | 0 | s = blob_file_builder->Finish(); |
435 | 0 | } else { |
436 | 0 | blob_file_builder->Abandon(s); |
437 | 0 | } |
438 | 0 | blob_file_builder.reset(); |
439 | 0 | } |
440 | | |
441 | | // TODO Also check the IO status when create the Iterator. |
442 | | |
443 | 7.25k | TEST_SYNC_POINT("BuildTable:BeforeOutputValidation"); |
444 | 7.25k | if (s.ok() && !empty) { |
445 | | // Verify that the table is usable |
446 | | // We set for_compaction to false and don't OptimizeForCompactionTableRead |
447 | | // here because this is a special case after we finish the table building. |
448 | | // No matter whether use_direct_io_for_flush_and_compaction is true, |
449 | | // the goal is to cache it here for further user reads. |
450 | 7.25k | std::unique_ptr<InternalIterator> it(table_cache->NewIterator( |
451 | 7.25k | tboptions.read_options, file_options, tboptions.internal_comparator, |
452 | 7.25k | *meta, nullptr /* range_del_agg */, mutable_cf_options, nullptr, |
453 | 7.25k | (internal_stats == nullptr) ? nullptr |
454 | 7.25k | : internal_stats->GetFileReadHist(0), |
455 | 7.25k | TableReaderCaller::kFlush, /*arena=*/nullptr, |
456 | 7.25k | /*skip_filter=*/false, tboptions.level_at_creation, |
457 | 7.25k | MaxFileSizeForL0MetaPin(mutable_cf_options), |
458 | 7.25k | /*smallest_compaction_key=*/nullptr, |
459 | 7.25k | /*largest_compaction_key*/ nullptr, |
460 | 7.25k | /*allow_unprepared_value*/ false)); |
461 | 7.25k | s = it->status(); |
462 | 7.25k | if (s.ok() && paranoid_file_checks) { |
463 | 0 | OutputValidator file_validator(tboptions.internal_comparator, |
464 | 0 | /*enable_hash=*/true); |
465 | 0 | for (it->SeekToFirst(); it->Valid(); it->Next()) { |
466 | | // Generate a rolling 64-bit hash of the key and values |
467 | 0 | file_validator.Add(it->key(), it->value()).PermitUncheckedError(); |
468 | 0 | } |
469 | 0 | s = it->status(); |
470 | 0 | if (s.ok() && !output_validator.CompareValidator(file_validator)) { |
471 | 0 | s = Status::Corruption("Paranoid checksums do not match"); |
472 | 0 | } |
473 | 0 | } |
474 | 7.25k | } |
475 | 7.25k | } |
476 | | |
477 | | // Check for input iterator errors |
478 | 7.25k | if (!iter->status().ok()) { |
479 | 0 | s = iter->status(); |
480 | 0 | } |
481 | | |
482 | 7.25k | if (!s.ok() || meta->fd.GetFileSize() == 0) { |
483 | 0 | TEST_SYNC_POINT("BuildTable:BeforeDeleteFile"); |
484 | |
|
485 | 0 | constexpr IODebugContext* dbg = nullptr; |
486 | |
|
487 | 0 | if (table_file_created) { |
488 | 0 | IOOptions opts; |
489 | 0 | Status prepare = |
490 | 0 | WritableFileWriter::PrepareIOOptions(tboptions.write_options, opts); |
491 | 0 | if (prepare.ok()) { |
492 | | // FIXME: track file for "slow" deletion, e.g. into the |
493 | | // VersionSet::obsolete_files_ pipeline |
494 | 0 | Status ignored = fs->DeleteFile(fname, opts, dbg); |
495 | 0 | ignored.PermitUncheckedError(); |
496 | 0 | } |
497 | | // Ensure we don't leak table cache entries when throwing away output |
498 | | // files. (The usual logic in PurgeObsoleteFiles is not applicable because |
499 | | // this function deletes the obsolete file itself, while they should |
500 | | // probably go into the VersionSet::obsolete_files_ pipeline.) |
501 | 0 | TableCache::ReleaseObsolete(table_cache->get_cache().get(), |
502 | 0 | meta->fd.GetNumber(), nullptr /*handle*/, |
503 | 0 | mutable_cf_options.uncache_aggressiveness); |
504 | 0 | } |
505 | |
|
506 | 0 | assert(blob_file_additions || blob_file_paths.empty()); |
507 | |
|
508 | 0 | if (blob_file_additions) { |
509 | 0 | for (const std::string& blob_file_path : blob_file_paths) { |
510 | 0 | Status ignored = DeleteDBFile(&db_options, blob_file_path, dbname, |
511 | 0 | /*force_bg=*/false, /*force_fg=*/false); |
512 | 0 | ignored.PermitUncheckedError(); |
513 | 0 | TEST_SYNC_POINT("BuildTable::AfterDeleteFile"); |
514 | 0 | } |
515 | 0 | } |
516 | 0 | } |
517 | | |
518 | 7.25k | Status status_for_listener = s; |
519 | 7.25k | if (meta->fd.GetFileSize() == 0) { |
520 | 0 | fname = "(nil)"; |
521 | 0 | if (s.ok()) { |
522 | 0 | status_for_listener = Status::Aborted("Empty SST file not kept"); |
523 | 0 | } |
524 | 0 | } |
525 | | // Output to event logger and fire events. |
526 | 7.25k | EventHelpers::LogAndNotifyTableFileCreationFinished( |
527 | 7.25k | event_logger, ioptions.listeners, dbname, tboptions.column_family_name, |
528 | 7.25k | fname, job_id, meta->fd, meta->oldest_blob_file_number, tp, |
529 | 7.25k | tboptions.reason, status_for_listener, file_checksum, |
530 | 7.25k | file_checksum_func_name); |
531 | | |
532 | 7.25k | return s; |
533 | 7.25k | } |
534 | | |
535 | | } // namespace ROCKSDB_NAMESPACE |