Coverage Report

Created: 2026-05-16 07:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/import_column_family_job.cc
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
7
#include "db/import_column_family_job.h"
8
9
#include <algorithm>
10
#include <cinttypes>
11
#include <string>
12
#include <vector>
13
14
#include "db/version_builder.h"
15
#include "db/version_edit.h"
16
#include "file/file_util.h"
17
#include "file/random_access_file_reader.h"
18
#include "logging/logging.h"
19
#include "table/merging_iterator.h"
20
#include "table/sst_file_writer_collectors.h"
21
#include "table/table_builder.h"
22
#include "table/unique_id_impl.h"
23
#include "util/stop_watch.h"
24
25
namespace ROCKSDB_NAMESPACE {
26
27
// Validates the files described by metadatas_ and places them inside the DB.
//
// For every CF entry in metadatas_, each file is inspected through
// GetIngestedFileInfo() and given a file number, starting at
// `next_file_number` and increasing by one per file. Files with zero entries
// or invalid boundary keys are rejected, as is a CF entry with no files. The
// user-key ranges of the CF entries must not overlap. Finally every file is
// hard-linked (when import_options_.move_files is set and the file system
// supports it) or copied to its table file path inside the DB.
//
// `sv` is forwarded to GetIngestedFileInfo().
//
// Returns OK on success. If linking/copying fails, the files that were
// already placed inside the DB are deleted before the error is returned.
Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
                                      SuperVersion* sv) {
  Status status;
  std::vector<ColumnFamilyIngestFileInfo> cf_ingest_infos;
  for (const auto& metadata_per_cf : metadatas_) {
    // Read the information of files we are importing
    ColumnFamilyIngestFileInfo cf_file_info;
    InternalKey smallest, largest;
    std::vector<IngestedFileInfo> files_to_import_per_cf;
    files_to_import_per_cf.reserve(metadata_per_cf.size());
    for (size_t i = 0; i < metadata_per_cf.size(); i++) {
      // The metadata is only read here, so bind it by reference instead of
      // copying it.
      const auto& file_metadata = *metadata_per_cf[i];
      const auto file_path = file_metadata.db_path + "/" + file_metadata.name;
      IngestedFileInfo file_to_import;
      status = GetIngestedFileInfo(file_path, next_file_number++, sv,
                                   file_metadata, &file_to_import);
      if (!status.ok()) {
        return status;
      }

      if (file_to_import.num_entries == 0) {
        status = Status::InvalidArgument("File contain no entries");
        return status;
      }

      if (!file_to_import.smallest_internal_key.Valid() ||
          !file_to_import.largest_internal_key.Valid()) {
        status = Status::Corruption("File has corrupted keys");
        return status;
      }

      // Calculate the smallest and largest keys of all files in this CF.
      // This is done before the file info is moved into the vector below.
      if (i == 0) {
        smallest = file_to_import.smallest_internal_key;
        largest = file_to_import.largest_internal_key;
      } else {
        if (cfd_->internal_comparator().Compare(
                smallest, file_to_import.smallest_internal_key) > 0) {
          smallest = file_to_import.smallest_internal_key;
        }
        if (cfd_->internal_comparator().Compare(
                largest, file_to_import.largest_internal_key) < 0) {
          largest = file_to_import.largest_internal_key;
        }
      }

      files_to_import_per_cf.push_back(std::move(file_to_import));
    }

    if (files_to_import_per_cf.empty()) {
      status = Status::InvalidArgument("The list of files is empty");
      return status;
    }
    files_to_import_.push_back(std::move(files_to_import_per_cf));
    cf_file_info.smallest_internal_key = smallest;
    cf_file_info.largest_internal_key = largest;
    cf_ingest_infos.push_back(cf_file_info);
  }

  // Order the CF ranges by their smallest user key so that overlap can be
  // detected by comparing neighbours only.
  std::sort(cf_ingest_infos.begin(), cf_ingest_infos.end(),
            [this](const ColumnFamilyIngestFileInfo& info1,
                   const ColumnFamilyIngestFileInfo& info2) {
              return cfd_->user_comparator()->Compare(
                         info1.smallest_internal_key.user_key(),
                         info2.smallest_internal_key.user_key()) < 0;
            });

  for (size_t i = 0; i + 1 < cf_ingest_infos.size(); i++) {
    if (cfd_->user_comparator()->Compare(
            cf_ingest_infos[i].largest_internal_key.user_key(),
            cf_ingest_infos[i + 1].smallest_internal_key.user_key()) >= 0) {
      status = Status::InvalidArgument("CFs have overlapping ranges");
      return status;
    }
  }

  // Copy/Move external files into DB
  auto hardlink_files = import_options_.move_files;

  for (auto& files_to_import_per_cf : files_to_import_) {
    for (auto& f : files_to_import_per_cf) {
      const auto& path_outside_db = f.external_file_path;
      const auto path_inside_db = TableFileName(
          cfd_->ioptions().cf_paths, f.fd.GetNumber(), f.fd.GetPathId());

      if (hardlink_files) {
        status = fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(),
                               nullptr);
        if (status.IsNotSupported()) {
          // Original file is on a different FS, use copy instead of hard
          // linking. Once linking is found unsupported, all remaining files
          // are copied as well.
          hardlink_files = false;
          // Log the external path: f.internal_file_path is only assigned
          // further below, after the file has been placed inside the DB.
          ROCKS_LOG_INFO(db_options_.info_log,
                         "Try to link file %s but it's not supported : %s",
                         path_outside_db.c_str(), status.ToString().c_str());
        }
      }
      if (!hardlink_files) {
        // FIXME: temperature handling (like ExternalSstFileIngestionJob)
        status = CopyFile(fs_.get(), path_outside_db, Temperature::kUnknown,
                          path_inside_db, Temperature::kUnknown, 0,
                          db_options_.use_fsync, io_tracer_);
      }
      if (!status.ok()) {
        break;
      }
      f.copy_file = !hardlink_files;
      f.internal_file_path = path_inside_db;
    }
    if (!status.ok()) {
      break;
    }
  }

  if (!status.ok()) {
    // We failed, remove all files that we copied into the db
    for (auto& files_to_import_per_cf : files_to_import_) {
      for (auto& f : files_to_import_per_cf) {
        if (f.internal_file_path.empty()) {
          // Files are processed in order, so the first file without an
          // internal path marks the point where linking/copying stopped.
          break;
        }
        const auto s =
            fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
        if (!s.ok()) {
          ROCKS_LOG_WARN(db_options_.info_log,
                         "AddFile() clean up for file %s failed : %s",
                         f.internal_file_path.c_str(), s.ToString().c_str());
        }
      }
    }
  }

  return status;
}
163
164
// REQUIRES: we have become the only writer by entering both write_thread_ and
165
// nonmem_write_thread_
166
0
Status ImportColumnFamilyJob::Run() {
167
  // We use the import time as the ancester time. This is the time the data
168
  // is written to the database.
169
0
  int64_t temp_current_time = 0;
170
0
  uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
171
0
  uint64_t current_time = kUnknownOldestAncesterTime;
172
0
  if (clock_->GetCurrentTime(&temp_current_time).ok()) {
173
0
    current_time = oldest_ancester_time =
174
0
        static_cast<uint64_t>(temp_current_time);
175
0
  }
176
177
0
  Status s;
178
  // When importing multiple CFs, we should not reuse epoch number from ingested
179
  // files. Since these epoch numbers were assigned by different CFs, there may
180
  // be different files from different CFs with the same epoch number. With a
181
  // subsequent intra-L0 compaction we may end up with files with overlapping
182
  // key range but the same epoch number. Here we will create a dummy
183
  // VersionStorageInfo per CF being imported. Each CF's files will be assigned
184
  // increasing epoch numbers to avoid duplicated epoch number. This is done by
185
  // only resetting epoch number of the new CF in the first call to
186
  // RecoverEpochNumbers() below.
187
0
  for (size_t i = 0; s.ok() && i < files_to_import_.size(); ++i) {
188
0
    VersionBuilder dummy_version_builder(
189
0
        cfd_->current()->version_set()->file_options(), &cfd_->ioptions(),
190
0
        cfd_->table_cache(), cfd_->current()->storage_info(),
191
0
        cfd_->current()->version_set(),
192
0
        cfd_->GetFileMetadataCacheReservationManager());
193
0
    VersionStorageInfo dummy_vstorage(
194
0
        &cfd_->internal_comparator(), cfd_->user_comparator(),
195
0
        cfd_->NumberLevels(), cfd_->ioptions().compaction_style,
196
0
        nullptr /* src_vstorage */, cfd_->ioptions().force_consistency_checks,
197
0
        EpochNumberRequirement::kMightMissing, cfd_->ioptions().clock,
198
0
        cfd_->GetLatestMutableCFOptions().bottommost_file_compaction_delay,
199
0
        cfd_->current()->version_set()->offpeak_time_option());
200
0
    for (size_t j = 0; s.ok() && j < files_to_import_[i].size(); ++j) {
201
0
      const auto& f = files_to_import_[i][j];
202
0
      const auto& file_metadata = *metadatas_[i][j];
203
204
0
      uint64_t tail_size = FileMetaData::CalculateTailSize(f.fd.GetFileSize(),
205
0
                                                           f.table_properties);
206
207
0
      VersionEdit dummy_version_edit;
208
0
      dummy_version_edit.AddFile(
209
0
          file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
210
0
          f.fd.GetFileSize(), f.smallest_internal_key, f.largest_internal_key,
211
0
          file_metadata.smallest_seqno, file_metadata.largest_seqno, false,
212
0
          file_metadata.temperature, kInvalidBlobFileNumber,
213
0
          oldest_ancester_time, current_time, file_metadata.epoch_number,
214
0
          kUnknownFileChecksum, kUnknownFileChecksumFuncName, f.unique_id, 0,
215
0
          tail_size,
216
0
          static_cast<bool>(
217
0
              f.table_properties.user_defined_timestamps_persisted));
218
0
      s = dummy_version_builder.Apply(&dummy_version_edit);
219
0
    }
220
0
    if (s.ok()) {
221
0
      s = dummy_version_builder.SaveTo(&dummy_vstorage);
222
0
    }
223
0
    if (s.ok()) {
224
      // force resetting epoch number for each file
225
0
      dummy_vstorage.RecoverEpochNumbers(cfd_, /*restart_epoch=*/i == 0,
226
0
                                         /*force=*/true);
227
0
      edit_.SetColumnFamily(cfd_->GetID());
228
229
0
      for (int level = 0; level < dummy_vstorage.num_levels(); level++) {
230
0
        for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
231
0
          edit_.AddFile(level, *file_meta);
232
          // If incoming sequence number is higher, update local sequence
233
          // number.
234
0
          if (file_meta->fd.largest_seqno > versions_->LastSequence()) {
235
0
            versions_->SetLastAllocatedSequence(file_meta->fd.largest_seqno);
236
0
            versions_->SetLastPublishedSequence(file_meta->fd.largest_seqno);
237
0
            versions_->SetLastSequence(file_meta->fd.largest_seqno);
238
0
          }
239
0
        }
240
0
      }
241
0
    }
242
    // Release resources occupied by the dummy VersionStorageInfo
243
0
    for (int level = 0; level < dummy_vstorage.num_levels(); level++) {
244
0
      for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
245
0
        file_meta->refs--;
246
0
        if (file_meta->refs <= 0) {
247
0
          delete file_meta;
248
0
        }
249
0
      }
250
0
    }
