Coverage Report

Created: 2026-05-31 07:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/external_sst_file_ingestion_job.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/external_sst_file_ingestion_job.h"
7
8
#include <algorithm>
9
#include <cinttypes>
10
#include <string>
11
#include <unordered_set>
12
#include <vector>
13
14
#include "db/builder.h"
15
#include "db/db_impl/db_impl.h"
16
#include "db/version_edit.h"
17
#include "file/file_util.h"
18
#include "file/random_access_file_reader.h"
19
#include "logging/logging.h"
20
#include "monitoring/statistics_impl.h"
21
#include "table/merging_iterator.h"
22
#include "table/sst_file_writer_collectors.h"
23
#include "table/table_builder.h"
24
#include "table/unique_id_impl.h"
25
#include "test_util/sync_point.h"
26
#include "util/udt_util.h"
27
28
namespace ROCKSDB_NAMESPACE {
29
30
Status ExternalSstFileIngestionJob::Prepare(
31
    const std::vector<std::string>& external_files_paths,
32
    const std::vector<std::string>& files_checksums,
33
    const std::vector<std::string>& files_checksum_func_names,
34
    const std::optional<RangeOpt>& atomic_replace_range,
35
    const Temperature& file_temperature, uint64_t next_file_number,
36
0
    SuperVersion* sv) {
37
0
  Status status;
38
39
  // Read the information of files we are ingesting
40
0
  for (const std::string& file_path : external_files_paths) {
41
0
    IngestedFileInfo file_to_ingest;
42
    // For temperature, first assume it matches provided hint
43
0
    file_to_ingest.file_temperature = file_temperature;
44
0
    status =
45
0
        GetIngestedFileInfo(file_path, next_file_number++, &file_to_ingest, sv);
46
0
    if (!status.ok()) {
47
0
      ROCKS_LOG_WARN(db_options_.info_log,
48
0
                     "Failed to get ingested file info: %s: %s",
49
0
                     file_path.c_str(), status.ToString().c_str());
50
0
      return status;
51
0
    }
52
53
    // Files generated in another DB or CF may have a different column family
54
    // ID, so we let it pass here.
55
0
    if (file_to_ingest.cf_id !=
56
0
            TablePropertiesCollectorFactory::Context::kUnknownColumnFamily &&
57
0
        file_to_ingest.cf_id != cfd_->GetID() &&
58
0
        !ingestion_options_.allow_db_generated_files) {
59
0
      return Status::InvalidArgument(
60
0
          "External file column family id don't match");
61
0
    }
62
63
0
    if (file_to_ingest.num_entries == 0 &&
64
0
        file_to_ingest.num_range_deletions == 0) {
65
0
      return Status::InvalidArgument("File contain no entries");
66
0
    }
67
68
0
    if (!file_to_ingest.smallest_internal_key.Valid() ||
69
0
        !file_to_ingest.largest_internal_key.Valid()) {
70
0
      return Status::Corruption("Generated table have corrupted keys");
71
0
    }
72
73
0
    files_to_ingest_.emplace_back(std::move(file_to_ingest));
74
0
  }
75
76
0
  auto num_files = files_to_ingest_.size();
77
0
  if (num_files == 0) {
78
0
    return Status::InvalidArgument("The list of files is empty");
79
0
  } else if (num_files > 1) {
80
    // Verify that passed files don't have overlapping ranges
81
0
    autovector<const IngestedFileInfo*> sorted_files;
82
0
    for (size_t i = 0; i < num_files; i++) {
83
0
      sorted_files.push_back(&files_to_ingest_[i]);
84
0
    }
85
86
0
    std::sort(sorted_files.begin(), sorted_files.end(), file_range_checker_);
87
88
0
    for (size_t i = 0; i + 1 < num_files; i++) {
89
0
      if (file_range_checker_.Overlaps(*sorted_files[i], *sorted_files[i + 1],
90
0
                                       /* known_sorted= */ true)) {
91
0
        files_overlap_ = true;
92
0
        break;
93
0
      }
94
0
    }
95
0
  }
96
97
0
  if (atomic_replace_range.has_value()) {
98
0
    atomic_replace_range_.emplace();
99
100
0
    if (atomic_replace_range->start && atomic_replace_range->limit) {
101
      // User keys to internal keys (with timestamps)
102
0
      const size_t ts_sz = ucmp_->timestamp_size();
103
0
      std::string start_with_ts, limit_with_ts;
104
0
      auto [start, limit] = MaybeAddTimestampsToRange(
105
0
          atomic_replace_range->start, atomic_replace_range->limit, ts_sz,
106
0
          &start_with_ts, &limit_with_ts);
107
0
      assert(start.has_value());
108
0
      assert(limit.has_value());
109
0
      atomic_replace_range_->smallest_internal_key.Set(
110
0
          *start, kMaxSequenceNumber, kValueTypeForSeek);
111
0
      atomic_replace_range_->largest_internal_key.Set(
112
0
          *limit, kMaxSequenceNumber, kValueTypeForSeek);
113
      // Check files to ingest against replace range
114
0
      for (size_t i = 0; i < num_files; i++) {
115
0
        if (!file_range_checker_.Contains(*atomic_replace_range_,
116
0
                                          files_to_ingest_[i])) {
117
0
          return Status::InvalidArgument(
118
0
              "Atomic replace range does not contain all files");
119
0
        }
120
0
      }
121
0
    } else {
122
      // Currently if either bound is not present, both must be
123
0
      assert(atomic_replace_range->start.has_value() == false);
124
0
      assert(atomic_replace_range->limit.has_value() == false);
125
0
      assert(atomic_replace_range_->smallest_internal_key.unset());
126
0
      assert(atomic_replace_range_->largest_internal_key.unset());
127
0
    }
128
0
  }
129
130
0
  if (files_overlap_) {
131
0
    if (ingestion_options_.ingest_behind) {
132
0
      return Status::NotSupported(
133
0
          "Files with overlapping ranges cannot be ingested with ingestion "
134
0
          "behind mode.");
135
0
    }
136
137
    // Overlapping files need at least two different sequence numbers. If
138
    // settings disables global seqno, ingestion will fail anyway, so fail
139
    // fast in prepare.
140
0
    if (!ingestion_options_.allow_global_seqno &&
141
0
        !ingestion_options_.allow_db_generated_files) {
142
0
      return Status::InvalidArgument(
143
0
          "Global seqno is required, but disabled (because external files key "
144
0
          "range overlaps).");
145
0
    }
146
147
0
    if (ucmp_->timestamp_size() > 0) {
148
0
      return Status::NotSupported(
149
0
          "Files with overlapping ranges cannot be ingested to column "
150
0
          "family with user-defined timestamp enabled.");
151
0
    }
152
0
  }
153
154
  // Copy/Move external files into DB
155
0
  std::unordered_set<size_t> ingestion_path_ids;
156
0
  for (IngestedFileInfo& f : files_to_ingest_) {
157
0
    f.copy_file = false;
158
0
    const std::string path_outside_db = f.external_file_path;
159
0
    const std::string path_inside_db = TableFileName(
160
0
        cfd_->ioptions().cf_paths, f.fd.GetNumber(), f.fd.GetPathId());
161
0
    if (ingestion_options_.move_files || ingestion_options_.link_files) {
162
0
      status =
163
0
          fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(), nullptr);
164
0
      if (status.ok()) {
165
        // It is unsafe to assume application had sync the file and file
166
        // directory before ingest the file. For integrity of RocksDB we need
167
        // to sync the file.
168
169
        // TODO(xingbo), We should in general be moving away from production
170
        // uses of ReuseWritableFile (except explicitly for WAL recycling),
171
        // ReopenWritableFile, and NewRandomRWFile. We should create a
172
        // FileSystem::SyncFile/FsyncFile API that by default does the
173
        // re-open+sync+close combo but can (a) be reused easily, and (b) be
174
        // overridden to do that more cleanly, e.g. in EncryptedEnv.
175
        // https://github.com/facebook/rocksdb/issues/13741
176
0
        std::unique_ptr<FSWritableFile> file_to_sync;
177
0
        Status s = fs_->ReopenWritableFile(path_inside_db, env_options_,
178
0
                                           &file_to_sync, nullptr);
179
0
        TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:Reopen",
180
0
                                 &s);
181
        // Some file systems (especially remote/distributed) don't support
182
        // reopening a file for writing and don't require reopening and
183
        // syncing the file. Ignore the NotSupported error in that case.
184
0
        if (!s.IsNotSupported()) {
185
0
          status = s;
186
0
          if (status.ok()) {
187
0
            TEST_SYNC_POINT(
188
0
                "ExternalSstFileIngestionJob::BeforeSyncIngestedFile");
189
0
            status = SyncIngestedFile(file_to_sync.get());
190
0
            TEST_SYNC_POINT(
191
0
                "ExternalSstFileIngestionJob::AfterSyncIngestedFile");
192
0
            if (!status.ok()) {
193
0
              ROCKS_LOG_WARN(db_options_.info_log,
194
0
                             "Failed to sync ingested file %s: %s",
195
0
                             path_inside_db.c_str(), status.ToString().c_str());
196
0
            }
197
0
          }
198
0
        }
199
0
      } else if (status.IsNotSupported() &&
200
0
                 ingestion_options_.failed_move_fall_back_to_copy) {
201
        // Original file is on a different FS, use copy instead of hard linking.
202
0
        f.copy_file = true;
203
0
        ROCKS_LOG_INFO(db_options_.info_log,
204
0
                       "Tried to link file %s but it's not supported : %s",
205
0
                       path_outside_db.c_str(), status.ToString().c_str());
206
0
      } else {
207
0
        ROCKS_LOG_WARN(db_options_.info_log, "Failed to link file %s to %s: %s",
208
0
                       path_outside_db.c_str(), path_inside_db.c_str(),
209
0
                       status.ToString().c_str());
210
0
      }
211
0
    } else {
212
0
      f.copy_file = true;
213
0
    }
214
215
0
    if (f.copy_file) {
216
0
      TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile",
217
0
                               nullptr);
218
      // Always determining the destination temperature from the ingested-to
219
      // level would be difficult because in general we only find out the level
220
      // ingested to later, during Run().
221
      // However, we can guarantee "last level" temperature for when the user
222
      // requires ingestion to the last level.
223
0
      Temperature dst_temp =
224
0
          (ingestion_options_.ingest_behind ||
225
0
           ingestion_options_.fail_if_not_bottommost_level)
226
0
              ? sv->mutable_cf_options.last_level_temperature
227
0
              : sv->mutable_cf_options.default_write_temperature;
228
      // Note: CopyFile also syncs the new file.
229
0
      status = CopyFile(fs_.get(), path_outside_db, f.file_temperature,
230
0
                        path_inside_db, dst_temp, 0, db_options_.use_fsync,
231
0
                        io_tracer_);
232
      // The destination of the copy will be ingested
233
0
      f.file_temperature = dst_temp;
234
235
0
      if (!status.ok()) {
236
0
        ROCKS_LOG_WARN(db_options_.info_log, "Failed to copy file %s to %s: %s",
237
0
                       path_outside_db.c_str(), path_inside_db.c_str(),
238
0
                       status.ToString().c_str());
239
0
      }
240
0
    } else {
241
      // Note: we currently assume that linking files does not cross
242
      // temperatures, so no need to change f.file_temperature
243
0
    }
