Coverage Report

Created: 2025-07-23 07:17

/src/rocksdb/file/delete_scheduler.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "file/delete_scheduler.h"
7
8
#include <cinttypes>
9
#include <thread>
10
#include <vector>
11
12
#include "file/sst_file_manager_impl.h"
13
#include "logging/logging.h"
14
#include "port/port.h"
15
#include "rocksdb/env.h"
16
#include "rocksdb/file_system.h"
17
#include "rocksdb/system_clock.h"
18
#include "test_util/sync_point.h"
19
#include "util/mutexlock.h"
20
21
namespace ROCKSDB_NAMESPACE {
22
23
DeleteScheduler::DeleteScheduler(SystemClock* clock, FileSystem* fs,
24
                                 int64_t rate_bytes_per_sec, Logger* info_log,
25
                                 SstFileManagerImpl* sst_file_manager,
26
                                 double max_trash_db_ratio,
27
                                 uint64_t bytes_max_delete_chunk)
28
68.5k
    : clock_(clock),
29
68.5k
      fs_(fs),
30
68.5k
      total_trash_size_(0),
31
68.5k
      rate_bytes_per_sec_(rate_bytes_per_sec),
32
68.5k
      pending_files_(0),
33
68.5k
      next_trash_bucket_(0),
34
68.5k
      bytes_max_delete_chunk_(bytes_max_delete_chunk),
35
68.5k
      closing_(false),
36
68.5k
      cv_(&mu_),
37
68.5k
      bg_thread_(nullptr),
38
68.5k
      info_log_(info_log),
39
68.5k
      sst_file_manager_(sst_file_manager),
40
68.5k
      max_trash_db_ratio_(max_trash_db_ratio) {
41
68.5k
  assert(sst_file_manager != nullptr);
42
68.5k
  assert(max_trash_db_ratio >= 0);
43
68.5k
  MaybeCreateBackgroundThread();
44
68.5k
}
45
46
68.5k
// Shuts the scheduler down: raises the closing flag and wakes every waiter
// while holding mu_, joins the background thread if one was created, and
// marks all recorded background errors as intentionally unchecked.
DeleteScheduler::~DeleteScheduler() {
  {
    InstrumentedMutexLock guard(&mu_);
    closing_ = true;
    cv_.SignalAll();
  }
  // The lock is released before joining so the worker can observe closing_.
  if (bg_thread_ != nullptr) {
    bg_thread_->join();
  }
  for (const auto& path_and_status : bg_errors_) {
    path_and_status.second.PermitUncheckedError();
  }
}
59
60
// Deletes an accounted file, either right away or via the trash queue.
//
// The file is deleted immediately when rate limiting is off
// (rate_bytes_per_sec_ <= 0), or when `force_bg` is false and the current
// trash size exceeds total_size * max_trash_db_ratio_. Otherwise it is handed
// to AddFileToDeletionQueue() with no bucket.
//
// @param file_path   file to delete
// @param dir_to_sync directory forwarded to the deletion queue entry
// @param force_bg    when true, the trash-ratio check is skipped
// @return status of the immediate delete or of the enqueue operation
Status DeleteScheduler::DeleteFile(const std::string& file_path,
                                   const std::string& dir_to_sync,
                                   const bool force_bg) {
  const uint64_t total_size = sst_file_manager_->GetTotalSize();
  if (rate_bytes_per_sec_.load() <= 0 ||
      (!force_bg &&
       total_trash_size_.load() > total_size * max_trash_db_ratio_.load())) {
    // Rate limiting is disabled or trash size makes up more than
    // max_trash_db_ratio_ (default 25%) of the total DB size
    Status s = DeleteFileImmediately(file_path, /*accounted=*/true);
    if (s.ok()) {
      // total_size is unsigned, so it is printed with PRIu64 (it used to be
      // printed with the signed PRIi64 specifier).
      ROCKS_LOG_INFO(info_log_,
                     "Deleted file %s immediately, rate_bytes_per_sec %" PRIi64
                     ", total_trash_size %" PRIu64 ", total_size %" PRIu64
                     ", max_trash_db_ratio %lf",
                     file_path.c_str(), rate_bytes_per_sec_.load(),
                     total_trash_size_.load(), total_size,
                     max_trash_db_ratio_.load());
    }
    return s;
  }
  return AddFileToDeletionQueue(file_path, dir_to_sync, /*bucket=*/std::nullopt,
                                /*accounted=*/true);
}
84
85
// Deletes a file that is handled as unaccounted (every helper below is called
// with accounted=false).
//
// The file goes to the trash queue only when rate limiting is enabled and
// either `force_bg` is set or the file has no additional hard links;
// otherwise it is deleted immediately.
//
// @param file_path   file to delete
// @param dir_to_sync directory forwarded to the deletion queue entry
// @param force_bg    when true, the hard-link check is skipped
// @param bucket      optional trash bucket forwarded to the queue entry
// @return status of the immediate delete or of the enqueue operation
Status DeleteScheduler::DeleteUnaccountedFile(const std::string& file_path,
                                              const std::string& dir_to_sync,
                                              const bool force_bg,
                                              std::optional<int32_t> bucket) {
  // If the link count cannot be obtained the error is ignored and the
  // default of one link is used.
  uint64_t link_count = 1;
  fs_->NumFileLinks(file_path, IOOptions(), &link_count, nullptr)
      .PermitUncheckedError();

  // We can tolerate rare races where we might immediately delete both links
  // to a file.
  if (rate_bytes_per_sec_.load() > 0 && (force_bg || link_count <= 1)) {
    return AddFileToDeletionQueue(file_path, dir_to_sync, bucket,
                                  /*accounted=*/false);
  }

  Status s = DeleteFileImmediately(file_path, /*accounted=*/false);
  if (s.ok()) {
    ROCKS_LOG_INFO(info_log_,
                   "Deleted file %s immediately, rate_bytes_per_sec %" PRIi64,
                   file_path.c_str(), rate_bytes_per_sec_.load());
  }
  return s;
}
107
108
// Deletes `file_path` through the file system without going through the
// trash queue.
//
// On a successful delete, OnDeleteFile() is invoked and the
// FILES_DELETED_IMMEDIATELY ticker is bumped under mu_. The returned status is
// the delete error, or otherwise the result of OnDeleteFile().
Status DeleteScheduler::DeleteFileImmediately(const std::string& file_path,
                                              bool accounted) {
  TEST_SYNC_POINT("DeleteScheduler::DeleteFile");
  TEST_SYNC_POINT_CALLBACK("DeleteScheduler::DeleteFile::cb",
                           const_cast<std::string*>(&file_path));
  Status s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
  if (!s.ok()) {
    return s;
  }

  s = OnDeleteFile(file_path, accounted);
  {
    InstrumentedMutexLock guard(&mu_);
    RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
  }
  return s;
}
121
122
// Renames `file_path` into a trash file and enqueues it for background
// deletion.
//
// If the file cannot be marked as trash, it is deleted directly instead and
// the status of that direct delete (or of OnDeleteFile()) is returned.
// Otherwise, for accounted files, the trash file's size is added to
// total_trash_size_ when it can be read, the entry is pushed onto queue_
// under mu_, the pending counters are updated, and waiters are signalled when
// this is the only pending file.
//
// @param file_path   file to move to trash
// @param dir_to_sync directory stored with the queue entry
// @param bucket      optional trash bucket; expected to already exist in
//                    pending_files_in_buckets_ (asserted)
// @param accounted   forwarded to MarkAsTrash()/OnDeleteFile() and stored
//                    with the queue entry
Status DeleteScheduler::AddFileToDeletionQueue(const std::string& file_path,
                                               const std::string& dir_to_sync,
                                               std::optional<int32_t> bucket,
                                               bool accounted) {
  // Move file to trash
  std::string trash_file;
  Status s = MarkAsTrash(file_path, accounted, &trash_file);
  ROCKS_LOG_INFO(info_log_, "Mark file: %s as trash -- %s", trash_file.c_str(),
                 s.ToString().c_str());

  if (!s.ok()) {
    IGNORE_STATUS_IF_ERROR(s);
    ROCKS_LOG_ERROR(info_log_, "Failed to mark %s as trash -- %s",
                    file_path.c_str(), s.ToString().c_str());
    s = fs_->DeleteFile(file_path, IOOptions(), nullptr);
    if (s.ok()) {
      s = OnDeleteFile(file_path, accounted);
      // The file that was just removed is the original one; trash_file may be
      // empty or name a path that was never created.
      ROCKS_LOG_INFO(info_log_, "Deleted file %s immediately",
                     file_path.c_str());
      InstrumentedMutexLock l(&mu_);
      RecordTick(stats_.get(), FILES_DELETED_IMMEDIATELY);
    }
    return s;
  }

  // Update the total trash size
  if (accounted) {
    uint64_t trash_file_size = 0;
    IOStatus io_s =
        fs_->GetFileSize(trash_file, IOOptions(), &trash_file_size, nullptr);
    if (io_s.ok()) {
      total_trash_size_.fetch_add(trash_file_size);
    }
    // `s` is always OK at this point; the status that may carry an error
    // worth ignoring is the one returned by GetFileSize().
    IGNORE_STATUS_IF_ERROR(io_s);
  }
  //**TODO: What should we do if we failed to
  // get the file size?

  // Add file to delete queue
  {
    InstrumentedMutexLock l(&mu_);
    RecordTick(stats_.get(), FILES_MARKED_TRASH);
    queue_.emplace(trash_file, dir_to_sync, accounted, bucket);
    pending_files_++;
    if (bucket.has_value()) {
      auto iter = pending_files_in_buckets_.find(bucket.value());
      assert(iter != pending_files_in_buckets_.end());
      if (iter != pending_files_in_buckets_.end()) {
        iter->second++;
      }
    }
    // Only the transition from "nothing pending" needs a wake-up.
    if (pending_files_ == 1) {
      cv_.SignalAll();
    }
  }
  return s;
}
179
180
0
std::map<std::string, Status> DeleteScheduler::GetBackgroundErrors() {
181
0
  InstrumentedMutexLock l(&mu_);
182
0
  return bg_errors_;
183
0
}
184
185
const std::string DeleteScheduler::kTrashExtension = ".trash";
186
1.21M
bool DeleteScheduler::IsTrashFile(const std::string& file_path) {
187
1.21M
  return (file_path.size() >= kTrashExtension.size() &&
188
1.21M
          file_path.rfind(kTrashExtension) ==
189
1.09M
              file_path.size() - kTrashExtension.size());
190
1.21M
}
191
192
// Removes every trash file found directly inside `path` (no recursion).
//
// With an SstFileManager, each trash file is first reported through
// OnAddFile() and then handed to ScheduleFileDeletion(); without one it is
// deleted through `env` right away.
//
// @param env  environment providing the file system and the direct delete
// @param sfm  optional SstFileManager; may be null
// @param path directory to scan
// @return the GetChildren() error if listing fails; otherwise the status
//         left after processing the listed files
Status DeleteScheduler::CleanupDirectory(Env* env, SstFileManagerImpl* sfm,
                                         const std::string& path) {
  const auto& fs = env->GetFileSystem();
  IOOptions io_opts;
  io_opts.do_not_recurse = true;

  // Check if there are any files marked as trash in this path
  std::vector<std::string> children;
  Status s = fs->GetChildren(path, io_opts, &children,
                             /*IODebugContext*=*/nullptr);
  if (!s.ok()) {
    return s;
  }

  for (const std::string& child : children) {
    if (DeleteScheduler::IsTrashFile(child)) {
      const std::string trash_file = path + "/" + child;
      Status file_delete;
      if (sfm != nullptr) {
        // We have an SstFileManager that will schedule the file delete
        // NOTE(review): this assignment replaces whatever `s` held from an
        // earlier iteration, so a previous deletion error can be lost --
        // confirm whether that is intended.
        s = sfm->OnAddFile(trash_file);
        file_delete = sfm->ScheduleFileDeletion(trash_file, path);
      } else {
        // Delete the file immediately
        file_delete = env->DeleteFile(trash_file);
      }

      if (s.ok() && !file_delete.ok()) {
        s = file_delete;
      }
    }
  }

  return s;
}
229
230
// Renames `file_path` to a trash file name and reports that name through
// `trash_file`.
//
// A path without '/' or ending in '/' is rejected as InvalidArgument. A path
// that already carries the trash extension is returned unchanged. Otherwise
// candidate names "<file_path>.trash", "<file_path>0.trash",
// "<file_path>1.trash", ... are probed under file_move_mu_ until one does not
// exist, and the file is renamed to it. For accounted files a successful
// rename is followed by SstFileManagerImpl::OnMoveFile().
Status DeleteScheduler::MarkAsTrash(const std::string& file_path,
                                    bool accounted, std::string* trash_file) {
  // Sanity check of the path
  const size_t last_slash = file_path.rfind('/');
  if (last_slash == std::string::npos || last_slash == file_path.size() - 1) {
    return Status::InvalidArgument("file_path is corrupted");
  }

  if (DeleteScheduler::IsTrashFile(file_path)) {
    // This is already a trash file
    *trash_file = file_path;
    return Status::OK();
  }

  *trash_file = file_path + kTrashExtension;
  // TODO(tec) : Implement Env::RenameFileIfNotExist and remove
  //             file_move_mu mutex.
  Status s;
  InstrumentedMutexLock l(&file_move_mu_);
  for (int suffix = 0;; ++suffix) {
    s = fs_->FileExists(*trash_file, IOOptions(), nullptr);
    if (s.IsNotFound()) {
      // We found a path for our file in trash
      s = fs_->RenameFile(file_path, *trash_file, IOOptions(), nullptr);
      break;
    }
    if (!s.ok()) {
      // Error during FileExists call, we cannot continue
      break;
    }
    // Name conflict, try the next numeric suffix
    *trash_file = file_path + std::to_string(suffix) + kTrashExtension;
  }

  if (s.ok() && accounted) {
    s = sst_file_manager_->OnMoveFile(file_path, *trash_file);
  }
  return s;
}
270
271
0
void DeleteScheduler::BackgroundEmptyTrash() {
272
0
  TEST_SYNC_POINT("DeleteScheduler::BackgroundEmptyTrash");
273
274
0
  while (true) {
275
0
    InstrumentedMutexLock l(&mu_);
276
0
    while (queue_.empty() && !closing_) {
277
0
      cv_.Wait();
278
0
    }
279
280
0
    if (closing_) {
281
0
      return;
282
0
    }
283
284
    // Delete all files in queue_
285
0
    uint64_t start_time = clock_->NowMicros();
286
0
    uint64_t total_deleted_bytes = 0;
287
0
    int64_t current_delete_rate = rate_bytes_per_sec_.load();
288
0
    while (!queue_.empty() && !closing_) {
289
      // Satisfy static analysis.
290
0
      std::optional<int32_t> bucket = std::nullopt;
291
0
      if (current_delete_rate != rate_bytes_per_sec_.load()) {
292
        // User changed the delete rate
293
0
        current_delete_rate = rate_bytes_per_sec_.load();
294
0
        start_time = clock_->NowMicros();
295
0
        total_deleted_bytes = 0;
296
0
        ROCKS_LOG_INFO(info_log_, "rate_bytes_per_sec is changed to %" PRIi64,
297
0
                       current_delete_rate);
298
0
      }
299
300
      // Get new file to delete
301
0
      const FileAndDir& fad = queue_.front();
302
0
      std::string path_in_trash = fad.fname;
303
0
      std::string dir_to_sync = fad.dir;
304
0
      bool accounted = fad.accounted;
305
0
      bucket = fad.bucket;
306
307
      // We don't need to hold the lock while deleting the file
308
0
      mu_.Unlock();
309
0
      uint64_t deleted_bytes = 0;
310
0
      bool is_complete = true;
311
      // Delete file from trash and update total_penlty value
312
0
      Status s = DeleteTrashFile(path_in_trash, dir_to_sync, accounted,
313
0
                                 &deleted_bytes, &is_complete);
314
0
      total_deleted_bytes += deleted_bytes;
315
0
      mu_.Lock();
316
0
      if (is_complete) {
317
0
        RecordTick(stats_.get(), FILES_DELETED_FROM_TRASH_QUEUE);
318
0
        queue_.pop();
319
0
      }
320
321
0
      if (!s.ok()) {
322
0
        bg_errors_[path_in_trash] = s;
323
0
      }
324
325
      // Apply penalty if necessary
326
0
      uint64_t total_penalty;
327
0
      if (current_delete_rate > 0) {
328
        // rate limiting is enabled
329
0
        total_penalty =
330
0
            ((total_deleted_bytes * kMicrosInSecond) / current_delete_rate);
331
0
        ROCKS_LOG_INFO(info_log_,
332
0
                       "Rate limiting is enabled with penalty %" PRIu64
333
0
                       " after deleting file %s",
334
0
                       total_penalty, path_in_trash.c_str());
335
0
        while (!closing_ && !cv_.TimedWait(start_time + total_penalty)) {
336
0
        }
337
0
      } else {
338
        // rate limiting is disabled
339
0
        total_penalty = 0;
340
0
        ROCKS_LOG_INFO(info_log_,
341
0
                       "Rate limiting is disabled after deleting file %s",
342
0
                       path_in_trash.c_str());
343
0
      }
344
0
      TEST_SYNC_POINT_CALLBACK("DeleteScheduler::BackgroundEmptyTrash:Wait",
345
0
                               &total_penalty);
346
347
0
      int32_t pending_files_in_bucket = std::numeric_limits<int32_t>::max();
348
0
      if (is_complete) {
349
0
        pending_files_--;
350
0
        if (bucket.has_value()) {
351
0
          auto iter = pending_files_in_buckets_.find(bucket.value());
352
0
          assert(iter != pending_files_in_buckets_.end());
353
0
          if (iter != pending_files_in_buckets_.end()) {
354
0
            pending_files_in_bucket = iter->second--;
355
0
          }
356
0
        }
357
0
      }
358
0
      if (pending_files_ == 0 || pending_files_in_bucket == 0) {
359
        // Unblock WaitForEmptyTrash or WaitForEmptyTrashBucket since there are
360
        // no more files waiting to be deleted
361
0
        cv_.SignalAll();
362
0
      }
363
0
    }
364
0
  }
365
0
}
366
367
// Deletes one trash file, either completely or by truncating one chunk off
// its end.
//
// When bytes_max_delete_chunk_ is non-zero, the file is larger than that
// chunk, and it has exactly one hard link, the file is shortened by one chunk
// and fsynced; `*is_complete` is then set to false so the caller processes
// the file again. In every other case (including a failed truncation) the
// file is deleted outright, `dir_to_sync` is fsynced when non-empty, and
// OnDeleteFile() is invoked.
//
// @param path_in_trash  trash file to delete
// @param dir_to_sync    directory to fsync after a full delete; may be empty
// @param accounted      when true, total_trash_size_ is reduced on success
// @param deleted_bytes  out: bytes removed by this call; 0 on failure
// @param is_complete    out: false only after a successful partial truncation
// @return OK on success, otherwise the first error from the size lookup,
//         delete, directory sync, or OnDeleteFile()
Status DeleteScheduler::DeleteTrashFile(const std::string& path_in_trash,
                                        const std::string& dir_to_sync,
                                        bool accounted, uint64_t* deleted_bytes,
                                        bool* is_complete) {
  uint64_t file_size;
  Status s = fs_->GetFileSize(path_in_trash, IOOptions(), &file_size, nullptr);
  *is_complete = true;
  TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:DeleteFile");
  TEST_SYNC_POINT_CALLBACK("DeleteScheduler::DeleteTrashFile::cb",
                           const_cast<std::string*>(&path_in_trash));
  if (s.ok()) {
    bool truncated_only = false;
    if (bytes_max_delete_chunk_ != 0 && file_size > bytes_max_delete_chunk_) {
      uint64_t link_count = 2;
      // We don't have to worry about data race between linking a new
      // file after the number of file link check and ftruncate because
      // the file is now in trash and no hardlink is supposed to create
      // to trash files by RocksDB.
      Status chunk_status = fs_->NumFileLinks(path_in_trash, IOOptions(),
                                              &link_count, nullptr);
      if (!chunk_status.ok()) {
        // This message is logged at most once per scheduler.
        if (!num_link_error_printed_) {
          ROCKS_LOG_INFO(
              info_log_,
              "Cannot delete files slowly through ftruncate from trash "
              "as Env::NumFileLinks() returns error: %s",
              chunk_status.ToString().c_str());
          num_link_error_printed_ = true;
        }
      } else if (link_count != 1) {
        ROCKS_LOG_INFO(info_log_,
                       "Cannot delete %s slowly through ftruncate from trash "
                       "as it has other links",
                       path_in_trash.c_str());
      } else {
        std::unique_ptr<FSWritableFile> wf;
        chunk_status = fs_->ReopenWritableFile(path_in_trash, FileOptions(),
                                               &wf, nullptr);
        if (chunk_status.ok()) {
          chunk_status = wf->Truncate(file_size - bytes_max_delete_chunk_,
                                      IOOptions(), nullptr);
          if (chunk_status.ok()) {
            TEST_SYNC_POINT("DeleteScheduler::DeleteTrashFile:Fsync");
            chunk_status = wf->Fsync(IOOptions(), nullptr);
          }
        }
        if (chunk_status.ok()) {
          *deleted_bytes = bytes_max_delete_chunk_;
          truncated_only = true;
          *is_complete = false;
        } else {
          // Fall through to a full delete below.
          ROCKS_LOG_WARN(info_log_,
                         "Failed to partially delete %s from trash -- %s",
                         path_in_trash.c_str(),
                         chunk_status.ToString().c_str());
        }
      }
    }

    if (!truncated_only) {
      s = fs_->DeleteFile(path_in_trash, IOOptions(), nullptr);
      if (s.ok() && !dir_to_sync.empty()) {
        std::unique_ptr<FSDirectory> dir_obj;
        s = fs_->NewDirectory(dir_to_sync, IOOptions(), &dir_obj, nullptr);
        if (s.ok()) {
          s = dir_obj->FsyncWithDirOptions(
              IOOptions(), nullptr,
              DirFsyncOptions(DirFsyncOptions::FsyncReason::kFileDeleted));
          TEST_SYNC_POINT_CALLBACK(
              "DeleteScheduler::DeleteTrashFile::AfterSyncDir",
              static_cast<void*>(const_cast<std::string*>(&dir_to_sync)));
        }
      }
      if (s.ok()) {
        *deleted_bytes = file_size;
        s = OnDeleteFile(path_in_trash, accounted);
      }
    }
  }

  if (s.ok()) {
    if (accounted) {
      total_trash_size_.fetch_sub(*deleted_bytes);
    }
  } else {
    // Error while getting file size or while deleting
    ROCKS_LOG_ERROR(info_log_, "Failed to delete %s from trash -- %s",
                    path_in_trash.c_str(), s.ToString().c_str());
    *deleted_bytes = 0;
  }

  return s;
}
460
461
// Post-deletion hook. Accounted files are reported to the SstFileManager and
// its status is returned; unaccounted files only hit a test sync point and
// yield OK.
Status DeleteScheduler::OnDeleteFile(const std::string& file_path,
                                     bool accounted) {
  if (!accounted) {
    TEST_SYNC_POINT_CALLBACK("DeleteScheduler::OnDeleteFile",
                             const_cast<std::string*>(&file_path));
    return Status::OK();
  }
  return sst_file_manager_->OnDeleteFile(file_path);
}
470
471
0
void DeleteScheduler::WaitForEmptyTrash() {
472
0
  InstrumentedMutexLock l(&mu_);
473
0
  while (pending_files_ > 0 && !closing_) {
474
0
    cv_.Wait();
475
0
  }
476
0
}
477
478
0
std::optional<int32_t> DeleteScheduler::NewTrashBucket() {
479
0
  if (rate_bytes_per_sec_.load() <= 0) {
480
0
    return std::nullopt;
481
0
  }
482
0
  InstrumentedMutexLock l(&mu_);
483
0
  int32_t bucket_number = next_trash_bucket_++;
484
0
  pending_files_in_buckets_.emplace(bucket_number, 0);
485
0
  return bucket_number;
486
0
}
487
488
0
void DeleteScheduler::WaitForEmptyTrashBucket(int32_t bucket) {
489
0
  InstrumentedMutexLock l(&mu_);
490
0
  if (bucket >= next_trash_bucket_) {
491
0
    return;
492
0
  }
493
0
  auto iter = pending_files_in_buckets_.find(bucket);
494
0
  while (iter != pending_files_in_buckets_.end() && iter->second > 0 &&
495
0
         !closing_) {
496
0
    cv_.Wait();
497
0
    iter = pending_files_in_buckets_.find(bucket);
498
0
  }
499
0
  pending_files_in_buckets_.erase(bucket);
500
0
}
501
502
68.5k
void DeleteScheduler::MaybeCreateBackgroundThread() {
503
68.5k
  if (bg_thread_ == nullptr && rate_bytes_per_sec_.load() > 0) {
504
0
    bg_thread_.reset(
505
0
        new port::Thread(&DeleteScheduler::BackgroundEmptyTrash, this));
506
0
    ROCKS_LOG_INFO(info_log_,
507
0
                   "Created background thread for deletion scheduler with "
508
0
                   "rate_bytes_per_sec: %" PRIi64,
509
0
                   rate_bytes_per_sec_.load());
510
0
  }
511
68.5k
}
512
513
}  // namespace ROCKSDB_NAMESPACE