251
0
  }
252
253
0
  return s;
254
0
}
255
256
0
void ImportColumnFamilyJob::Cleanup(const Status& status) {
257
0
  if (!status.ok()) {
258
    // We failed to add files to the database remove all the files we copied.
259
0
    for (auto& files_to_import_per_cf : files_to_import_) {
260
0
      for (auto& f : files_to_import_per_cf) {
261
0
        const auto s =
262
0
            fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
263
0
        if (!s.ok()) {
264
0
          ROCKS_LOG_WARN(db_options_.info_log,
265
0
                         "AddFile() clean up for file %s failed : %s",
266
0
                         f.internal_file_path.c_str(), s.ToString().c_str());
267
0
        }
268
0
      }
269
0
    }
270
0
  } else if (status.ok() && import_options_.move_files) {
271
    // The files were moved and added successfully, remove original file links
272
0
    for (auto& files_to_import_per_cf : files_to_import_) {
273
0
      for (auto& f : files_to_import_per_cf) {
274
0
        const auto s =
275
0
            fs_->DeleteFile(f.external_file_path, IOOptions(), nullptr);
276
0
        if (!s.ok()) {
277
0
          ROCKS_LOG_WARN(
278
0
              db_options_.info_log,
279
0
              "%s was added to DB successfully but failed to remove original "
280
0
              "file link : %s",
281
0
              f.external_file_path.c_str(), s.ToString().c_str());
282
0
        }
283
0
      }
284
0
    }
285
0
  }
286
0
}
287
288
// Fills `file_to_import` with the information needed to import the SST file
// at `external_file`.
//
// - external_file:   path of the file outside the DB.
// - new_file_number: file number stored in the file's FileDescriptor.
// - sv:              supplies the table factory, prefix extractor and other
//                    mutable CF options used to open and iterate the file.
// - file_meta:       metadata describing the file. Its size is used when
//                    positive (otherwise the file system is queried), and its
//                    smallest/largest keys are used when non-empty (otherwise
//                    the bounds are computed from the file's point keys and
//                    range tombstones).
// - file_to_import:  output.
//
// Returns a non-OK status when the file cannot be sized, opened or read, or
// when a range tombstone key cannot be parsed. A failure to derive the SST
// unique id is only logged.
Status ImportColumnFamilyJob::GetIngestedFileInfo(
    const std::string& external_file, uint64_t new_file_number,
    SuperVersion* sv, const LiveFileMetaData& file_meta,
    IngestedFileInfo* file_to_import) {
  file_to_import->external_file_path = external_file;
  Status status;
  if (file_meta.size > 0) {
    file_to_import->file_size = file_meta.size;
  } else {
    // Get external file size
    status = fs_->GetFileSize(external_file, IOOptions(),
                              &file_to_import->file_size, nullptr);
    if (!status.ok()) {
      return status;
    }
  }
  // Assign FD with number
  file_to_import->fd =
      FileDescriptor(new_file_number, 0, file_to_import->file_size);

  // Create TableReader for external file
  std::unique_ptr<TableReader> table_reader;
  std::unique_ptr<FSRandomAccessFile> sst_file;
  std::unique_ptr<RandomAccessFileReader> sst_file_reader;

  FileOptions fo{env_options_};
  fo.file_checksum = file_meta.file_checksum;
  fo.file_checksum_func_name = file_meta.file_checksum_func_name;
  status = fs_->NewRandomAccessFile(external_file, fo, &sst_file, nullptr);
  if (!status.ok()) {
    return status;
  }
  sst_file_reader.reset(new RandomAccessFileReader(
      std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));

  // TODO(yuzhangyu): User-defined timestamps doesn't support importing column
  //  family. Pass in the correct `user_defined_timestamps_persisted` flag for
  //  creating `TableReaderOptions` when the support is there.
  status = sv->mutable_cf_options.table_factory->NewTableReader(
      TableReaderOptions(
          cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
          sv->mutable_cf_options.compression_manager.get(), env_options_,
          cfd_->internal_comparator(),
          sv->mutable_cf_options.block_protection_bytes_per_key,
          /*skip_filters*/ false, /*immortal*/ false,
          /*force_direct_prefetch*/ false, /*level*/ -1,
          /*block_cache_tracer*/ nullptr,
          /*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
          /*cur_file_num*/ new_file_number),
      std::move(sst_file_reader), file_to_import->file_size, &table_reader);
  if (!status.ok()) {
    return status;
  }

  // Get the external file properties
  auto props = table_reader->GetTableProperties();

  // Set original_seqno to 0.
  file_to_import->original_seqno = 0;

  // Get number of entries in table
  file_to_import->num_entries = props->num_entries;

  // If the importing files were exported with Checkpoint::ExportColumnFamily(),
  // we cannot simply recompute smallest and largest used to truncate range
  // tombstones from file content, and we expect smallest and largest populated
  // in file_meta.
  if (file_meta.smallest.empty()) {
    assert(file_meta.largest.empty());
    // TODO: plumb Env::IOActivity, Env::IOPriority
    ReadOptions ro;
    std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
        ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
        /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));

    // Get first (smallest) key from file
    iter->SeekToFirst();
    bool bound_set = false;
    if (iter->Valid()) {
      file_to_import->smallest_internal_key.DecodeFrom(iter->key());
      Slice largest;
      if (strcmp(sv->mutable_cf_options.table_factory->Name(), "PlainTable") ==
          0) {
        // PlainTable iterator does not support SeekToLast(), so scan the
        // whole file and keep the largest key seen.
        largest = iter->key();
        for (; iter->Valid(); iter->Next()) {
          if (cfd_->internal_comparator().Compare(iter->key(), largest) > 0) {
            largest = iter->key();
          }
        }
        if (!iter->status().ok()) {
          return iter->status();
        }
      } else {
        iter->SeekToLast();
        if (!iter->Valid()) {
          if (iter->status().ok()) {
            // The file contains at least 1 key since iter is valid after
            // SeekToFirst().
            return Status::Corruption("Can not find largest key in sst file");
          } else {
            return iter->status();
          }
        }
        largest = iter->key();
      }
      file_to_import->largest_internal_key.DecodeFrom(largest);
      bound_set = true;
    } else if (!iter->status().ok()) {
      return iter->status();
    }

    // Widen the bounds so that they also cover the file's range tombstones.
    std::unique_ptr<InternalIterator> range_del_iter{
        table_reader->NewRangeTombstoneIterator(ro)};
    if (range_del_iter != nullptr) {
      range_del_iter->SeekToFirst();
      if (range_del_iter->Valid()) {
        ParsedInternalKey key;
        Status pik_status = ParseInternalKey(range_del_iter->key(), &key,
                                             db_options_.allow_data_in_errors);
        if (!pik_status.ok()) {
          return Status::Corruption("Corrupted key in external file. ",
                                    pik_status.getState());
        }
        RangeTombstone first_tombstone(key, range_del_iter->value());
        InternalKey start_key = first_tombstone.SerializeKey();
        const InternalKeyComparator* icmp = &cfd_->internal_comparator();
        if (!bound_set ||
            icmp->Compare(start_key, file_to_import->smallest_internal_key) <
                0) {
          file_to_import->smallest_internal_key = start_key;
        }

        range_del_iter->SeekToLast();
        if (!range_del_iter->Valid()) {
          // Do not read key()/value() from an invalid iterator. The iterator
          // was valid after SeekToFirst(), so at least one tombstone exists.
          if (range_del_iter->status().ok()) {
            return Status::Corruption(
                "Can not find last range tombstone in sst file");
          } else {
            return range_del_iter->status();
          }
        }
        pik_status = ParseInternalKey(range_del_iter->key(), &key,
                                      db_options_.allow_data_in_errors);
        if (!pik_status.ok()) {
          return Status::Corruption("Corrupted key in external file. ",
                                    pik_status.getState());
        }
        RangeTombstone last_tombstone(key, range_del_iter->value());
        InternalKey end_key = last_tombstone.SerializeEndKey();
        if (!bound_set ||
            icmp->Compare(end_key, file_to_import->largest_internal_key) > 0) {
          file_to_import->largest_internal_key = end_key;
        }
        bound_set = true;
      }
    }
    assert(bound_set);
  } else {
    assert(!file_meta.largest.empty());
    file_to_import->smallest_internal_key.DecodeFrom(file_meta.smallest);
    file_to_import->largest_internal_key.DecodeFrom(file_meta.largest);
  }

  file_to_import->cf_id = static_cast<uint32_t>(props->column_family_id);

  file_to_import->table_properties = *props;

  auto s = GetSstInternalUniqueId(props->db_id, props->db_session_id,
                                  props->orig_file_number,
                                  &(file_to_import->unique_id));
  if (!s.ok()) {
    // Log the external path: this function never assigns
    // file_to_import->internal_file_path.
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get SST unique id for file %s",
                   external_file.c_str());
  }

  return status;
}
459
}  // namespace ROCKSDB_NAMESPACE