244
0
    TEST_SYNC_POINT("ExternalSstFileIngestionJob::Prepare:FileAdded");
245
0
    if (!status.ok()) {
246
0
      break;
247
0
    }
248
0
    f.internal_file_path = path_inside_db;
249
    // Initialize the checksum information of ingested files.
250
0
    f.file_checksum = kUnknownFileChecksum;
251
0
    f.file_checksum_func_name = kUnknownFileChecksumFuncName;
252
0
    ingestion_path_ids.insert(f.fd.GetPathId());
253
0
  }
254
255
0
  TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir");
256
0
  if (status.ok()) {
257
0
    for (auto path_id : ingestion_path_ids) {
258
0
      status = directories_->GetDataDir(path_id)->FsyncWithDirOptions(
259
0
          IOOptions(), nullptr,
260
0
          DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
261
0
      if (!status.ok()) {
262
0
        ROCKS_LOG_WARN(db_options_.info_log,
263
0
                       "Failed to sync directory %" ROCKSDB_PRIszt
264
0
                       " while ingest file: %s",
265
0
                       path_id, status.ToString().c_str());
266
0
        break;
267
0
      }
268
0
    }
269
0
  }
270
0
  TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir");
271
272
  // Generate and check the sst file checksum. Note that, if
273
  // IngestExternalFileOptions::write_global_seqno is true, we will not update
274
  // the checksum information in the files_to_ingests_ here, since the file is
275
  // updated with the new global_seqno. After global_seqno is updated, DB will
276
  // generate the new checksum and store it in the Manifest. In all other cases
277
  // if ingestion_options_.write_global_seqno == true and
278
  // verify_file_checksum is false, we only check the checksum function name.
279
0
  if (status.ok() && db_options_.file_checksum_gen_factory != nullptr) {
280
0
    if (ingestion_options_.verify_file_checksum == false &&
281
0
        files_checksums.size() == files_to_ingest_.size() &&
282
0
        files_checksum_func_names.size() == files_to_ingest_.size()) {
283
      // Only when verify_file_checksum == false and the checksum for ingested
284
      // files are provided, DB will use the provided checksum and does not
285
      // generate the checksum for ingested files.
286
0
      need_generate_file_checksum_ = false;
287
0
    } else {
288
0
      need_generate_file_checksum_ = true;
289
0
    }
290
0
    std::vector<std::string> generated_checksums;
291
0
    std::vector<std::string> generated_checksum_func_names;
292
    // Step 1: generate the checksum for ingested sst file.
293
0
    if (need_generate_file_checksum_) {
294
0
      for (size_t i = 0; i < files_to_ingest_.size(); i++) {
295
0
        std::string generated_checksum;
296
0
        std::string generated_checksum_func_name;
297
0
        std::string requested_checksum_func_name =
298
0
            i < files_checksum_func_names.size() ? files_checksum_func_names[i]
299
0
                                                 : "";
300
        // TODO: rate limit file reads for checksum calculation during file
301
        // ingestion.
302
        // TODO: plumb Env::IOActivity
303
0
        ReadOptions ro;
304
        // Pass user-provided checksums through FileOptions when available.
305
        // The caller may not have provided checksums at all (empty vectors),
306
        // so we guard with a bounds check.
307
0
        FileOptions fopts;
308
0
        if (i < files_checksums.size()) {
309
0
          fopts.file_checksum = files_checksums[i];
310
0
        }
311
0
        if (i < files_checksum_func_names.size()) {
312
0
          fopts.file_checksum_func_name = files_checksum_func_names[i];
313
0
        } else {
314
0
          fopts.file_checksum_func_name = kNoFileChecksumFuncName;
315
0
        }
316
0
        IOStatus io_s = GenerateOneFileChecksum(
317
0
            fs_.get(), files_to_ingest_[i].internal_file_path,
318
0
            db_options_.file_checksum_gen_factory.get(),
319
0
            requested_checksum_func_name, &generated_checksum,
320
0
            &generated_checksum_func_name,
321
0
            ingestion_options_.verify_checksums_readahead_size,
322
0
            db_options_.allow_mmap_reads, io_tracer_,
323
0
            db_options_.rate_limiter.get(), ro, db_options_.stats,
324
0
            db_options_.clock, fopts);
325
0
        if (!io_s.ok()) {
326
0
          status = io_s;
327
0
          ROCKS_LOG_WARN(db_options_.info_log,
328
0
                         "Sst file checksum generation of file: %s failed: %s",
329
0
                         files_to_ingest_[i].internal_file_path.c_str(),
330
0
                         status.ToString().c_str());
331
0
          break;
332
0
        }
333
0
        if (ingestion_options_.write_global_seqno == false) {
334
0
          files_to_ingest_[i].file_checksum = generated_checksum;
335
0
          files_to_ingest_[i].file_checksum_func_name =
336
0
              generated_checksum_func_name;
337
0
        }
338
0
        generated_checksums.push_back(generated_checksum);
339
0
        generated_checksum_func_names.push_back(generated_checksum_func_name);
340
0
      }
341
0
    }
342
343
    // Step 2: based on the verify_file_checksum and ingested checksum
344
    // information, do the verification.
345
0
    if (status.ok()) {
346
0
      if (files_checksums.size() == files_to_ingest_.size() &&
347
0
          files_checksum_func_names.size() == files_to_ingest_.size()) {
348
        // Verify the checksum and checksum function name.
349
0
        if (ingestion_options_.verify_file_checksum) {
350
0
          for (size_t i = 0; i < files_to_ingest_.size(); i++) {
351
0
            if (files_checksum_func_names[i] !=
352
0
                generated_checksum_func_names[i]) {
353
0
              status = Status::InvalidArgument(
354
0
                  "DB file checksum gen factory " +
355
0
                  std::string(db_options_.file_checksum_gen_factory->Name()) +
356
0
                  " generated checksum function name " +
357
0
                  generated_checksum_func_names[i] + " for file " +
358
0
                  external_files_paths[i] +
359
0
                  " which does not match requested/provided " +
360
0
                  files_checksum_func_names[i]);
361
0
              break;
362
0
            }
363
0
            if (files_checksums[i] != generated_checksums[i]) {
364
0
              status = Status::Corruption(
365
0
                  "Checksum verification mismatch for ingestion file " +
366
0
                  external_files_paths[i] + " using function " +
367
0
                  generated_checksum_func_names[i] + ". Expected: " +
368
0
                  Slice(files_checksums[i]).ToString(/*hex=*/true) +
369
0
                  " Computed: " +
370
0
                  Slice(generated_checksums[i]).ToString(/*hex=*/true));
371
0
              break;
372
0
            }
373
0
          }
374
0
        } else {
375
          // If verify_file_checksum is not enabled, we only verify the factory
376
          // recognizes the checksum function name. If it does not match, fail
377
          // the ingestion. If matches, we trust the ingested checksum
378
          // information and store in the Manifest.
379
0
          for (size_t i = 0; i < files_to_ingest_.size(); i++) {
380
0
            FileChecksumGenContext gen_context;
381
0
            gen_context.file_name = files_to_ingest_[i].internal_file_path;
382
0
            gen_context.requested_checksum_func_name =
383
0
                files_checksum_func_names[i];
384
0
            auto file_checksum_gen =
385
0
                db_options_.file_checksum_gen_factory
386
0
                    ->CreateFileChecksumGenerator(gen_context);
387
388
0
            if (file_checksum_gen == nullptr ||
389
0
                files_checksum_func_names[i] != file_checksum_gen->Name()) {
390
0
              status = Status::InvalidArgument(
391
0
                  "Checksum function name " + files_checksum_func_names[i] +
392
0
                  " for file " + external_files_paths[i] +
393
0
                  " not recognized by DB checksum gen factory" +
394
0
                  db_options_.file_checksum_gen_factory->Name() +
395
0
                  (file_checksum_gen ? (" Returned function " +
396
0
                                        std::string(file_checksum_gen->Name()))
397
0
                                     : ""));
398
0
              break;
399
0
            }
400
0
            files_to_ingest_[i].file_checksum = files_checksums[i];
401
0
            files_to_ingest_[i].file_checksum_func_name =
402
0
                files_checksum_func_names[i];
403
0
          }
404
0
        }
405
0
      } else if (files_checksums.size() != files_checksum_func_names.size() ||
406
0
                 files_checksums.size() != 0) {
407
        // The checksum or checksum function name vector are not both empty
408
        // and they are incomplete.
409
0
        status = Status::InvalidArgument(
410
0
            "The checksum information of ingested sst files are nonempty and "
411
0
            "the size of checksums or the size of the checksum function "
412
0
            "names does not match with the number of ingested sst files");
413
0
      }
414
0
      if (!status.ok()) {
415
0
        ROCKS_LOG_WARN(db_options_.info_log, "Ingestion failed: %s",
416
0
                       status.ToString().c_str());
417
0
      }
418
0
    }
419
0
  }
420
421
0
  if (status.ok()) {
422
0
    DivideInputFilesIntoBatches();
423
0
  }
424
425
0
  return status;
426
0
}
427
428
0
void ExternalSstFileIngestionJob::DivideInputFilesIntoBatches() {
429
0
  if (!files_overlap_) {
430
    // No overlap, treat as one batch without the need of tracking overall batch
431
    // range.
432
0
    file_batches_to_ingest_.emplace_back(/* _track_batch_range= */ false);
433
0
    for (auto& file : files_to_ingest_) {
434
0
      file_batches_to_ingest_.back().AddFile(&file, file_range_checker_);
435
0
    }
436
0
    return;
437
0
  }
438
439
0
  file_batches_to_ingest_.emplace_back(/* _track_batch_range= */ true);
440
0
  for (auto& file : files_to_ingest_) {
441
0
    if (!file_batches_to_ingest_.back().unset() &&
442
0
        file_range_checker_.Overlaps(file_batches_to_ingest_.back(), file,
443
0
                                     /* known_sorted= */ false)) {
444
0
      file_batches_to_ingest_.emplace_back(/* _track_batch_range= */ true);
445
0
    }
446
0
    file_batches_to_ingest_.back().AddFile(&file, file_range_checker_);
447
0
  }
448
0
}
449
450
Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed,
451
0
                                               SuperVersion* super_version) {
452
0
  Status status;
453
0
  if (atomic_replace_range_.has_value() && atomic_replace_range_->unset()) {
454
    // For replacing whole CF, we can simply check whether memtable is empty
455
0
    *flush_needed = !super_version->mem->IsEmpty();
456
0
  } else {
457
0
    autovector<UserKeyRange> ranges;
458
0
    if (atomic_replace_range_.has_value()) {
459
0
      assert(!atomic_replace_range_->smallest_internal_key.unset());
460
0
      assert(!atomic_replace_range_->largest_internal_key.unset());
461
      // NOTE: we already checked in Prepare() that the atomic_replace_range
462
      // covers all the files_to_ingest
463
      // FIXME: need to make upper bound key exclusive (not easy here because
464
      // the existing internal APIs deal in inclusive upper bound user keys)
465
0
      ranges.emplace_back(
466
0
          atomic_replace_range_->smallest_internal_key.user_key(),
467
0
          atomic_replace_range_->largest_internal_key.user_key());
468
0
    } else {
469
0
      ranges.reserve(files_to_ingest_.size());
470
0
      for (const IngestedFileInfo& file_to_ingest : files_to_ingest_) {
471
0
        ranges.emplace_back(file_to_ingest.start_ukey,
472
0
                            file_to_ingest.limit_ukey);
473
0
      }
474
0
    }
475
0
    status = cfd_->RangesOverlapWithMemtables(
476
0
        ranges, super_version, db_options_.allow_data_in_errors, flush_needed);
477
0
    if (!status.ok()) {
478
0
      ROCKS_LOG_WARN(db_options_.info_log,
479
0
                     "Failed to check ranges overlap with memtables: %s",
480
0
                     status.ToString().c_str());
481
0
    }
482
0
  }
483
0
  if (status.ok() && *flush_needed) {
484
0
    if (!ingestion_options_.allow_blocking_flush) {
485
0
      status = Status::InvalidArgument("External file requires flush");
486
0
    }
487
0
    if (ucmp_->timestamp_size() > 0) {
488
0
      status = Status::InvalidArgument(
489
0
          "Column family enables user-defined timestamps, please make "
490
0
          "sure the key range (without timestamp) of external file does not "
491
0
          "overlap with key range in the memtables.");
492
0
    }
493
0
  }
494
0
  return status;
495
0
}
496
497
// REQUIRES: we have become the only writer by entering both write_thread_ and
498
// nonmem_write_thread_
499
0
Status ExternalSstFileIngestionJob::Run() {
500
0
  SuperVersion* super_version = cfd_->GetSuperVersion();
501
  // If column family is flushed after Prepare and before Run, we should have a
502
  // specific state of Memtables. The mutable Memtable should be empty, and the
503
  // immutable Memtable list should be empty.
504
0
  if (flushed_before_run_ && (super_version->imm->NumNotFlushed() != 0 ||
505
0
                              !super_version->mem->IsEmpty())) {
506
0
    return Status::TryAgain(
507
0
        "Inconsistent memtable state detected when flushed before run.");
508
0
  }
509
0
  Status status;
510
#ifndef NDEBUG
511
  // We should never run the job with a memtable that is overlapping
512
  // with the files we are ingesting
513
  bool need_flush = false;
514
  status = NeedsFlush(&need_flush, super_version);
515
  if (!status.ok()) {
516
    ROCKS_LOG_WARN(db_options_.info_log,
517
                   "Failed to check if flush is needed: %s",
518
                   status.ToString().c_str());
519
    return status;
520
  }
521
  if (need_flush) {
522
    return Status::TryAgain("need_flush");
523
  }
524
  assert(status.ok() && need_flush == false);
525
#endif
526
527
0
  bool force_global_seqno = false;
528
529
0
  if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
530
    // We need to assign a global sequence number to all the files even
531
    // if the don't overlap with any ranges since we have snapshots
532
0
    force_global_seqno = true;
533
0
  }
