Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/db/import_column_family_job.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
7
#include "db/version_builder.h"
8
9
#include "db/import_column_family_job.h"
10
11
#include <algorithm>
12
#include <cinttypes>
13
#include <string>
14
#include <vector>
15
16
#include "db/version_edit.h"
17
#include "file/file_util.h"
18
#include "file/random_access_file_reader.h"
19
#include "logging/logging.h"
20
#include "table/merging_iterator.h"
21
#include "table/sst_file_writer_collectors.h"
22
#include "table/table_builder.h"
23
#include "table/unique_id_impl.h"
24
#include "util/stop_watch.h"
25
26
namespace ROCKSDB_NAMESPACE {
27
28
// Validates every file listed in `metadatas_` and brings it into the DB
// directory by hard link or copy.
//
// `next_file_number` is the first file number handed to the imported files;
// each file consumes the next consecutive number. `sv` is forwarded to
// GetIngestedFileInfo() to open the files.
//
// Returns:
//  - any error from GetIngestedFileInfo(),
//  - InvalidArgument if a file has no entries, a CF's file list is empty, or
//    the key ranges of two CFs overlap,
//  - Corruption if a file's boundary keys are invalid,
//  - the link/copy error if placing a file inside the DB fails. In that case
//    the files already placed are deleted again before returning.
Status ImportColumnFamilyJob::Prepare(uint64_t next_file_number,
                                      SuperVersion* sv) {
  Status status;
  std::vector<ColumnFamilyIngestFileInfo> cf_ingest_infos;
  for (const auto& metadata_per_cf : metadatas_) {
    // Read the information of files we are importing
    ColumnFamilyIngestFileInfo cf_file_info;
    InternalKey smallest, largest;
    int num_files = 0;
    std::vector<IngestedFileInfo> files_to_import_per_cf;
    for (size_t i = 0; i < metadata_per_cf.size(); i++) {
      // Bind by reference: the metadata is only read here.
      const auto& file_metadata = *metadata_per_cf[i];
      const auto file_path = file_metadata.db_path + "/" + file_metadata.name;
      IngestedFileInfo file_to_import;
      status = GetIngestedFileInfo(file_path, next_file_number++, sv,
                                   file_metadata, &file_to_import);
      if (!status.ok()) {
        return status;
      }

      if (file_to_import.num_entries == 0) {
        status = Status::InvalidArgument("File contain no entries");
        return status;
      }

      if (!file_to_import.smallest_internal_key.Valid() ||
          !file_to_import.largest_internal_key.Valid()) {
        status = Status::Corruption("File has corrupted keys");
        return status;
      }

      files_to_import_per_cf.push_back(file_to_import);
      num_files++;

      // Calculate the smallest and largest keys of all files in this CF
      if (i == 0) {
        smallest = file_to_import.smallest_internal_key;
        largest = file_to_import.largest_internal_key;
      } else {
        if (cfd_->internal_comparator().Compare(
                smallest, file_to_import.smallest_internal_key) > 0) {
          smallest = file_to_import.smallest_internal_key;
        }
        if (cfd_->internal_comparator().Compare(
                largest, file_to_import.largest_internal_key) < 0) {
          largest = file_to_import.largest_internal_key;
        }
      }
    }

    if (num_files == 0) {
      status = Status::InvalidArgument("The list of files is empty");
      return status;
    }
    files_to_import_.push_back(files_to_import_per_cf);
    cf_file_info.smallest_internal_key = smallest;
    cf_file_info.largest_internal_key = largest;
    cf_ingest_infos.push_back(cf_file_info);
  }

  // Order the per-CF ranges by smallest user key so that any overlap shows up
  // between neighbours.
  std::sort(cf_ingest_infos.begin(), cf_ingest_infos.end(),
            [this](const ColumnFamilyIngestFileInfo& info1,
                   const ColumnFamilyIngestFileInfo& info2) {
              return cfd_->user_comparator()->Compare(
                         info1.smallest_internal_key.user_key(),
                         info2.smallest_internal_key.user_key()) < 0;
            });

  for (size_t i = 0; i + 1 < cf_ingest_infos.size(); i++) {
    if (cfd_->user_comparator()->Compare(
            cf_ingest_infos[i].largest_internal_key.user_key(),
            cf_ingest_infos[i + 1].smallest_internal_key.user_key()) >= 0) {
      status = Status::InvalidArgument("CFs have overlapping ranges");
      return status;
    }
  }

  // Copy/Move external files into DB
  auto hardlink_files = import_options_.move_files;

  for (auto& files_to_import_per_cf : files_to_import_) {
    for (auto& f : files_to_import_per_cf) {
      const auto& path_outside_db = f.external_file_path;
      const auto path_inside_db = TableFileName(
          cfd_->ioptions()->cf_paths, f.fd.GetNumber(), f.fd.GetPathId());

      if (hardlink_files) {
        status = fs_->LinkFile(path_outside_db, path_inside_db, IOOptions(),
                               nullptr);
        if (status.IsNotSupported()) {
          // Original file is on a different FS, use copy instead of hard
          // linking
          hardlink_files = false;
          // Log the external path: f.internal_file_path is only assigned
          // below, after the file has been placed, so it is still empty here.
          ROCKS_LOG_INFO(db_options_.info_log,
                         "Try to link file %s but it's not supported : %s",
                         path_outside_db.c_str(), status.ToString().c_str());
        }
      }
      if (!hardlink_files) {
        // FIXME: temperature handling (like ExternalSstFileIngestionJob)
        status = CopyFile(fs_.get(), path_outside_db, Temperature::kUnknown,
                          path_inside_db, Temperature::kUnknown, 0,
                          db_options_.use_fsync, io_tracer_);
      }
      if (!status.ok()) {
        break;
      }
      f.copy_file = !hardlink_files;
      f.internal_file_path = path_inside_db;
    }
    if (!status.ok()) {
      break;
    }
  }

  if (!status.ok()) {
    // We failed, remove all files that we copied into the db
    for (auto& files_to_import_per_cf : files_to_import_) {
      for (auto& f : files_to_import_per_cf) {
        // Files are placed in order, so the first file without an internal
        // path marks where placement stopped for this CF.
        if (f.internal_file_path.empty()) {
          break;
        }
        const auto s =
            fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
        if (!s.ok()) {
          ROCKS_LOG_WARN(db_options_.info_log,
                         "AddFile() clean up for file %s failed : %s",
                         f.internal_file_path.c_str(), s.ToString().c_str());
        }
      }
    }
  }

  return status;
}
164
165
// REQUIRES: we have become the only writer by entering both write_thread_ and
166
// nonmem_write_thread_
167
0
Status ImportColumnFamilyJob::Run() {
168
  // We use the import time as the ancester time. This is the time the data
169
  // is written to the database.
170
0
  int64_t temp_current_time = 0;
171
0
  uint64_t oldest_ancester_time = kUnknownOldestAncesterTime;
172
0
  uint64_t current_time = kUnknownOldestAncesterTime;
173
0
  if (clock_->GetCurrentTime(&temp_current_time).ok()) {
174
0
    current_time = oldest_ancester_time =
175
0
        static_cast<uint64_t>(temp_current_time);
176
0
  }
177
178
0
  Status s;
179
  // When importing multiple CFs, we should not reuse epoch number from ingested
180
  // files. Since these epoch numbers were assigned by different CFs, there may
181
  // be different files from different CFs with the same epoch number. With a
182
  // subsequent intra-L0 compaction we may end up with files with overlapping
183
  // key range but the same epoch number. Here we will create a dummy
184
  // VersionStorageInfo per CF being imported. Each CF's files will be assigned
185
  // increasing epoch numbers to avoid duplicated epoch number. This is done by
186
  // only resetting epoch number of the new CF in the first call to
187
  // RecoverEpochNumbers() below.
188
0
  for (size_t i = 0; s.ok() && i < files_to_import_.size(); ++i) {
189
0
    VersionBuilder dummy_version_builder(
190
0
        cfd_->current()->version_set()->file_options(), cfd_->ioptions(),
191
0
        cfd_->table_cache(), cfd_->current()->storage_info(),
192
0
        cfd_->current()->version_set(),
193
0
        cfd_->GetFileMetadataCacheReservationManager());
194
0
    VersionStorageInfo dummy_vstorage(
195
0
        &cfd_->internal_comparator(), cfd_->user_comparator(),
196
0
        cfd_->NumberLevels(), cfd_->ioptions()->compaction_style,
197
0
        nullptr /* src_vstorage */, cfd_->ioptions()->force_consistency_checks,
198
0
        EpochNumberRequirement::kMightMissing, cfd_->ioptions()->clock,
199
0
        cfd_->GetLatestMutableCFOptions()->bottommost_file_compaction_delay,
200
0
        cfd_->current()->version_set()->offpeak_time_option());
201
0
    for (size_t j = 0; s.ok() && j < files_to_import_[i].size(); ++j) {
202
0
      const auto& f = files_to_import_[i][j];
203
0
      const auto& file_metadata = *metadatas_[i][j];
204
205
0
      uint64_t tail_size = 0;
206
0
      bool contain_no_data_blocks = f.table_properties.num_entries > 0 &&
207
0
                                    (f.table_properties.num_entries ==
208
0
                                     f.table_properties.num_range_deletions);
209
0
      if (f.table_properties.tail_start_offset > 0 || contain_no_data_blocks) {
210
0
        uint64_t file_size = f.fd.GetFileSize();
211
0
        assert(f.table_properties.tail_start_offset <= file_size);
212
0
        tail_size = file_size - f.table_properties.tail_start_offset;
213
0
      }
214
215
0
      VersionEdit dummy_version_edit;
216
0
      dummy_version_edit.AddFile(
217
0
          file_metadata.level, f.fd.GetNumber(), f.fd.GetPathId(),
218
0
          f.fd.GetFileSize(), f.smallest_internal_key, f.largest_internal_key,
219
0
          file_metadata.smallest_seqno, file_metadata.largest_seqno, false,
220
0
          file_metadata.temperature, kInvalidBlobFileNumber,
221
0
          oldest_ancester_time, current_time, file_metadata.epoch_number,
222
0
          kUnknownFileChecksum, kUnknownFileChecksumFuncName, f.unique_id, 0,
223
0
          tail_size,
224
0
          static_cast<bool>(
225
0
              f.table_properties.user_defined_timestamps_persisted));
226
0
      s = dummy_version_builder.Apply(&dummy_version_edit);
227
0
    }
228
0
    if (s.ok()) {
229
0
      s = dummy_version_builder.SaveTo(&dummy_vstorage);
230
0
    }
231
0
    if (s.ok()) {
232
      // force resetting epoch number for each file
233
0
      dummy_vstorage.RecoverEpochNumbers(cfd_, /*restart_epoch=*/i == 0,
234
0
                                         /*force=*/true);
235
0
      edit_.SetColumnFamily(cfd_->GetID());
236
237
0
      for (int level = 0; level < dummy_vstorage.num_levels(); level++) {
238
0
        for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
239
0
          edit_.AddFile(level, *file_meta);
240
          // If incoming sequence number is higher, update local sequence
241
          // number.
242
0
          if (file_meta->fd.largest_seqno > versions_->LastSequence()) {
243
0
            versions_->SetLastAllocatedSequence(file_meta->fd.largest_seqno);
244
0
            versions_->SetLastPublishedSequence(file_meta->fd.largest_seqno);
245
0
            versions_->SetLastSequence(file_meta->fd.largest_seqno);
246
0
          }
247
0
        }
248
0
      }
249
0
    }
250
    // Release resources occupied by the dummy VersionStorageInfo
251
0
    for (int level = 0; level < dummy_vstorage.num_levels(); level++) {
252
0
      for (FileMetaData* file_meta : dummy_vstorage.LevelFiles(level)) {
253
0
        file_meta->refs--;
254
0
        if (file_meta->refs <= 0) {
255
0
          delete file_meta;
256
0
        }
257
0
      }
258
0
    }
259
0
  }
260
261
0
  return s;
262
0
}
263
264
0
void ImportColumnFamilyJob::Cleanup(const Status& status) {
265
0
  if (!status.ok()) {
266
    // We failed to add files to the database remove all the files we copied.
267
0
    for (auto& files_to_import_per_cf : files_to_import_) {
268
0
      for (auto& f : files_to_import_per_cf) {
269
0
        const auto s =
270
0
            fs_->DeleteFile(f.internal_file_path, IOOptions(), nullptr);
271
0
        if (!s.ok()) {
272
0
          ROCKS_LOG_WARN(db_options_.info_log,
273
0
                         "AddFile() clean up for file %s failed : %s",
274
0
                         f.internal_file_path.c_str(), s.ToString().c_str());
275
0
        }
276
0
      }
277
0
    }
278
0
  } else if (status.ok() && import_options_.move_files) {
279
    // The files were moved and added successfully, remove original file links
280
0
    for (auto& files_to_import_per_cf : files_to_import_) {
281
0
      for (auto& f : files_to_import_per_cf) {
282
0
        const auto s =
283
0
            fs_->DeleteFile(f.external_file_path, IOOptions(), nullptr);
284
0
        if (!s.ok()) {
285
0
          ROCKS_LOG_WARN(
286
0
              db_options_.info_log,
287
0
              "%s was added to DB successfully but failed to remove original "
288
0
              "file link : %s",
289
0
              f.external_file_path.c_str(), s.ToString().c_str());
290
0
        }
291
0
      }
292
0
    }
293
0
  }
294
0
}
295
296
// Opens `external_file` and fills `*file_to_import` with what the import
// needs: file size, file descriptor (numbered `new_file_number`), entry
// count, smallest/largest internal keys, column family id, table properties
// and unique id.
//
// The file size is taken from `file_meta.size` when it is positive, otherwise
// it is queried from the file system. The key bounds are taken from
// `file_meta` when it carries them; otherwise they are computed from the
// file's point keys and range tombstones.
//
// Returns a non-OK status if the file cannot be sized, opened or read, or if
// a key in it cannot be parsed. A failure to derive the unique id is logged
// but does not fail the call.
Status ImportColumnFamilyJob::GetIngestedFileInfo(
    const std::string& external_file, uint64_t new_file_number,
    SuperVersion* sv, const LiveFileMetaData& file_meta,
    IngestedFileInfo* file_to_import) {
  file_to_import->external_file_path = external_file;
  Status status;
  if (file_meta.size > 0) {
    file_to_import->file_size = file_meta.size;
  } else {
    // Get external file size
    status = fs_->GetFileSize(external_file, IOOptions(),
                              &file_to_import->file_size, nullptr);
    if (!status.ok()) {
      return status;
    }
  }
  // Assign FD with number
  file_to_import->fd =
      FileDescriptor(new_file_number, 0, file_to_import->file_size);

  // Create TableReader for external file
  std::unique_ptr<TableReader> table_reader;
  std::unique_ptr<FSRandomAccessFile> sst_file;
  std::unique_ptr<RandomAccessFileReader> sst_file_reader;

  status =
      fs_->NewRandomAccessFile(external_file, env_options_, &sst_file, nullptr);
  if (!status.ok()) {
    return status;
  }
  sst_file_reader.reset(new RandomAccessFileReader(
      std::move(sst_file), external_file, nullptr /*Env*/, io_tracer_));

  // TODO(yuzhangyu): User-defined timestamps doesn't support importing column
  //  family. Pass in the correct `user_defined_timestamps_persisted` flag for
  //  creating `TableReaderOptions` when the support is there.
  status = cfd_->ioptions()->table_factory->NewTableReader(
      TableReaderOptions(
          *cfd_->ioptions(), sv->mutable_cf_options.prefix_extractor,
          env_options_, cfd_->internal_comparator(),
          sv->mutable_cf_options.block_protection_bytes_per_key,
          /*skip_filters*/ false, /*immortal*/ false,
          /*force_direct_prefetch*/ false, /*level*/ -1,
          /*block_cache_tracer*/ nullptr,
          /*max_file_size_for_l0_meta_pin*/ 0, versions_->DbSessionId(),
          /*cur_file_num*/ new_file_number),
      std::move(sst_file_reader), file_to_import->file_size, &table_reader);
  if (!status.ok()) {
    return status;
  }

  // Get the external file properties
  auto props = table_reader->GetTableProperties();

  // Set original_seqno to 0.
  file_to_import->original_seqno = 0;

  // Get number of entries in table
  file_to_import->num_entries = props->num_entries;

  // If the importing files were exported with Checkpoint::ExportColumnFamily(),
  // we cannot simply recompute smallest and largest used to truncate range
  // tombstones from file content, and we expect smallest and largest populated
  // in file_meta.
  if (file_meta.smallest.empty()) {
    assert(file_meta.largest.empty());
    // TODO: plumb Env::IOActivity, Env::IOPriority
    ReadOptions ro;
    std::unique_ptr<InternalIterator> iter(table_reader->NewIterator(
        ro, sv->mutable_cf_options.prefix_extractor.get(), /*arena=*/nullptr,
        /*skip_filters=*/false, TableReaderCaller::kExternalSSTIngestion));

    // Get first (smallest) key from file
    iter->SeekToFirst();
    bool bound_set = false;
    if (iter->Valid()) {
      file_to_import->smallest_internal_key.DecodeFrom(iter->key());
      Slice largest;
      if (strcmp(cfd_->ioptions()->table_factory->Name(), "PlainTable") == 0) {
        // PlainTable iterator does not support SeekToLast().
        largest = iter->key();
        for (; iter->Valid(); iter->Next()) {
          if (cfd_->internal_comparator().Compare(iter->key(), largest) > 0) {
            largest = iter->key();
          }
        }
        if (!iter->status().ok()) {
          return iter->status();
        }
      } else {
        iter->SeekToLast();
        if (!iter->Valid()) {
          if (iter->status().ok()) {
            // The file contains at least 1 key since iter is valid after
            // SeekToFirst().
            return Status::Corruption("Can not find largest key in sst file");
          } else {
            return iter->status();
          }
        }
        largest = iter->key();
      }
      file_to_import->largest_internal_key.DecodeFrom(largest);
      bound_set = true;
    } else if (!iter->status().ok()) {
      return iter->status();
    }

    // Widen the bounds with the range tombstones, if the file has any.
    std::unique_ptr<InternalIterator> range_del_iter{
        table_reader->NewRangeTombstoneIterator(ro)};
    if (range_del_iter != nullptr) {
      range_del_iter->SeekToFirst();
      if (range_del_iter->Valid()) {
        ParsedInternalKey key;
        Status pik_status = ParseInternalKey(range_del_iter->key(), &key,
                                             db_options_.allow_data_in_errors);
        if (!pik_status.ok()) {
          return Status::Corruption("Corrupted key in external file. ",
                                    pik_status.getState());
        }
        RangeTombstone first_tombstone(key, range_del_iter->value());
        InternalKey start_key = first_tombstone.SerializeKey();
        const InternalKeyComparator* icmp = &cfd_->internal_comparator();
        if (!bound_set ||
            icmp->Compare(start_key, file_to_import->smallest_internal_key) <
                0) {
          file_to_import->smallest_internal_key = start_key;
        }

        range_del_iter->SeekToLast();
        // Do not read key()/value() from an invalid iterator. At least one
        // tombstone exists (SeekToFirst() succeeded), so an invalid position
        // here means a read error or an inconsistent file.
        if (!range_del_iter->Valid()) {
          if (!range_del_iter->status().ok()) {
            return range_del_iter->status();
          }
          return Status::Corruption(
              "Can not find last range tombstone in sst file");
        }
        pik_status = ParseInternalKey(range_del_iter->key(), &key,
                                      db_options_.allow_data_in_errors);
        if (!pik_status.ok()) {
          return Status::Corruption("Corrupted key in external file. ",
                                    pik_status.getState());
        }
        RangeTombstone last_tombstone(key, range_del_iter->value());
        InternalKey end_key = last_tombstone.SerializeEndKey();
        if (!bound_set ||
            icmp->Compare(end_key, file_to_import->largest_internal_key) > 0) {
          file_to_import->largest_internal_key = end_key;
        }
        bound_set = true;
      }
    }
    assert(bound_set);
  } else {
    assert(!file_meta.largest.empty());
    file_to_import->smallest_internal_key.DecodeFrom(file_meta.smallest);
    file_to_import->largest_internal_key.DecodeFrom(file_meta.largest);
  }

  file_to_import->cf_id = static_cast<uint32_t>(props->column_family_id);

  file_to_import->table_properties = *props;

  auto s = GetSstInternalUniqueId(props->db_id, props->db_session_id,
                                  props->orig_file_number,
                                  &(file_to_import->unique_id));
  if (!s.ok()) {
    // Log the external path: internal_file_path is not set by this function
    // and is still empty at this point.
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get SST unique id for file %s",
                   external_file.c_str());
  }

  return status;
}
463
}  // namespace ROCKSDB_NAMESPACE