534
  // It is safe to use this instead of LastAllocatedSequence since we are
535
  // the only active writer, and hence they are equal
536
0
  SequenceNumber last_seqno = versions_->LastSequence();
537
0
  edit_.SetColumnFamily(cfd_->GetID());
538
539
0
  if (atomic_replace_range_.has_value()) {
540
0
    auto* vstorage = super_version->current->storage_info();
541
0
    if (atomic_replace_range_->unset()) {
542
0
      if (cfd_->compaction_picker()->IsCompactionInProgress()) {
543
0
        return Status::InvalidArgument(
544
0
            "Atomic replace range (full) overlaps with pending compaction");
545
0
      }
546
0
      for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
547
0
        for (auto file : vstorage->LevelFiles(lvl)) {
548
          // Set up to delete file to be replaced
549
0
          edit_.DeleteFile(lvl, file->fd.GetNumber());
550
0
        }
551
0
      }
552
0
    } else {
553
0
      assert(!atomic_replace_range_->smallest_internal_key.unset());
554
0
      assert(!atomic_replace_range_->largest_internal_key.unset());
555
0
      for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
556
0
        if (cfd_->RangeOverlapWithCompaction(
557
0
                atomic_replace_range_->smallest_internal_key.user_key(),
558
0
                atomic_replace_range_->largest_internal_key.user_key(), lvl)) {
559
0
          return Status::InvalidArgument(
560
0
              "Atomic replace range overlaps with pending compaction");
561
0
        }
562
0
        for (auto file : vstorage->LevelFiles(lvl)) {
563
0
          if (file_range_checker_.Overlaps(*atomic_replace_range_,
564
0
                                           file->smallest, file->largest)) {
565
0
            if (file_range_checker_.Contains(*atomic_replace_range_,
566
0
                                             file->smallest, file->largest)) {
567
              // Set up to delete file to be replaced
568
0
              edit_.DeleteFile(lvl, file->fd.GetNumber());
569
0
            } else {
570
              // TODO: generate and ingest a tombstone file also
571
0
              return Status::InvalidArgument(
572
0
                  "Atomic replace range partially overlaps with existing file");
573
0
            }
574
0
          }
575
0
        }
576
0
      }
577
0
    }
578
0
  }
579
580
  // Find levels to ingest into
581
0
  std::optional<int> prev_batch_uppermost_level;
582
  // batches at the front of file_batches_to_ingest_ contains older updates and
583
  // are placed in smaller levels.
584
0
  for (auto& batch : file_batches_to_ingest_) {
585
0
    int batch_uppermost_level = 0;
586
0
    status = AssignLevelsForOneBatch(batch, super_version, force_global_seqno,
587
0
                                     &last_seqno, &batch_uppermost_level,
588
0
                                     prev_batch_uppermost_level);
589
0
    if (!status.ok()) {
590
0
      ROCKS_LOG_WARN(db_options_.info_log,
591
0
                     "Failed to assign levels for one batch: %s",
592
0
                     status.ToString().c_str());
593
0
      return status;
594
0
    }
595
596
0
    prev_batch_uppermost_level = batch_uppermost_level;
597
0
  }
598
599
0
  CreateEquivalentFileIngestingCompactions();
600
0
  return status;
601
0
}
602
603
Status ExternalSstFileIngestionJob::AssignLevelsForOneBatch(
604
    FileBatchInfo& batch, SuperVersion* super_version, bool force_global_seqno,
605
    SequenceNumber* last_seqno, int* batch_uppermost_level,
606
0
    std::optional<int> prev_batch_uppermost_level) {
607
0
  Status status;
608
0
  assert(batch_uppermost_level);
609
0
  *batch_uppermost_level = std::numeric_limits<int>::max();
610
0
  for (IngestedFileInfo* file : batch.files) {
611
0
    assert(file);
612
0
    SequenceNumber assigned_seqno = 0;
613
0
    if (ingestion_options_.ingest_behind) {
614
0
      status = CheckLevelForIngestedBehindFile(file);
615
0
    } else {
616
0
      status = AssignLevelAndSeqnoForIngestedFile(
617
0
          super_version, force_global_seqno, cfd_->ioptions().compaction_style,
618
0
          *last_seqno, file, &assigned_seqno, prev_batch_uppermost_level);
619
0
    }
620
621
    // Modify the smallest/largest internal key to include the sequence number
622
    // that we just learned. Only overwrite sequence number zero. There could
623
    // be a nonzero sequence number already to indicate a range tombstone's
624
    // exclusive endpoint.
625
0
    ParsedInternalKey smallest_parsed, largest_parsed;
626
0
    if (status.ok()) {
627
0
      status = ParseInternalKey(*(file->smallest_internal_key.rep()),
628
0
                                &smallest_parsed, false /* log_err_key */);
629
0
    }
630
0
    if (status.ok()) {
631
0
      status = ParseInternalKey(*(file->largest_internal_key.rep()),
632
0
                                &largest_parsed, false /* log_err_key */);
633
0
    }
634
0
    if (!status.ok()) {
635
0
      ROCKS_LOG_WARN(db_options_.info_log, "Failed to parse internal key: %s",
636
0
                     status.ToString().c_str());
637
0
      return status;
638
0
    }
639
640
    // If any ingested file overlaps with the DB, it will fail here.
641
0
    if (ingestion_options_.allow_db_generated_files && assigned_seqno != 0) {
642
0
      return Status::InvalidArgument(
643
0
          "An ingested file overlaps with existing data in the DB and has been "
644
0
          "assigned a non-zero sequence number, which is not allowed when "
645
0
          "'allow_db_generated_files' is enabled.");
646
0
    }
647
648
0
    if (smallest_parsed.sequence == 0 && assigned_seqno != 0) {
649
0
      UpdateInternalKey(file->smallest_internal_key.rep(), assigned_seqno,
650
0
                        smallest_parsed.type);
651
0
    }
652
0
    if (largest_parsed.sequence == 0 && assigned_seqno != 0) {
653
0
      UpdateInternalKey(file->largest_internal_key.rep(), assigned_seqno,
654
0
                        largest_parsed.type);
655
0
    }
656
657
0
    status = AssignGlobalSeqnoForIngestedFile(file, assigned_seqno);
658
0
    if (!status.ok()) {
659
0
      ROCKS_LOG_WARN(
660
0
          db_options_.info_log,
661
0
          "Failed to assign global sequence number for ingested file: %s",
662
0
          status.ToString().c_str());
663
0
      return status;
664
0
    }
665
0
    TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
666
0
                             &assigned_seqno);
667
0
    assert(assigned_seqno == 0 || assigned_seqno == *last_seqno + 1);
668
0
    if (assigned_seqno > *last_seqno) {
669
0
      *last_seqno = assigned_seqno;
670
0
    }
671
0
    max_assigned_seqno_ = std::max(max_assigned_seqno_, assigned_seqno);
672
673
0
    status = GenerateChecksumForIngestedFile(file);
674
0
    if (!status.ok()) {
675
0
      ROCKS_LOG_WARN(db_options_.info_log,
676
0
                     "Failed to generate checksum for ingested file: %s",
677
0
                     status.ToString().c_str());
678
0
      return status;
679
0
    }
680
681
    // We use the import time as the ancester time. This is the time the data
682
    // is written to the database.
683
0
    int64_t temp_current_time = 0;
684
0
    uint64_t current_time = kUnknownFileCreationTime;
685
0
    uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
686
0
    if (clock_->GetCurrentTime(&temp_current_time).ok()) {
687
0
      current_time = oldest_ancester_time =
688
0
          static_cast<uint64_t>(temp_current_time);
689
0
    }
690
0
    uint64_t tail_size = FileMetaData::CalculateTailSize(
691
0
        file->fd.GetFileSize(), file->table_properties);
692
693
0
    bool marked_for_compaction =
694
0
        file->table_properties.num_range_deletions == 1 &&
695
0
        (file->table_properties.num_entries ==
696
0
         file->table_properties.num_range_deletions);
697
0
    SequenceNumber smallest_seqno = file->assigned_seqno;
698
0
    SequenceNumber largest_seqno = file->assigned_seqno;
699
0
    if (ingestion_options_.allow_db_generated_files) {
700
0
      assert(file->assigned_seqno == 0);
701
0
      assert(file->smallest_seqno != kMaxSequenceNumber);
702
0
      assert(file->largest_seqno != kMaxSequenceNumber);
703
0
      smallest_seqno = file->smallest_seqno;
704
0
      largest_seqno = file->largest_seqno;
705
0
      max_assigned_seqno_ = std::max(max_assigned_seqno_, file->largest_seqno);
706
0
    }
707
0
    FileMetaData f_metadata(
708
0
        file->fd.GetNumber(), file->fd.GetPathId(), file->fd.GetFileSize(),
709
0
        file->smallest_internal_key, file->largest_internal_key, smallest_seqno,
710
0
        largest_seqno, false, file->file_temperature, kInvalidBlobFileNumber,
711
0
        oldest_ancester_time, current_time,
712
0
        ingestion_options_.ingest_behind
713
0
            ? kReservedEpochNumberForFileIngestedBehind
714
0
            : cfd_->NewEpochNumber(),  // orders files ingested to L0
715
0
        file->file_checksum, file->file_checksum_func_name, file->unique_id, 0,
716
0
        tail_size, file->user_defined_timestamps_persisted, "", "");
717
0
    f_metadata.temperature = file->file_temperature;
718
0
    f_metadata.marked_for_compaction = marked_for_compaction;
719
    // Extract min/max timestamps from table properties for UDT support.
720
    // This ensures ingested files have proper timestamp ranges in FileMetaData,
721
    // similar to files created by flush and compaction.
722
0
    ExtractTimestampFromTableProperties(file->table_properties, &f_metadata);
723
    // Retrieve file open metadata for fast SST open
724
0
    if (mutable_db_options_.fast_sst_open) {
725
0
      std::unique_ptr<FSRandomAccessFile> readable_file;
726
0
      FileOptions fopts{env_options_};
727
0
      fopts.file_checksum = f_metadata.file_checksum;
728
0
      fopts.file_checksum_func_name = f_metadata.file_checksum_func_name;
729
0
      IOStatus io_s = fs_->NewRandomAccessFile(file->internal_file_path, fopts,
730
0
                                               &readable_file, nullptr);
731
0
      if (io_s.ok()) {
732
0
        io_s =
733
0
            readable_file->GetFileOpenMetadata(&f_metadata.file_open_metadata);
734
0
        if (io_s.ok() && !f_metadata.file_open_metadata.empty() &&
735
0
            f_metadata.file_open_metadata.size() <=
736
0
                FSRandomAccessFile::kMaxFileOpenMetadataSize) {
737
0
          RecordTick(db_options_.stats, FILE_OPEN_METADATA_RETRIEVED);
738
0
        } else {
739
0
          if (io_s.ok() && f_metadata.file_open_metadata.size() >
740
0
                               FSRandomAccessFile::kMaxFileOpenMetadataSize) {
741
0
            ROCKS_LOG_WARN(db_options_.info_log,
742
0
                           "File open metadata for %s too large (%zu bytes), "
743
0
                           "ignoring",
744
0
                           file->internal_file_path.c_str(),
745
0
                           f_metadata.file_open_metadata.size());
746
0
          }
747
0
          f_metadata.file_open_metadata.clear();
748
0
        }
749
0
      }
750
0
    }
751
0
    edit_.AddFile(file->picked_level, f_metadata);
752
753
0
    *batch_uppermost_level =
754
0
        std::min(*batch_uppermost_level, file->picked_level);
755
0
  }
756
757
0
  return Status::OK();
758
0
}
759
760
0
void ExternalSstFileIngestionJob::CreateEquivalentFileIngestingCompactions() {
761
  // A map from output level to input of compactions equivalent to this
762
  // ingestion job.
763
  // TODO: simplify below logic to creating compaction per ingested file
764
  // instead of per output level, once we figure out how to treat ingested files
765
  // with adjacent range deletion tombstones to same output level in the same
766
  // job as non-overlapping compactions.
767
0
  std::map<int, CompactionInputFiles>
768
0
      output_level_to_file_ingesting_compaction_input;
769
770
0
  for (const auto& pair : edit_.GetNewFiles()) {
771
0
    int output_level = pair.first;
772
0
    const FileMetaData& f_metadata = pair.second;
773
774
0
    CompactionInputFiles& input =
775
0
        output_level_to_file_ingesting_compaction_input[output_level];
776
0
    if (input.files.empty()) {
777
      // Treat the source level of ingested files to be level 0
778
0
      input.level = 0;
779
0
    }
780
781
0
    compaction_input_metdatas_.push_back(new FileMetaData(f_metadata));
782
0
    input.files.push_back(compaction_input_metdatas_.back());
783
0
  }
784
785
0
  for (const auto& pair : output_level_to_file_ingesting_compaction_input) {
786
0
    int output_level = pair.first;
787
0
    const CompactionInputFiles& input = pair.second;
788
789
0
    const auto& mutable_cf_options = cfd_->GetLatestMutableCFOptions();
790
0
    file_ingesting_compactions_.push_back(new Compaction(
791
0
        cfd_->current()->storage_info(), cfd_->ioptions(), mutable_cf_options,
792
0
        mutable_db_options_, {input}, output_level,
793
        /* output file size limit not applicable */
794
0
        MaxFileSizeForLevel(mutable_cf_options, output_level,
795
0
                            cfd_->ioptions().compaction_style),
796
0
        LLONG_MAX /* max compaction bytes, not applicable */,
797
0
        0 /* output path ID, not applicable */, mutable_cf_options.compression,
798
0
        mutable_cf_options.compression_opts, Temperature::kUnknown,
799
0
        0 /* max_subcompaction, not applicable */,
800
0
        {} /* grandparents, not applicable */,
801
0
        std::nullopt /* earliest_snapshot */, nullptr /* snapshot_checker */,
802
0
        CompactionReason::kExternalSstIngestion, "" /* trim_ts */,
803
0
        -1 /* score, not applicable */,
804
0
        files_overlap_ /* l0_files_might_overlap, not applicable */));
805
0
  }
806
0
}
807
808
0
void ExternalSstFileIngestionJob::RegisterRange() {
809
0
  for (const auto& c : file_ingesting_compactions_) {
810
0
    cfd_->compaction_picker()->RegisterCompaction(c);
811
0
  }
812
0
}
813
814
0
void ExternalSstFileIngestionJob::UnregisterRange() {
815
0
  for (const auto& c : file_ingesting_compactions_) {
816
0
    cfd_->compaction_picker()->UnregisterCompaction(c);
817
0
    delete c;
818
0
  }
819
0
  file_ingesting_compactions_.clear();
820
821
0
  for (const auto& f : compaction_input_metdatas_) {
822
0
    delete f;
823
0
  }
824
0
  compaction_input_metdatas_.clear();
825
0
}
826
827
0
void ExternalSstFileIngestionJob::UpdateStats() {
828
  // Update internal stats for new ingested files
829
0
  uint64_t total_keys = 0;
830
0
  uint64_t total_l0_files = 0;
831
0
  uint64_t total_time = clock_->NowMicros() - job_start_time_;
832
833
0
  EventLoggerStream stream = event_logger_->Log();
834
0
  stream << "event" << "ingest_finished";
835
0
  stream << "files_ingested";
836
0
  stream.StartArray();
837
838
0
  for (IngestedFileInfo& f : files_to_ingest_) {
839
0
    InternalStats::CompactionStats stats(
840
0
        CompactionReason::kExternalSstIngestion, 1);
841
0
    stats.micros = total_time;
842
    // If actual copy occurred for this file, then we need to count the file
843
    // size as the actual bytes written. If the file was linked, then we ignore
844
    // the bytes written for file metadata.
845
    // TODO (yanqin) maybe account for file metadata bytes for exact accuracy?
846
0
    if (f.copy_file) {
847
0
      stats.bytes_written = f.fd.GetFileSize();
848
0
    } else {
849
0
      stats.bytes_moved = f.fd.GetFileSize();
850
0
    }
851
0
    stats.num_output_files = 1;
852
0
    cfd_->internal_stats()->AddCompactionStats(f.picked_level,
853
0
                                               Env::Priority::USER, stats);
854
0
    cfd_->internal_stats()->AddCFStats(InternalStats::BYTES_INGESTED_ADD_FILE,
855
0
                                       f.fd.GetFileSize());
856
0
    total_keys += f.num_entries;
857
0
    if (f.picked_level == 0) {
858
0
      total_l0_files += 1;
859
0
    }
860
0
    ROCKS_LOG_INFO(
861
0
        db_options_.info_log,
862
0
        "[AddFile] External SST file %s was ingested in L%d with path %s "
863
0
        "(global_seqno=%" PRIu64 ")\n",
864
0
        f.external_file_path.c_str(), f.picked_level,
865
0
        f.internal_file_path.c_str(), f.assigned_seqno);
866
0
    stream << "file" << f.internal_file_path << "level" << f.picked_level;
867
0
  }
868
0
  stream.EndArray();
869
870
0
  stream << "lsm_state";
871
0
  stream.StartArray();
872
0
  auto vstorage = cfd_->current()->storage_info();
873
0
  for (int level = 0; level < vstorage->num_levels(); ++level) {
874
0
    stream << vstorage->NumLevelFiles(level);
875
0
  }
876
0
  stream.EndArray();
877
878
0
  cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_KEYS_TOTAL,
879
0
                                     total_keys);
880
0
  cfd_->internal_stats()->AddCFStats(InternalStats::INGESTED_NUM_FILES_TOTAL,
881
0
                                     files_to_ingest_.size());
882
0
  cfd_->internal_stats()->AddCFStats(
883
0
      InternalStats::INGESTED_LEVEL0_NUM_FILES_TOTAL, total_l0_files);
884
0
}
885
886
0
void ExternalSstFileIngestionJob::Cleanup(const Status& status) {
887
0
  IOOptions io_opts;
888
0
  if (!status.ok()) {
889
    // We failed to add the files to the database
890
    // remove all the files we copied
891
0
    DeleteInternalFiles();
892
0
    files_overlap_ = false;
893
0
  } else if (status.ok() && ingestion_options_.move_files) {
894
    // The files were moved and added successfully, remove original file links
895
0
    for (IngestedFileInfo& f : files_to_ingest_) {
896
0
      Status s = fs_->DeleteFile(f.external_file_path, io_opts, nullptr);
897
0
      if (!s.ok()) {
898
0
        ROCKS_LOG_WARN(
899
0
            db_options_.info_log,
900
0
            "%s was added to DB successfully but failed to remove original "
901
0
            "file link : %s",
902
0
            f.external_file_path.c_str(), s.ToString().c_str());
903
0
      }
904
0
    }
905
0
  }
906
0
}
907
908
0
void ExternalSstFileIngestionJob::DeleteInternalFiles() {
909
0
  IOOptions io_opts;
910
0
  for (IngestedFileInfo& f : files_to_ingest_) {
911
0
    if (f.internal_file_path.empty()) {
912
0
      continue;
913
0
    }
914
0
    Status s = fs_->DeleteFile(f.internal_file_path, io_opts, nullptr);
915
0
    if (!s.ok()) {
916
0
      ROCKS_LOG_WARN(db_options_.info_log,
917
0
                     "AddFile() clean up for file %s failed : %s",
918
0
                     f.internal_file_path.c_str(), s.ToString().c_str());
919
0
    }
920
0
  }
921
0
}
922
923
Status ExternalSstFileIngestionJob::ResetTableReader(
924
    const std::string& external_file, uint64_t new_file_number,
925
    bool user_defined_timestamps_persisted, SuperVersion* sv,
926
    IngestedFileInfo* file_to_ingest,
927
0
    std::unique_ptr<TableReader>* table_reader) {
928
0
  std::unique_ptr<FSRandomAccessFile> sst_file;
929
0
  FileOptions fo{env_options_};
930
0
  fo.temperature = file_to_ingest->file_temperature;
931
0
  Status status =
932
0
      fs_->NewRandomAccessFile(external_file, fo, &sst_file, nullptr);
933
0
  if (!status.ok()) {
934
0
    ROCKS_LOG_WARN(
935
0
        db_options_.info_log,
936
0
        "Failed to create random access file for external file %s: %s",
937
0
        external_file.c_str(), status.ToString().c_str());
938
0
    return status;
939
0
  }
940
0
  Temperature updated_temp = sst_file->GetTemperature();
941
0
  if (updated_temp != Temperature::kUnknown &&
942
0
      updated_temp != file_to_ingest->file_temperature) {
943
    // The hint was missing or wrong. Track temperature reported by storage.
944
0
    file_to_ingest->file_temperature = updated_temp;
945
0
  }
946
0
  std::unique_ptr<RandomAccessFileReader> sst_file_reader(
947
0
      new RandomAccessFileReader(std::move(sst_file), external_file,
948
0
                                 nullptr /*Env*/, io_tracer_));
949
0
  table_reader->reset();
950
0
  ReadOptions ro;
951
0
  ro.fill_cache = ingestion_options_.fill_cache;
952
0
  status = sv->mutable_cf_options.table_factory->NewTableReader(
953
0
      ro,
954
0
      TableReaderOptions(
955
0
          cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
956
0
          sv->mutable_cf_options.compression_manager.get(), env_options_,
957
0
          cfd_->internal_comparator(),
958
0
          sv->mutable_cf_options.block_protection_bytes_per_key,
959
0
          /*skip_filters*/ false, /*immortal*/ false,
960
0
          /*force_direct_prefetch*/ false, /*level*/ -1,
961
0
          /*block_cache_tracer*/ nullptr,
962
0
          /*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
963
0
          /*cur_file_num*/ new_file_number,
964
0
          /* unique_id */ {}, /* largest_seqno */ 0,
965
0
          /* tail_size */ 0, user_defined_timestamps_persisted),
966
0
      std::move(sst_file_reader), file_to_ingest->file_size, table_reader,
967
      // No need to prefetch index/filter if caching is not needed.
968
0
      /*prefetch_index_and_filter_in_cache=*/ingestion_options_.fill_cache);
969
0
  return status;
970
0
}
971
972
Status ExternalSstFileIngestionJob::SanityCheckTableProperties(
973
    const std::string& external_file, uint64_t new_file_number,
974
    SuperVersion* sv, IngestedFileInfo* file_to_ingest,
975
0
    std::unique_ptr<TableReader>* table_reader) {
976
  // Get the external file properties
977
0
  auto props = table_reader->get()->GetTableProperties();
978
0
  assert(props.get());
979
0
  const auto& uprops = props->user_collected_properties;
980
981
  // Get table version
982
0
  auto version_iter = uprops.find(ExternalSstFilePropertyNames::kVersion);
983
0
  if (version_iter == uprops.end()) {
984
0
    assert(!SstFileWriter::CreatedBySstFileWriter(*props));
985
0
    if (!ingestion_options_.allow_db_generated_files) {
986
0
      return Status::Corruption("External file version not found");
987
0
    } else {
988
      // 0 is special version for when a file from live DB does not have the
989
      // version table property
990
0
      file_to_ingest->version = 0;
991
0
    }
992
0
  } else {
993
0
    assert(SstFileWriter::CreatedBySstFileWriter(*props));
994
0
    file_to_ingest->version = DecodeFixed32(version_iter->second.c_str());
995
0
  }
996
997
0
  auto seqno_iter = uprops.find(ExternalSstFilePropertyNames::kGlobalSeqno);
998
0
  if (file_to_ingest->version == 2) {
999
    // version 2 imply that we have global sequence number
1000
0
    if (seqno_iter == uprops.end()) {
1001
0
      return Status::Corruption(
1002
0
          "External file global sequence number not found");
1003
0
    }
1004
1005
    // Set the global sequence number
1006
0
    file_to_ingest->original_seqno = DecodeFixed64(seqno_iter->second.c_str());
1007
0
    if (props->external_sst_file_global_seqno_offset == 0) {
1008
0
      file_to_ingest->global_seqno_offset = 0;
1009
0
      return Status::Corruption("Was not able to find file global seqno field");
1010
0
    }
1011
0
    file_to_ingest->global_seqno_offset =
1012
0
        static_cast<size_t>(props->external_sst_file_global_seqno_offset);
1013
0
  } else if (file_to_ingest->version == 1) {
1014
    // SST file V1 should not have global seqno field
1015
0
    assert(seqno_iter == uprops.end());
1016
0
    file_to_ingest->original_seqno = 0;
1017
0
    if (ingestion_options_.allow_blocking_flush ||
1018
0
        ingestion_options_.allow_global_seqno) {
1019
0
      return Status::InvalidArgument(
1020
0
          "External SST file V1 does not support global seqno");
1021
0
    }
1022
0
  } else if (file_to_ingest->version == 0) {
1023
    // allow_db_generated_files is true
1024
0
    assert(seqno_iter == uprops.end());
1025
0
    file_to_ingest->original_seqno = 0;
1026
0
    file_to_ingest->global_seqno_offset = 0;
1027
0
  } else {
1028
0
    return Status::InvalidArgument("External file version " +
1029
0
                                   std::to_string(file_to_ingest->version) +
1030
0
                                   " is not supported");
1031
0
  }
1032
1033
0
  file_to_ingest->cf_id = static_cast<uint32_t>(props->column_family_id);
1034
  // This assignment works fine even though `table_reader` may later be reset,
1035
  // since that will not affect how table properties are parsed, and this
1036
  // assignment is making a copy.
1037
0
  file_to_ingest->table_properties = *props;
1038
1039
  // Get number of entries in table
1040
0
  file_to_ingest->num_entries = props->num_entries;
1041
0
  file_to_ingest->num_range_deletions = props->num_range_deletions;
1042
1043
  // Validate table properties related to comparator name and user defined
1044
  // timestamps persisted flag.
1045
0
  file_to_ingest->user_defined_timestamps_persisted =
1046
0
      static_cast<bool>(props->user_defined_timestamps_persisted);
1047
0
  bool mark_sst_file_has_no_udt = false;
1048
0
  Status s = ValidateUserDefinedTimestampsOptions(
1049
0
      cfd_->user_comparator(), props->comparator_name,
1050
0
      cfd_->ioptions().persist_user_defined_timestamps,
1051
0
      file_to_ingest->user_defined_timestamps_persisted,
1052
0
      &mark_sst_file_has_no_udt);
1053
0
  if (s.ok() && mark_sst_file_has_no_udt) {
1054
    // A column family that enables user-defined timestamps in Memtable only
1055
    // feature can also ingest external files created by a setting that disables
1056
    // user-defined timestamps. In that case, we need to re-mark the
1057
    // user_defined_timestamps_persisted flag for the file.
1058
0
    file_to_ingest->user_defined_timestamps_persisted = false;
1059
0
  } else if (!s.ok()) {
1060
0
    ROCKS_LOG_WARN(
1061
0
        db_options_.info_log,
1062
0
        "ValidateUserDefinedTimestampsOptions failed for external file %s: %s",
1063
0
        external_file.c_str(), s.ToString().c_str());
1064
0
    return s;
1065
0
  }
1066
1067
  // `TableReader` is initialized with `user_defined_timestamps_persisted` flag
1068
  // to be true. If its value changed to false after this sanity check, we
1069
  // need to reset the `TableReader`.
1070
0
  if (ucmp_->timestamp_size() > 0 &&
1071
0
      !file_to_ingest->user_defined_timestamps_persisted) {
1072
0
    s = ResetTableReader(external_file, new_file_number,
1073
0
                         file_to_ingest->user_defined_timestamps_persisted, sv,
1074
0
                         file_to_ingest, table_reader);
1075
0
  }
1076
0
  return s;
1077
0
}
1078
1079
Status ExternalSstFileIngestionJob::GetIngestedFileInfo(
1080
    const std::string& external_file, uint64_t new_file_number,
1081
0
    IngestedFileInfo* file_to_ingest, SuperVersion* sv) {
1082
0
  file_to_ingest->external_file_path = external_file;
1083
1084
  // Get external file size
1085
0
  Status status = fs_->GetFileSize(external_file, IOOptions(),
1086
0
                                   &file_to_ingest->file_size, nullptr);
1087
0
  if (!status.ok()) {
1088
0
    ROCKS_LOG_WARN(db_options_.info_log,
1089
0
                   "Failed to get file size for external file %s: %s",
1090
0
                   external_file.c_str(), status.ToString().c_str());
1091
0
    return status;
1092
0
  }
1093
1094
  // Assign FD with number
1095
0
  file_to_ingest->fd =
1096
0
      FileDescriptor(new_file_number, 0, file_to_ingest->file_size);
1097
1098
  // Create TableReader for external file
1099
0
  std::unique_ptr<TableReader> table_reader;
1100
  // Initially create the `TableReader` with flag
1101
  // `user_defined_timestamps_persisted` to be true since that's the most common
1102
  // case
1103
0
  status = ResetTableReader(external_file, new_file_number,
1104
0
                            /*user_defined_timestamps_persisted=*/true, sv,
1105
0
                            file_to_ingest, &table_reader);
1106
0
  if (!status.ok()) {
1107
0
    ROCKS_LOG_WARN(db_options_.info_log,
1108
0
                   "Failed to reset table reader for external file %s: %s",
1109
0
                   external_file.c_str(), status.ToString().c_str());
1110
0
    return status;
1111
0
  }
1112
1113
0
  status = SanityCheckTableProperties(external_file, new_file_number, sv,
1114
0
                                      file_to_ingest, &table_reader);
1115
0
  if (!status.ok()) {
1116
0
    ROCKS_LOG_WARN(
1117
0
        db_options_.info_log,
1118
0
        "Failed to sanity check table properties for external file %s: %s",
1119
0
        external_file.c_str(), status.ToString().c_str());
1120
0
    return status;
1121
0
  }
1122
1123
0
  const bool allow_data_in_errors = db_options_.allow_data_in_errors;
1124
0
  ParsedInternalKey key;
1125
0
  if (ingestion_options_.allow_db_generated_files) {
1126
    // We are ingesting a DB generated SST file for which we don't reassign
1127
    // sequence numbers. We need its smallest sequence number and largest
1128
    // sequence number for FileMetaData.
1129
0
    Status seqno_status = GetSeqnoBoundaryForFile(
1130
0
        table_reader.get(), sv, file_to_ingest, allow_data_in_errors);
1131
1132
0
    if (!seqno_status.ok()) {
1133
0
      ROCKS_LOG_WARN(
1134
0
          db_options_.info_log,
1135
0
          "Failed to get sequence number boundary for external file %s: %s",
1136
0
          external_file.c_str(), seqno_status.ToString().c_str());
1137
0
      return seqno_status;
1138
0
    }
1139
0
    assert(file_to_ingest->smallest_seqno <= file_to_ingest->largest_seqno);
1140
0
    assert(file_to_ingest->largest_seqno < kMaxSequenceNumber);
1141
0
  } else {
1142
0
    SequenceNumber largest_seqno =
1143
0
        table_reader.get()->GetTableProperties()->key_largest_seqno;
1144
    // UINT64_MAX means unknown and the file is generated before table property
1145
    // `key_largest_seqno` is introduced.
1146
0
    if (largest_seqno != UINT64_MAX && largest_seqno > 0) {
1147
0
      return Status::Corruption(
1148
0
          "External file has non zero largest sequence number " +
1149
0
          std::to_string(largest_seqno));
1150
0
    }
1151
0
  }
1152
1153
0
  if (ingestion_options_.verify_checksums_before_ingest) {
1154
    // If customized readahead size is needed, we can pass a user option
1155
    // all the way to here. Right now we just rely on the default readahead
1156
    // to keep things simple.
1157
    // TODO: plumb Env::IOActivity, Env::IOPriority
1158
0
    ReadOptions ro;
1159
0
    ro.readahead_size = ingestion_options_.verify_checksums_readahead_size;
1160
0
    ro.fill_cache = ingestion_options_.fill_cache;
1161
0
    status = table_reader->VerifyChecksum(
1162
0
        ro, TableReaderCaller::kExternalSSTIngestion);
1163
0
    if (!status.ok()) {
1164
0
      ROCKS_LOG_WARN(db_options_.info_log,
1165
0
                     "Failed to verify checksum for table reader: %s",
1166
0
                     status.ToString().c_str());
1167
0
      return status;
1168
0
    }
1169
0
  }
1170
1171
  // TODO: plumb Env::IOActivity, Env::IOPriority
1172
0
  ReadOptions ro;
1173
0
  ro.fill_cache = ingestion_options_.fill_cache;
1174
0
  std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
1175
0
      ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
1176
0
      /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
1177
1178
  // Get first (smallest) and last (largest) key from file.
1179
0
  iter->SeekToFirst();
1180
0
  if (iter->Valid()) {
1181
0
    Status pik_status =
1182
0
        ParseInternalKey(iter->key(), &key, allow_data_in_errors);
1183
0
    if (!pik_status.ok()) {
1184
0
      return Status::Corruption("Corrupted key in external file. ",
1185
0
                                pik_status.getState());
1186
0
    }
1187
0
    if (key.sequence != 0 && !ingestion_options_.allow_db_generated_files) {
1188
0
      return Status::Corruption("External file has non zero sequence number");
1189
0
    }
1190
0
    file_to_ingest->smallest_internal_key.SetFrom(key);
1191
1192
0
    Slice largest;
1193
0
    if (strcmp(sv->mutable_cf_options.table_factory->Name(), "PlainTable") ==
1194
0
        0) {
1195
      // PlainTable iterator does not support SeekToLast().
1196
0
      largest = iter->key();
1197
0
      for (; iter->Valid(); iter->Next()) {
1198
0
        if (cfd_->internal_comparator().Compare(iter->key(), largest) > 0) {
1199
0
          largest = iter->key();
1200
0
        }
1201
0
      }
1202
0
      if (!iter->status().ok()) {
1203
0
        return iter->status();
1204
0
      }
1205
0
    } else {
1206
0
      iter->SeekToLast();
1207
0
      if (!iter->Valid()) {
1208
0
        if (iter->status().ok()) {
1209
          // The file contains at least 1 key since iter is valid after
1210
          // SeekToFirst().
1211
0
          return Status::Corruption("Can not find largest key in sst file");
1212
0
        } else {
1213
0
          return iter->status();
1214
0
        }
1215
0
      }
1216
0
      largest = iter->key();
1217
0
    }
1218
1219
0
    pik_status = ParseInternalKey(largest, &key, allow_data_in_errors);
1220
0
    if (!pik_status.ok()) {
1221
0
      return Status::Corruption("Corrupted key in external file. ",
1222
0
                                pik_status.getState());
1223
0
    }
1224
0
    if (key.sequence != 0 && !ingestion_options_.allow_db_generated_files) {
1225
0
      return Status::Corruption("External file has non zero sequence number");
1226
0
    }
1227
0
    file_to_ingest->largest_internal_key.SetFrom(key);
1228
0
  } else if (!iter->status().ok()) {
1229
0
    return iter->status();
1230
0
  }
1231
1232
0
  std::unique_ptr<InternalIterator> range_del_iter(
1233
0
      table_reader->NewRangeTombstoneIterator(ro));
1234
  // We may need to adjust these key bounds, depending on whether any range
1235
  // deletion tombstones extend past them.
1236
0
  if (range_del_iter != nullptr) {
1237
0
    for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
1238
0
         range_del_iter->Next()) {
1239
0
      Status pik_status =
1240
0
          ParseInternalKey(range_del_iter->key(), &key, allow_data_in_errors);
1241
0
      if (!pik_status.ok()) {
1242
0
        return Status::Corruption("Corrupted key in external file. ",
1243
0
                                  pik_status.getState());
1244
0
      }
1245
0
      if (key.sequence != 0 && !ingestion_options_.allow_db_generated_files) {
1246
0
        return Status::Corruption(
1247
0
            "External file has a range deletion with non zero sequence "
1248
0
            "number.");
1249
0
      }
1250
0
      RangeTombstone tombstone(key, range_del_iter->value());
1251
0
      file_range_checker_.MaybeUpdateRange(tombstone.SerializeKey(),
1252
0
                                           tombstone.SerializeEndKey(),
1253
0
                                           file_to_ingest);
1254
0
    }
1255
0
  }
1256
1257
0
  const size_t ts_sz = ucmp_->timestamp_size();
1258
0
  Slice smallest = file_to_ingest->smallest_internal_key.user_key();
1259
0
  Slice largest = file_to_ingest->largest_internal_key.user_key();
1260
0
  if (ts_sz > 0) {
1261
0
    AppendUserKeyWithMaxTimestamp(&file_to_ingest->start_ukey, smallest, ts_sz);
1262
0
    AppendUserKeyWithMinTimestamp(&file_to_ingest->limit_ukey, largest, ts_sz);
1263
0
  } else {
1264
0
    file_to_ingest->start_ukey.assign(smallest.data(), smallest.size());
1265
0
    file_to_ingest->limit_ukey.assign(largest.data(), largest.size());
1266
0
  }
1267
1268
0
  auto s =
1269
0
      GetSstInternalUniqueId(file_to_ingest->table_properties.db_id,
1270
0
                             file_to_ingest->table_properties.db_session_id,
1271
0
                             file_to_ingest->table_properties.orig_file_number,
1272
0
                             &(file_to_ingest->unique_id));
1273
0
  if (!s.ok()) {
1274
0
    ROCKS_LOG_WARN(db_options_.info_log,
1275
0
                   "Failed to get SST unique id for file %s",
1276
0
                   file_to_ingest->internal_file_path.c_str());
1277
0
    file_to_ingest->unique_id = kNullUniqueId64x2;
1278
0
  }
1279
1280
0
  return status;
1281
0
}
1282
1283
// Picks the LSM level at which `file_to_ingest` will be placed
// (`file_to_ingest->picked_level`) and decides whether it needs a global
// sequence number.
//
// On return `*assigned_seqno` is either 0 (the file keeps its own seqnos) or
// `last_seqno + 1`. `prev_batch_uppermost_level`, when set, is an exclusive
// upper bound on the level index this file may be assigned to; a value of 0
// (or FIFO compaction) forces the file into L0.
//
// Returns:
//   - TryAgain when `fail_if_not_bottommost_level` is set but the file cannot
//     be placed in the last level;
//   - InvalidArgument when the file overlaps existing data in a column family
//     with user-defined timestamps;
//   - any error reported by the per-level overlap check.
Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
    SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
    SequenceNumber last_seqno, IngestedFileInfo* file_to_ingest,
    SequenceNumber* assigned_seqno,
    std::optional<int> prev_batch_uppermost_level) {
  Status status;
  *assigned_seqno = 0;
  const size_t ts_sz = ucmp_->timestamp_size();
  assert(!prev_batch_uppermost_level.has_value() ||
         prev_batch_uppermost_level.value() < cfd_->NumberLevels());
  // A previous batch placed in L0, or FIFO compaction, pins this file to L0.
  bool must_assign_to_l0 = (prev_batch_uppermost_level.has_value() &&
                            prev_batch_uppermost_level.value() == 0) ||
                           compaction_style == kCompactionStyleFIFO;

  if (force_global_seqno || (!ingestion_options_.allow_db_generated_files &&
                             (files_overlap_ || must_assign_to_l0))) {
    *assigned_seqno = last_seqno + 1;
    if (must_assign_to_l0) {
      assert(ts_sz == 0);
      file_to_ingest->picked_level = 0;
      if (ingestion_options_.fail_if_not_bottommost_level &&
          cfd_->NumberLevels() > 1) {
        status = Status::TryAgain(
            "Files cannot be ingested to Lmax. Please make sure key range of "
            "Lmax does not overlap with files to ingest.");
      }
      return status;
    }
  }

  bool overlap_with_db = false;
  // TODO: plumb Env::IOActivity, Env::IOPriority
  ReadOptions ro;
  ro.fill_cache = ingestion_options_.fill_cache;
  ro.total_order_seek = true;
  int target_level = 0;
  auto* vstorage = cfd_->current()->storage_info();
  assert(!must_assign_to_l0 || ingestion_options_.allow_db_generated_files);
  // `target_level` is only ever advanced to levels strictly below this index.
  int assigned_level_exclusive_end = cfd_->NumberLevels();
  if (must_assign_to_l0) {
    assigned_level_exclusive_end = 0;
  } else if (prev_batch_uppermost_level.has_value()) {
    assigned_level_exclusive_end = prev_batch_uppermost_level.value();
  }

  // When ingesting db generated files, we require that ingested files do not
  // overlap with any file in the DB. So we need to check all levels.
  int overlap_checking_exclusive_end =
      ingestion_options_.allow_db_generated_files
          ? cfd_->NumberLevels()
          : assigned_level_exclusive_end;
  for (int lvl = 0; lvl < overlap_checking_exclusive_end; lvl++) {
    // Non-zero levels above the base level are skipped (presumably empty
    // under dynamic level sizing -- confirm).
    if (lvl > 0 && lvl < vstorage->base_level()) {
      continue;
    }
    // With an atomic replace range, overlap is not checked for levels the
    // file may be assigned to; the file is simply pushed to the deepest such
    // level (presumably because the replaced range's old data is dropped).
    if (lvl < assigned_level_exclusive_end &&
        atomic_replace_range_.has_value()) {
      target_level = lvl;
      continue;
    }
    if (cfd_->RangeOverlapWithCompaction(file_to_ingest->start_ukey,
                                         file_to_ingest->limit_ukey, lvl)) {
      // We must use L0 or any level higher than `lvl` to be able to overwrite
      // the compaction output keys that we overlap with in this level. We also
      // need to assign this file a seqno to overwrite the compaction output
      // keys in level `lvl`.
      overlap_with_db = true;
      break;
    } else if (vstorage->NumLevelFiles(lvl) > 0) {
      bool overlap_with_level = false;
      status = sv->current->OverlapWithLevelIterator(
          ro, env_options_, file_to_ingest->start_ukey,
          file_to_ingest->limit_ukey, lvl, &overlap_with_level);
      if (!status.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "Failed to check overlap with level iterator: %s",
                       status.ToString().c_str());
        return status;
      }
      if (overlap_with_level) {
        // We must use L0 or any level higher than `lvl` to be able to overwrite
        // the keys that we overlap with in this level. We also need to assign
        // this file a seqno to overwrite the existing keys in level `lvl`.
        overlap_with_db = true;
        break;
      }
    }

    // We don't overlap with any keys in this level, but we still need to check
    // if our file can fit in it.
    if (lvl < assigned_level_exclusive_end &&
        IngestedFileFitInLevel(file_to_ingest, lvl)) {
      target_level = lvl;
    }
  }

  if (ingestion_options_.fail_if_not_bottommost_level &&
      target_level < cfd_->NumberLevels() - 1) {
    status = Status::TryAgain(
        "Files cannot be ingested to Lmax. Please make sure key range of Lmax "
        "and ongoing compaction's output to Lmax does not overlap with files "
        "to ingest. Input files overlapping with each other can cause some "
        "file to be assigned to non Lmax level.");
    return status;
  }

  TEST_SYNC_POINT_CALLBACK(
      "ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
      &overlap_with_db);
  file_to_ingest->picked_level = target_level;
  if (overlap_with_db) {
    if (ts_sz > 0) {
      status = Status::InvalidArgument(
          "Column family enables user-defined timestamps, please make sure the "
          "key range (without timestamp) of external file does not overlap "
          "with key range (without timestamp) in the db");
      return status;
    }
    // Overlapping existing data requires a fresh seqno so the ingested keys
    // shadow the older ones.
    if (*assigned_seqno == 0) {
      *assigned_seqno = last_seqno + 1;
    }
  }

  return status;
}
1409
1410
// Validates an ingest_behind request: the file must fit in the bottommost
// level, and no file above that level may have a smallest seqno of 0.
// On success records the bottommost level as `file_to_ingest->picked_level`.
//
// Returns InvalidArgument when either condition is violated.
Status ExternalSstFileIngestionJob::CheckLevelForIngestedBehindFile(
    IngestedFileInfo* file_to_ingest) {
  assert(!atomic_replace_range_.has_value());

  auto* storage = cfd_->current()->storage_info();
  const int bottom_level = cfd_->NumberLevels() - 1;

  // Step 1: the file has to fit into the bottommost level.
  if (!IngestedFileFitInLevel(file_to_ingest, bottom_level)) {
    return Status::InvalidArgument(
        "Can't ingest_behind file as it doesn't fit "
        "at the last level!");
  }

  // Step 2: even with cf_allow_ingest_behind=true, reject the request if any
  // file above the bottommost level has a smallest seqno of 0.
  for (int level = 0; level < bottom_level; ++level) {
    const auto& level_files = storage->LevelFiles(level);
    const bool has_zero_seqno_file = std::any_of(
        level_files.begin(), level_files.end(),
        [](const auto* meta) { return meta->fd.smallest_seqno == 0; });
    if (has_zero_seqno_file) {
      return Status::InvalidArgument(
          "Can't ingest_behind file as despite cf_allow_ingest_behind=true "
          "there are files with 0 seqno in database at upper levels!");
    }
  }

  file_to_ingest->picked_level = bottom_level;
  return Status::OK();
}
1438
1439
// Assigns global sequence number `seqno` to `file_to_ingest`.
//
// If the file's original seqno already equals `seqno`, returns OK right away
// without touching the file or `assigned_seqno`. Otherwise global seqnos must
// be allowed. When `write_global_seqno` is set, the seqno is also written
// in-place at `global_seqno_offset` and the file is synced. A file system
// that reports NotSupported for random read/write is tolerated: the write is
// skipped. On success `file_to_ingest->assigned_seqno` is set to `seqno`.
//
// Returns InvalidArgument for disallowed or impossible assignments, and the
// I/O error from opening, writing or syncing the file otherwise.
Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile(
    IngestedFileInfo* file_to_ingest, SequenceNumber seqno) {
  if (ingestion_options_.allow_db_generated_files) {
    assert(seqno == 0);
    assert(file_to_ingest->original_seqno == 0);
  }

  // Nothing to do: the file already carries the requested seqno.
  if (file_to_ingest->original_seqno == seqno) {
    return Status::OK();
  }
  if (!ingestion_options_.allow_global_seqno) {
    return Status::InvalidArgument("Global seqno is required, but disabled");
  }
  const bool persist_seqno = ingestion_options_.write_global_seqno;
  if (persist_seqno && file_to_ingest->global_seqno_offset == 0) {
    return Status::InvalidArgument(
        "Trying to set global seqno for a file that don't have a global seqno "
        "field");
  }

  if (persist_seqno) {
    const std::string& path = file_to_ingest->internal_file_path;
    std::unique_ptr<FSRandomRWFile> rw_file;
    Status s = fs_->NewRandomRWFile(path, env_options_, &rw_file, nullptr);
    TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::NewRandomRWFile",
                             &s);
    if (!s.ok()) {
      // A file system without random-write support is acceptable: the
      // seqno is then simply not persisted into the file.
      if (!s.IsNotSupported()) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "Failed to open ingested file %s for random read/write: %s",
            path.c_str(), s.ToString().c_str());
        return s;
      }
    } else {
      FSRandomRWFilePtr file_ptr(std::move(rw_file), io_tracer_, path);
      std::string encoded_seqno;
      PutFixed64(&encoded_seqno, seqno);
      s = file_ptr->Write(file_to_ingest->global_seqno_offset, encoded_seqno,
                          IOOptions(), nullptr);
      if (!s.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "Failed to write global seqno to %s: %s", path.c_str(),
                       s.ToString().c_str());
        return s;
      }

      TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno");
      s = SyncIngestedFile(file_ptr.get());
      TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno");
      if (!s.ok()) {
        ROCKS_LOG_WARN(db_options_.info_log,
                       "Failed to sync ingested file %s after writing global "
                       "sequence number: %s",
                       path.c_str(), s.ToString().c_str());
        return s;
      }
    }
  }

  file_to_ingest->assigned_seqno = seqno;
  return Status::OK();
}
1509
1510
// Recomputes the full-file checksum of `file_to_ingest` after ingestion may
// have rewritten its global seqno, storing the result in
// `file_to_ingest->file_checksum` / `file_checksum_func_name`.
//
// This is a no-op (returns OK) when no checksum generator factory is
// configured, when checksum generation is not needed, or when
// `write_global_seqno` is false (the checksum produced during Prepare() is
// then kept). Returns the I/O error from checksum generation on failure.
IOStatus ExternalSstFileIngestionJob::GenerateChecksumForIngestedFile(
    IngestedFileInfo* file_to_ingest) {
  const bool has_factory = db_options_.file_checksum_gen_factory != nullptr;
  if (!has_factory || !need_generate_file_checksum_ ||
      !ingestion_options_.write_global_seqno) {
    return IOStatus::OK();
  }

  // TODO: rate limit file reads for checksum calculation during file ingestion.
  // TODO: plumb Env::IOActivity
  ReadOptions read_opts;
  FileOptions checksum_fopts;
  checksum_fopts.file_checksum_func_name = kNoFileChecksumFuncName;
  // No specific checksum function is requested (empty name).
  std::string requested_func_name;
  std::string checksum;
  std::string checksum_func_name;
  IOStatus io_s = GenerateOneFileChecksum(
      fs_.get(), file_to_ingest->internal_file_path,
      db_options_.file_checksum_gen_factory.get(), requested_func_name,
      &checksum, &checksum_func_name,
      ingestion_options_.verify_checksums_readahead_size,
      db_options_.allow_mmap_reads, io_tracer_, db_options_.rate_limiter.get(),
      read_opts, db_options_.stats, db_options_.clock, checksum_fopts);
  if (!io_s.ok()) {
    ROCKS_LOG_WARN(
        db_options_.info_log, "Failed to generate checksum for %s: %s",
        file_to_ingest->internal_file_path.c_str(), io_s.ToString().c_str());
    return io_s;
  }

  file_to_ingest->file_checksum = std::move(checksum);
  file_to_ingest->file_checksum_func_name = std::move(checksum_func_name);
  return IOStatus::OK();
}
1545
1546
bool ExternalSstFileIngestionJob::IngestedFileFitInLevel(
1547
0
    const IngestedFileInfo* file_to_ingest, int level) {
1548
0
  if (level == 0) {
1549
    // Files can always fit in L0
1550
0
    return true;
1551
0
  }
1552
1553
0
  auto* vstorage = cfd_->current()->storage_info();
1554
0
  Slice file_smallest_user_key(file_to_ingest->start_ukey);
1555
0
  Slice file_largest_user_key(file_to_ingest->limit_ukey);
1556
1557
0
  if (vstorage->OverlapInLevel(level, &file_smallest_user_key,
1558
0
                               &file_largest_user_key)) {
1559
    // File overlap with another files in this level, we cannot
1560
    // add it to this level
1561
0
    return false;
1562
0
  }
1563
1564
  // File did not overlap with level files, nor compaction output
1565
0
  return true;
1566
0
}
1567
1568
template <typename TWritableFile>
1569
0
Status ExternalSstFileIngestionJob::SyncIngestedFile(TWritableFile* file) {
1570
0
  assert(file != nullptr);
1571
0
  if (db_options_.use_fsync) {
1572
0
    return file->Fsync(IOOptions(), nullptr);
1573
0
  } else {
1574
0
    return file->Sync(IOOptions(), nullptr);
1575
0
  }
1576
0
}
Unexecuted instantiation: rocksdb::Status rocksdb::ExternalSstFileIngestionJob::SyncIngestedFile<rocksdb::FSWritableFile>(rocksdb::FSWritableFile*)
Unexecuted instantiation: rocksdb::Status rocksdb::ExternalSstFileIngestionJob::SyncIngestedFile<rocksdb::FSRandomRWFile>(rocksdb::FSRandomRWFile*)
1577
1578
// Determines the smallest and largest sequence numbers of the entries in the
// external file behind `table_reader`, storing them in
// `file_to_ingest->smallest_seqno` and `file_to_ingest->largest_seqno`.
//
// The table properties are used when they record the bounds. Otherwise, for
// example with older SST files, all point keys and range tombstones are
// scanned.
//
// Returns:
//   - Corruption if an internal key cannot be parsed;
//   - an iterator's error status if a scan fails;
//   - InvalidArgument if a bound could not be determined (e.g. the scanned
//     file contains no entries).
Status ExternalSstFileIngestionJob::GetSeqnoBoundaryForFile(
    TableReader* table_reader, SuperVersion* sv,
    IngestedFileInfo* file_to_ingest, bool allow_data_in_errors) {
  const auto tp = table_reader->GetTableProperties();
  const bool has_largest_seqno = tp->HasKeyLargestSeqno();
  SequenceNumber largest_seqno = tp->key_largest_seqno;
  if (has_largest_seqno) {
    file_to_ingest->largest_seqno = largest_seqno;
    if (largest_seqno == 0) {
      // Sequence numbers are unsigned, so a largest seqno of 0 pins the
      // smallest one as well.
      file_to_ingest->smallest_seqno = 0;
      return Status::OK();
    }
    if (tp->HasKeySmallestSeqno()) {
      file_to_ingest->smallest_seqno = tp->key_smallest_seqno;
      return Status::OK();
    }
  }

  // For older SST files they may not be recorded in table properties, so
  // we scan the file to find out.
  TEST_SYNC_POINT(
      "ExternalSstFileIngestionJob::GetSeqnoBoundaryForFile:FileScan");
  SequenceNumber smallest_seqno = kMaxSequenceNumber;
  SequenceNumber largest_seqno_from_iter = 0;
  ReadOptions ro;
  ro.fill_cache = ingestion_options_.fill_cache;
  std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
      ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
      /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));
  ParsedInternalKey key;
  iter->SeekToFirst();
  while (iter->Valid()) {
    Status pik_status =
        ParseInternalKey(iter->key(), &key, allow_data_in_errors);
    if (!pik_status.ok()) {
      return Status::Corruption("Corrupted key in external file. ",
                                pik_status.getState());
    }
    smallest_seqno = std::min(smallest_seqno, key.sequence);
    largest_seqno_from_iter = std::max(largest_seqno_from_iter, key.sequence);
    iter->Next();
  }
  if (!iter->status().ok()) {
    return iter->status();
  }

  // Range tombstones carry sequence numbers too and must be included.
  if (tp->num_range_deletions > 0) {
    std::unique_ptr<InternalIterator> range_del_iter(
        table_reader->NewRangeTombstoneIterator(ro));
    if (range_del_iter != nullptr) {
      for (range_del_iter->SeekToFirst(); range_del_iter->Valid();
           range_del_iter->Next()) {
        Status pik_status =
            ParseInternalKey(range_del_iter->key(), &key, allow_data_in_errors);
        if (!pik_status.ok()) {
          return Status::Corruption("Corrupted key in external file. ",
                                    pik_status.getState());
        }
        smallest_seqno = std::min(smallest_seqno, key.sequence);
        largest_seqno_from_iter =
            std::max(largest_seqno_from_iter, key.sequence);
      }
      if (!range_del_iter->status().ok()) {
        return range_del_iter->status();
      }
    }
  }

  file_to_ingest->smallest_seqno = smallest_seqno;
  if (!has_largest_seqno) {
    file_to_ingest->largest_seqno = largest_seqno_from_iter;
  } else {
    assert(largest_seqno == largest_seqno_from_iter);
    file_to_ingest->largest_seqno = largest_seqno;
  }

  // Each message names the bound that is actually unknown.
  if (file_to_ingest->largest_seqno == kMaxSequenceNumber) {
    return Status::InvalidArgument(
        "Unknown largest seqno for db generated file.");
  }
  if (file_to_ingest->smallest_seqno == kMaxSequenceNumber) {
    return Status::InvalidArgument(
        "Unknown smallest seqno for db generated file.");
  }
  return Status::OK();
}
1664
1665
}  // namespace ROCKSDB_NAMESPACE