Coverage Report

Created: 2026-05-16 07:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/file/writable_file_writer.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "file/writable_file_writer.h"
11
12
#include <algorithm>
13
#include <mutex>
14
15
#include "db/version_edit.h"
16
#include "file/file_util.h"
17
#include "monitoring/histogram.h"
18
#include "monitoring/iostats_context_imp.h"
19
#include "port/port.h"
20
#include "rocksdb/io_status.h"
21
#include "rocksdb/system_clock.h"
22
#include "test_util/sync_point.h"
23
#include "util/crc32c.h"
24
#include "util/random.h"
25
#include "util/rate_limiter_impl.h"
26
27
namespace ROCKSDB_NAMESPACE {
28
inline Histograms GetFileWriteHistograms(Histograms file_writer_hist,
29
27.1M
                                         Env::IOActivity io_activity) {
30
27.1M
  if (file_writer_hist == Histograms::SST_WRITE_MICROS ||
31
26.9M
      file_writer_hist == Histograms::BLOB_DB_BLOB_FILE_WRITE_MICROS) {
32
199k
    switch (io_activity) {
33
18.8k
      case Env::IOActivity::kFlush:
34
18.8k
        return Histograms::FILE_WRITE_FLUSH_MICROS;
35
15.2k
      case Env::IOActivity::kCompaction:
36
15.2k
        return Histograms::FILE_WRITE_COMPACTION_MICROS;
37
165k
      case Env::IOActivity::kDBOpen:
38
165k
        return Histograms::FILE_WRITE_DB_OPEN_MICROS;
39
0
      default:
40
0
        break;
41
199k
    }
42
199k
  }
43
26.9M
  return Histograms::HISTOGRAM_ENUM_MAX;
44
27.1M
}
45
46
IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
47
                                    const std::string& fname,
48
                                    const FileOptions& file_opts,
49
                                    std::unique_ptr<WritableFileWriter>* writer,
50
0
                                    IODebugContext* dbg) {
51
0
  if (file_opts.use_direct_writes &&
52
0
      0 == file_opts.writable_file_max_buffer_size) {
53
0
    return IOStatus::InvalidArgument(
54
0
        "Direct write requires writable_file_max_buffer_size > 0");
55
0
  }
56
0
  std::unique_ptr<FSWritableFile> file;
57
0
  IOStatus io_s = fs->NewWritableFile(fname, file_opts, &file, dbg);
58
0
  if (io_s.ok()) {
59
0
    writer->reset(new WritableFileWriter(std::move(file), fname, file_opts));
60
0
  }
61
0
  return io_s;
62
0
}
63
64
IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
65
27.1M
                                    uint32_t crc32c_checksum) {
66
27.1M
  if (seen_error()) {
67
0
    return GetWriterHasPreviousErrorStatus();
68
0
  }
69
70
27.1M
  StopWatch sw(clock_, stats_, hist_type_,
71
27.1M
               GetFileWriteHistograms(hist_type_, opts.io_activity));
72
73
27.1M
  const IOOptions io_options = FinalizeIOOptions(opts);
74
27.1M
  const char* src = data.data();
75
27.1M
  size_t left = data.size();
76
27.1M
  IOStatus s;
77
27.1M
  pending_sync_ = true;
78
79
27.1M
  TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Append:0", REDUCE_ODDS2);
80
81
  // Calculate the checksum of appended data
82
27.1M
  UpdateFileChecksum(data);
83
84
27.1M
  {
85
27.1M
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
86
27.1M
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
87
27.1M
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
88
27.1M
                                 io_options, nullptr);
89
27.1M
  }
90
91
  // See whether we need to enlarge the buffer to avoid the flush
92
27.1M
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
93
1.39k
    for (size_t cap = buf_.Capacity();
94
1.74k
         cap < max_buffer_size_;  // There is still room to increase
95
1.74k
         cap *= 2) {
96
      // See whether the next available size is large enough.
97
      // Buffer will never be increased to more than max_buffer_size_.
98
1.74k
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
99
1.74k
      if (desired_capacity - buf_.CurrentSize() >= left ||
100
1.39k
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
101
1.39k
        buf_.AllocateNewBuffer(desired_capacity, true);
102
1.39k
        break;
103
1.39k
      }
104
1.74k
    }
105
1.39k
  }
106
107
  // Flush only when buffered I/O
108
27.1M
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
109
0
    if (buf_.CurrentSize() > 0) {
110
0
      if (!buffered_data_with_checksum_) {
111
        // If we're not calculating checksum of buffered data, fill the
112
        // buffer before flushing so that the writes are aligned. This will
113
        // benefit file system performance.
114
0
        size_t appended = buf_.Append(src, left);
115
0
        left -= appended;
116
0
        src += appended;
117
0
      }
118
0
      s = Flush(io_options);
119
0
      if (!s.ok()) {
120
0
        set_seen_error(s);
121
0
        return s;
122
0
      }
123
0
    }
124
0
    assert(buf_.CurrentSize() == 0);
125
0
  }
126
127
27.1M
  if (perform_data_verification_ && buffered_data_with_checksum_ &&
128
0
      crc32c_checksum != 0) {
129
    // Since we want to use the checksum of the input data, we cannot break it
130
    // into several pieces. We will only write them in the buffer when buffer
131
    // size is enough. Otherwise, we will directly write it down.
132
0
    if (use_direct_io() || (buf_.Capacity() - buf_.CurrentSize()) >= left) {
133
0
      if ((buf_.Capacity() - buf_.CurrentSize()) >= left) {
134
0
        size_t appended = buf_.Append(src, left);
135
0
        if (appended != left) {
136
0
          s = IOStatus::Corruption("Write buffer append failure");
137
0
        }
138
0
        buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine(
139
0
            buffered_data_crc32c_checksum_, crc32c_checksum, appended);
140
0
      } else {
141
0
        while (left > 0) {
142
0
          size_t appended = buf_.Append(src, left);
143
0
          buffered_data_crc32c_checksum_ =
144
0
              crc32c::Extend(buffered_data_crc32c_checksum_, src, appended);
145
0
          left -= appended;
146
0
          src += appended;
147
148
0
          if (left > 0) {
149
0
            s = Flush(io_options);
150
0
            if (!s.ok()) {
151
0
              break;
152
0
            }
153
0
          }
154
0
        }
155
0
      }
156
0
    } else {
157
0
      assert(buf_.CurrentSize() == 0);
158
0
      buffered_data_crc32c_checksum_ = crc32c_checksum;
159
0
      s = WriteBufferedWithChecksum(io_options, src, left);
160
0
    }
161
27.1M
  } else {
162
    // In this case, either we do not need to do the data verification or
163
    // caller does not provide the checksum of the data (crc32c_checksum = 0).
164
    //
165
    // We never write directly to disk with direct I/O on.
166
    // or we simply use it for its original purpose to accumulate many small
167
    // chunks
168
27.1M
    if (use_direct_io() || (buf_.Capacity() >= left)) {
169
54.1M
      while (left > 0) {
170
27.0M
        size_t appended = buf_.Append(src, left);
171
27.0M
        if (perform_data_verification_ && buffered_data_with_checksum_) {
172
0
          buffered_data_crc32c_checksum_ =
173
0
              crc32c::Extend(buffered_data_crc32c_checksum_, src, appended);
174
0
        }
175
27.0M
        left -= appended;
176
27.0M
        src += appended;
177
178
27.0M
        if (left > 0) {
179
0
          s = Flush(io_options);
180
0
          if (!s.ok()) {
181
0
            break;
182
0
          }
183
0
        }
184
27.0M
      }
185
18.4E
    } else {
186
      // Writing directly to file bypassing the buffer
187
18.4E
      assert(buf_.CurrentSize() == 0);
188
18.4E
      if (perform_data_verification_ && buffered_data_with_checksum_) {
189
0
        buffered_data_crc32c_checksum_ = crc32c::Value(src, left);
190
0
        s = WriteBufferedWithChecksum(io_options, src, left);
191
18.4E
      } else {
192
18.4E
        s = WriteBuffered(io_options, src, left);
193
18.4E
      }
194
18.4E
    }
195
27.1M
  }
196
197
27.1M
  TEST_KILL_RANDOM("WritableFileWriter::Append:1");
198
27.1M
  if (s.ok()) {
199
27.1M
    uint64_t cur_size = filesize_.load(std::memory_order_acquire);
200
27.1M
    filesize_.store(cur_size + data.size(), std::memory_order_release);
201
18.4E
  } else {
202
18.4E
    set_seen_error(s);
203
18.4E
  }
204
27.1M
  return s;
205
27.1M
}
206
207
IOStatus WritableFileWriter::Pad(const IOOptions& opts, const size_t pad_bytes,
208
0
                                 const size_t max_pad_size) {
209
0
  (void)max_pad_size;
210
0
  if (seen_error()) {
211
0
    return GetWriterHasPreviousErrorStatus();
212
0
  }
213
0
  const IOOptions io_options = FinalizeIOOptions(opts);
214
0
  assert(pad_bytes < max_pad_size);
215
0
  size_t left = pad_bytes;
216
0
  size_t cap = buf_.Capacity() - buf_.CurrentSize();
217
218
  // Assume pad_bytes is small compared to buf_ capacity. So we always
219
  // use buf_ rather than write directly to file in certain cases like
220
  // Append() does.
221
0
  while (left) {
222
0
    size_t append_bytes = std::min(cap, left);
223
0
    buf_.PadWith(append_bytes, 0);
224
0
    left -= append_bytes;
225
226
0
    Slice data(buf_.BufferStart() + buf_.CurrentSize() - append_bytes,
227
0
               append_bytes);
228
0
    UpdateFileChecksum(data);
229
0
    if (perform_data_verification_) {
230
0
      buffered_data_crc32c_checksum_ = crc32c::Extend(
231
0
          buffered_data_crc32c_checksum_,
232
0
          buf_.BufferStart() + buf_.CurrentSize() - append_bytes, append_bytes);
233
0
    }
234
235
0
    if (left > 0) {
236
0
      IOStatus s = Flush(io_options);
237
0
      if (!s.ok()) {
238
0
        set_seen_error(s);
239
0
        return s;
240
0
      }
241
0
    }
242
0
    cap = buf_.Capacity() - buf_.CurrentSize();
243
0
  }
244
0
  pending_sync_ = true;
245
0
  uint64_t cur_size = filesize_.load(std::memory_order_acquire);
246
0
  filesize_.store(cur_size + pad_bytes, std::memory_order_release);
247
248
0
  return IOStatus::OK();
249
0
}
250
251
492k
IOStatus WritableFileWriter::Close(const IOOptions& opts) {
252
492k
  IOOptions io_options = FinalizeIOOptions(opts);
253
492k
  if (seen_error()) {
254
0
    IOStatus interim;
255
0
    if (writable_file_.get() != nullptr) {
256
0
      interim = writable_file_->Close(io_options, nullptr);
257
0
      writable_file_.reset();
258
0
    }
259
0
    if (interim.ok()) {
260
0
      return IOStatus::IOError(
261
0
          "File is closed but data not flushed as writer has previous error.");
262
0
    } else {
263
0
      return interim;
264
0
    }
265
0
  }
266
267
  // Do not quit immediately on failure the file MUST be closed
268
269
  // Possible to close it twice now as we MUST close
270
  // in __dtor, simply flushing is not enough
271
  // Windows when pre-allocating does not fill with zeros
272
  // also with unbuffered access we also set the end of data.
273
492k
  if (writable_file_.get() == nullptr) {
274
217k
    return IOStatus::OK();
275
217k
  }
276
277
275k
  IOStatus s;
278
275k
  s = Flush(io_options);  // flush cache to OS
279
280
275k
  IOStatus interim;
281
  // In direct I/O mode we write whole pages so
282
  // we need to let the file know where data ends.
283
275k
  if (use_direct_io()) {
284
0
    {
285
0
      FileOperationInfo::StartTimePoint start_ts;
286
0
      if (ShouldNotifyListeners()) {
287
0
        start_ts = FileOperationInfo::StartNow();
288
0
      }
289
0
      uint64_t filesz = filesize_.load(std::memory_order_acquire);
290
0
      interim = writable_file_->Truncate(filesz, io_options, nullptr);
291
0
      if (ShouldNotifyListeners()) {
292
0
        auto finish_ts = FileOperationInfo::FinishNow();
293
0
        NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
294
0
        if (!interim.ok()) {
295
0
          NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(),
296
0
                          filesz);
297
0
        }
298
0
      }
299
0
    }
300
0
    if (interim.ok()) {
301
0
      {
302
0
        FileOperationInfo::StartTimePoint start_ts;
303
0
        if (ShouldNotifyListeners()) {
304
0
          start_ts = FileOperationInfo::StartNow();
305
0
        }
306
0
        interim = writable_file_->Fsync(io_options, nullptr);
307
0
        if (ShouldNotifyListeners()) {
308
0
          auto finish_ts = FileOperationInfo::FinishNow();
309
0
          NotifyOnFileSyncFinish(start_ts, finish_ts, s,
310
0
                                 FileOperationType::kFsync);
311
0
          if (!interim.ok()) {
312
0
            NotifyOnIOError(interim, FileOperationType::kFsync, file_name());
313
0
          }
314
0
        }
315
0
      }
316
0
    }
317
0
    if (!interim.ok() && s.ok()) {
318
0
      s = interim;
319
0
    }
320
0
  }
321
322
275k
  TEST_KILL_RANDOM("WritableFileWriter::Close:0");
323
275k
  {
324
275k
    FileOperationInfo::StartTimePoint start_ts;
325
275k
    if (ShouldNotifyListeners()) {
326
0
      start_ts = FileOperationInfo::StartNow();
327
0
    }
328
275k
    interim = writable_file_->Close(io_options, nullptr);
329
275k
    if (ShouldNotifyListeners()) {
330
0
      auto finish_ts = FileOperationInfo::FinishNow();
331
0
      NotifyOnFileCloseFinish(start_ts, finish_ts, s);
332
0
      if (!interim.ok()) {
333
0
        NotifyOnIOError(interim, FileOperationType::kClose, file_name());
334
0
      }
335
0
    }
336
275k
  }
337
275k
  if (!interim.ok() && s.ok()) {
338
0
    s = interim;
339
0
  }
340
341
275k
  writable_file_.reset();
342
275k
  TEST_KILL_RANDOM("WritableFileWriter::Close:1");
343
344
275k
  if (s.ok()) {
345
275k
    if (checksum_generator_ != nullptr && !checksum_finalized_) {
346
0
      checksum_generator_->Finalize();
347
0
      checksum_finalized_ = true;
348
0
    }
349
275k
  } else {
350
0
    set_seen_error(s);
351
0
  }
352
353
275k
  return s;
354
492k
}
355
356
// write out the cached data to the OS cache or storage if direct I/O
357
// enabled
358
1.99M
IOStatus WritableFileWriter::Flush(const IOOptions& opts) {
359
1.99M
  if (seen_error()) {
360
0
    return GetWriterHasPreviousErrorStatus();
361
0
  }
362
363
1.99M
  const IOOptions io_options = FinalizeIOOptions(opts);
364
365
1.99M
  IOStatus s;
366
1.99M
  TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2);
367
368
1.99M
  if (buf_.CurrentSize() > 0) {
369
1.51M
    if (use_direct_io()) {
370
0
      if (pending_sync_) {
371
0
        if (perform_data_verification_ && buffered_data_with_checksum_) {
372
0
          s = WriteDirectWithChecksum(io_options);
373
0
        } else {
374
0
          s = WriteDirect(io_options);
375
0
        }
376
0
      }
377
1.51M
    } else {
378
1.51M
      if (perform_data_verification_ && buffered_data_with_checksum_) {
379
0
        s = WriteBufferedWithChecksum(io_options, buf_.BufferStart(),
380
0
                                      buf_.CurrentSize());
381
1.51M
      } else {
382
1.51M
        s = WriteBuffered(io_options, buf_.BufferStart(), buf_.CurrentSize());
383
1.51M
      }
384
1.51M
    }
385
1.51M
    if (!s.ok()) {
386
0
      set_seen_error(s);
387
0
      return s;
388
0
    }
389
1.51M
  }
390
391
1.99M
  {
392
1.99M
    FileOperationInfo::StartTimePoint start_ts;
393
1.99M
    if (ShouldNotifyListeners()) {
394
0
      start_ts = FileOperationInfo::StartNow();
395
0
    }
396
1.99M
    s = writable_file_->Flush(io_options, nullptr);
397
1.99M
    if (ShouldNotifyListeners()) {
398
0
      auto finish_ts = std::chrono::steady_clock::now();
399
0
      NotifyOnFileFlushFinish(start_ts, finish_ts, s);
400
0
      if (!s.ok()) {
401
0
        NotifyOnIOError(s, FileOperationType::kFlush, file_name());
402
0
      }
403
0
    }
404
1.99M
  }
405
406
1.99M
  if (!s.ok()) {
407
0
    set_seen_error(s);
408
0
    return s;
409
0
  }
410
411
  // sync OS cache to disk for every bytes_per_sync_
412
  // TODO: give log file and sst file different options (log
413
  // files could be potentially cached in OS for their whole
414
  // life time, thus we might not want to flush at all).
415
416
  // We try to avoid sync to the last 1MB of data. For two reasons:
417
  // (1) avoid rewrite the same page that is modified later.
418
  // (2) for older version of OS, write can block while writing out
419
  //     the page.
420
  // Xfs does neighbor page flushing outside of the specified ranges. We
421
  // need to make sure sync range is far from the write offset.
422
1.99M
  if (!use_direct_io() && bytes_per_sync_) {
423
0
    const uint64_t kBytesNotSyncRange =
424
0
        1024 * 1024;                                // recent 1MB is not synced.
425
0
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
426
0
    uint64_t cur_size = filesize_.load(std::memory_order_acquire);
427
0
    if (cur_size > kBytesNotSyncRange) {
428
0
      uint64_t offset_sync_to = cur_size - kBytesNotSyncRange;
429
0
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
430
0
      assert(offset_sync_to >= last_sync_size_);
431
0
      if (offset_sync_to > 0 &&
432
0
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
433
0
        s = RangeSync(io_options, last_sync_size_,
434
0
                      offset_sync_to - last_sync_size_);
435
0
        if (!s.ok()) {
436
0
          set_seen_error(s);
437
0
        }
438
0
        last_sync_size_ = offset_sync_to;
439
0
      }
440
0
    }
441
0
  }
442
443
1.99M
  return s;
444
1.99M
}
445
446
21.1k
std::string WritableFileWriter::GetFileChecksum() {
447
21.1k
  if (checksum_generator_ != nullptr) {
448
0
    assert(checksum_finalized_);
449
0
    return checksum_generator_->GetChecksum();
450
21.1k
  } else {
451
21.1k
    return kUnknownFileChecksum;
452
21.1k
  }
453
21.1k
}
454
455
21.1k
const char* WritableFileWriter::GetFileChecksumFuncName() const {
456
21.1k
  if (checksum_generator_ != nullptr) {
457
0
    return checksum_generator_->Name();
458
21.1k
  } else {
459
21.1k
    return kUnknownFileChecksumFuncName;
460
21.1k
  }
461
21.1k
}
462
463
IOStatus WritableFileWriter::PrepareIOOptions(const WriteOptions& wo,
464
2.46M
                                              IOOptions& opts) {
465
2.46M
  return PrepareIOFromWriteOptions(wo, opts);
466
2.46M
}
467
468
210k
IOStatus WritableFileWriter::Sync(const IOOptions& opts, bool use_fsync) {
469
210k
  if (seen_error()) {
470
0
    return GetWriterHasPreviousErrorStatus();
471
0
  }
472
473
210k
  IOOptions io_options = FinalizeIOOptions(opts);
474
210k
  IOStatus s = Flush(io_options);
475
210k
  if (!s.ok()) {
476
0
    set_seen_error(s);
477
0
    return s;
478
0
  }
479
210k
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0");
480
210k
  if (!use_direct_io() && pending_sync_) {
481
210k
    s = SyncInternal(io_options, use_fsync);
482
210k
    if (!s.ok()) {
483
0
      set_seen_error(s);
484
0
      return s;
485
0
    }
486
210k
  }
487
210k
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1");
488
210k
  pending_sync_ = false;
489
210k
  return IOStatus::OK();
490
210k
}
491
492
IOStatus WritableFileWriter::SyncWithoutFlush(const IOOptions& opts,
493
0
                                              bool use_fsync) {
494
0
  if (seen_error()) {
495
0
    return GetWriterHasPreviousErrorStatus();
496
0
  }
497
0
  IOOptions io_options = FinalizeIOOptions(opts);
498
0
  if (!writable_file_->IsSyncThreadSafe()) {
499
0
    return IOStatus::NotSupported(
500
0
        "Can't WritableFileWriter::SyncWithoutFlush() because "
501
0
        "WritableFile::IsSyncThreadSafe() is false");
502
0
  }
503
0
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
504
0
  IOStatus s = SyncInternal(io_options, use_fsync);
505
0
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
506
0
  if (!s.ok()) {
507
0
    set_seen_error(s);
508
0
  }
509
0
  return s;
510
0
}
511
512
IOStatus WritableFileWriter::SyncInternal(const IOOptions& opts,
513
210k
                                          bool use_fsync) {
514
  // Caller is supposed to check seen_error_
515
210k
  IOStatus s;
516
210k
  IOSTATS_TIMER_GUARD(fsync_nanos);
517
210k
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
518
210k
  auto prev_perf_level = GetPerfLevel();
519
520
210k
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
521
522
210k
  FileOperationInfo::StartTimePoint start_ts;
523
210k
  if (ShouldNotifyListeners()) {
524
0
    start_ts = FileOperationInfo::StartNow();
525
0
  }
526
527
210k
  if (use_fsync) {
528
86.7k
    s = writable_file_->Fsync(opts, nullptr);
529
124k
  } else {
530
124k
    s = writable_file_->Sync(opts, nullptr);
531
124k
  }
532
210k
  if (ShouldNotifyListeners()) {
533
0
    auto finish_ts = std::chrono::steady_clock::now();
534
0
    NotifyOnFileSyncFinish(
535
0
        start_ts, finish_ts, s,
536
0
        use_fsync ? FileOperationType::kFsync : FileOperationType::kSync);
537
0
    if (!s.ok()) {
538
0
      NotifyOnIOError(
539
0
          s, (use_fsync ? FileOperationType::kFsync : FileOperationType::kSync),
540
0
          file_name());
541
0
    }
542
0
  }
543
210k
  SetPerfLevel(prev_perf_level);
544
545
  // The caller will be responsible to call set_seen_error(s) if s is not OK.
546
210k
  return s;
547
210k
}
548
549
IOStatus WritableFileWriter::RangeSync(const IOOptions& opts, uint64_t offset,
550
0
                                       uint64_t nbytes) {
551
0
  if (seen_error()) {
552
0
    return GetWriterHasPreviousErrorStatus();
553
0
  }
554
555
0
  IOSTATS_TIMER_GUARD(range_sync_nanos);
556
0
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
557
0
  FileOperationInfo::StartTimePoint start_ts;
558
0
  if (ShouldNotifyListeners()) {
559
0
    start_ts = FileOperationInfo::StartNow();
560
0
  }
561
0
  IOStatus s = writable_file_->RangeSync(offset, nbytes, opts, nullptr);
562
0
  if (!s.ok()) {
563
0
    set_seen_error(s);
564
0
  }
565
0
  if (ShouldNotifyListeners()) {
566
0
    auto finish_ts = std::chrono::steady_clock::now();
567
0
    NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts, s);
568
0
    if (!s.ok()) {
569
0
      NotifyOnIOError(s, FileOperationType::kRangeSync, file_name(), nbytes,
570
0
                      offset);
571
0
    }
572
0
  }
573
0
  return s;
574
0
}
575
576
// This method writes to disk the specified data and makes use of the rate
577
// limiter if available
578
IOStatus WritableFileWriter::WriteBuffered(const IOOptions& opts,
579
1.51M
                                           const char* data, size_t size) {
580
1.51M
  if (seen_error()) {
581
0
    return GetWriterHasPreviousErrorStatus();
582
0
  }
583
584
1.51M
  IOStatus s;
585
1.51M
  assert(!use_direct_io());
586
1.51M
  const char* src = data;
587
1.51M
  size_t left = size;
588
1.51M
  DataVerificationInfo v_info;
589
1.51M
  char checksum_buf[sizeof(uint32_t)];
590
1.51M
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
591
592
3.02M
  while (left > 0) {
593
1.51M
    size_t allowed = left;
594
1.51M
    if (rate_limiter_ != nullptr &&
595
0
        rate_limiter_priority_used != Env::IO_TOTAL) {
596
0
      allowed = rate_limiter_->RequestToken(left, 0 /* alignment */,
597
0
                                            rate_limiter_priority_used, stats_,
598
0
                                            RateLimiter::OpType::kWrite);
599
0
    }
600
601
1.51M
    {
602
1.51M
      IOSTATS_TIMER_GUARD(write_nanos);
603
1.51M
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
604
605
1.51M
      FileOperationInfo::StartTimePoint start_ts;
606
1.51M
      uint64_t old_size = writable_file_->GetFileSize(opts, nullptr);
607
1.51M
      if (ShouldNotifyListeners()) {
608
0
        start_ts = FileOperationInfo::StartNow();
609
0
        old_size = next_write_offset_;
610
0
      }
611
1.51M
      {
612
1.51M
        auto prev_perf_level = GetPerfLevel();
613
614
1.51M
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
615
1.51M
        if (perform_data_verification_) {
616
0
          Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf);
617
0
          v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
618
0
          s = writable_file_->Append(Slice(src, allowed), opts, v_info,
619
0
                                     nullptr);
620
1.51M
        } else {
621
1.51M
          s = writable_file_->Append(Slice(src, allowed), opts, nullptr);
622
1.51M
        }
623
1.51M
        if (!s.ok()) {
624
          // If writable_file_->Append() failed, then the data may or may not
625
          // exist in the underlying memory buffer, OS page cache, remote file
626
          // system's buffer, etc. If WritableFileWriter keeps the data in
627
          // buf_, then a future Close() or write retry may send the data to
628
          // the underlying file again. If the data does exist in the
629
          // underlying buffer and gets written to the file eventually despite
630
          // returning error, the file may end up with two duplicate pieces of
631
          // data. Therefore, clear the buf_ at the WritableFileWriter layer
632
          // and let caller determine error handling.
633
0
          buf_.Size(0);
634
0
          buffered_data_crc32c_checksum_ = 0;
635
0
        }
636
1.51M
        SetPerfLevel(prev_perf_level);
637
1.51M
      }
638
1.51M
      if (ShouldNotifyListeners()) {
639
0
        auto finish_ts = std::chrono::steady_clock::now();
640
0
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
641
0
        if (!s.ok()) {
642
0
          NotifyOnIOError(s, FileOperationType::kAppend, file_name(), allowed,
643
0
                          old_size);
644
0
        }
645
0
      }
646
1.51M
      if (!s.ok()) {
647
0
        set_seen_error(s);
648
0
        return s;
649
0
      }
650
1.51M
    }
651
652
1.51M
    IOSTATS_ADD(bytes_written, allowed);
653
1.51M
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
654
655
1.51M
    left -= allowed;
656
1.51M
    src += allowed;
657
1.51M
    uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
658
1.51M
    flushed_size_.store(cur_size + allowed, std::memory_order_release);
659
1.51M
  }
660
1.51M
  buf_.Size(0);
661
1.51M
  buffered_data_crc32c_checksum_ = 0;
662
1.51M
  if (!s.ok()) {
663
0
    set_seen_error(s);
664
0
  }
665
1.51M
  return s;
666
1.51M
}
667
668
IOStatus WritableFileWriter::WriteBufferedWithChecksum(const IOOptions& opts,
669
                                                       const char* data,
670
0
                                                       size_t size) {
671
0
  if (seen_error()) {
672
0
    return GetWriterHasPreviousErrorStatus();
673
0
  }
674
675
0
  IOStatus s;
676
0
  assert(!use_direct_io());
677
0
  assert(perform_data_verification_ && buffered_data_with_checksum_);
678
0
  const char* src = data;
679
0
  size_t left = size;
680
0
  DataVerificationInfo v_info;
681
0
  char checksum_buf[sizeof(uint32_t)];
682
0
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
683
  // Check how much is allowed. Here, we loop until the rate limiter allows to
684
  // write the entire buffer.
685
  // TODO: need to be improved since it sort of defeats the purpose of the rate
686
  // limiter
687
0
  size_t data_size = left;
688
0
  if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
689
0
    while (data_size > 0) {
690
0
      size_t tmp_size;
691
0
      tmp_size =
692
0
          rate_limiter_->RequestToken(data_size, 0, rate_limiter_priority_used,
693
0
                                      stats_, RateLimiter::OpType::kWrite);
694
0
      data_size -= tmp_size;
695
0
    }
696
0
  }
697
698
0
  {
699
0
    IOSTATS_TIMER_GUARD(write_nanos);
700
0
    TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
701
702
0
    FileOperationInfo::StartTimePoint start_ts;
703
0
    uint64_t old_size = writable_file_->GetFileSize(opts, nullptr);
704
0
    if (ShouldNotifyListeners()) {
705
0
      start_ts = FileOperationInfo::StartNow();
706
0
      old_size = next_write_offset_;
707
0
    }
708
0
    {
709
0
      auto prev_perf_level = GetPerfLevel();
710
711
0
      IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
712
713
0
      EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
714
0
      v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
715
0
      s = writable_file_->Append(Slice(src, left), opts, v_info, nullptr);
716
0
      SetPerfLevel(prev_perf_level);
717
0
    }
718
0
    if (ShouldNotifyListeners()) {
719
0
      auto finish_ts = std::chrono::steady_clock::now();
720
0
      NotifyOnFileWriteFinish(old_size, left, start_ts, finish_ts, s);
721
0
      if (!s.ok()) {
722
0
        NotifyOnIOError(s, FileOperationType::kAppend, file_name(), left,
723
0
                        old_size);
724
0
      }
725
0
    }
726
0
    if (!s.ok()) {
727
      // If writable_file_->Append() failed, then the data may or may not
728
      // exist in the underlying memory buffer, OS page cache, remote file
729
      // system's buffer, etc. If WritableFileWriter keeps the data in
730
      // buf_, then a future Close() or write retry may send the data to
731
      // the underlying file again. If the data does exist in the
732
      // underlying buffer and gets written to the file eventually despite
733
      // returning error, the file may end up with two duplicate pieces of
734
      // data. Therefore, clear the buf_ at the WritableFileWriter layer
735
      // and let caller determine error handling.
736
0
      buf_.Size(0);
737
0
      buffered_data_crc32c_checksum_ = 0;
738
0
      set_seen_error(s);
739
0
      return s;
740
0
    }
741
0
  }
742
743
0
  IOSTATS_ADD(bytes_written, left);
744
0
  TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
745
746
  // Buffer write is successful, reset the buffer current size to 0 and reset
747
  // the corresponding checksum value
748
0
  buf_.Size(0);
749
0
  buffered_data_crc32c_checksum_ = 0;
750
0
  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
751
0
  flushed_size_.store(cur_size + left, std::memory_order_release);
752
0
  if (!s.ok()) {
753
0
    set_seen_error(s);
754
0
  }
755
0
  return s;
756
0
}
757
758
27.1M
void WritableFileWriter::UpdateFileChecksum(const Slice& data) {
759
27.1M
  if (checksum_generator_ != nullptr) {
760
0
    checksum_generator_->Update(data.data(), data.size());
761
0
  }
762
27.1M
}
763
764
// Currently, crc32c checksum is used to calculate the checksum value of the
765
// content in the input buffer for handoff. In the future, the checksum might be
766
// calculated from the existing crc32c checksums of the in WAl and Manifest
767
// records, or even SST file blocks.
768
// TODO: effectively use the existing checksum of the data being writing to
769
// generate the crc32c checksum instead of a raw calculation.
770
void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
771
                                                          size_t size,
772
0
                                                          char* buf) {
773
0
  uint32_t v_crc32c = crc32c::Extend(0, data, size);
774
0
  EncodeFixed32(buf, v_crc32c);
775
0
}
776
777
// This flushes the accumulated data in the buffer. We pad data with zeros if
778
// necessary to the whole page.
779
// However, during automatic flushes padding would not be necessary.
780
// We always use RateLimiter if available. We move (Refit) any buffer bytes
781
// that are left over the
782
// whole number of pages to be written again on the next flush because we can
783
// only write on aligned
784
// offsets.
785
0
IOStatus WritableFileWriter::WriteDirect(const IOOptions& opts) {
786
0
  if (seen_error()) {
787
0
    assert(false);
788
789
0
    return IOStatus::IOError("Writer has previous error.");
790
0
  }
791
792
0
  assert(use_direct_io());
793
0
  IOStatus s;
794
0
  const size_t alignment = buf_.Alignment();
795
0
  assert((next_write_offset_ % alignment) == 0);
796
797
  // Calculate whole page final file advance if all writes succeed
798
0
  const size_t file_advance =
799
0
      TruncateToPageBoundary(alignment, buf_.CurrentSize());
800
801
  // Calculate the leftover tail, we write it here padded with zeros BUT we
802
  // will write it again in the future either on Close() OR when the current
803
  // whole page fills out.
804
0
  const size_t leftover_tail = buf_.CurrentSize() - file_advance;
805
806
  // Round up and pad
807
0
  buf_.PadToAlignmentWith(0);
808
809
0
  const char* src = buf_.BufferStart();
810
0
  uint64_t write_offset = next_write_offset_;
811
0
  size_t left = buf_.CurrentSize();
812
0
  DataVerificationInfo v_info;
813
0
  char checksum_buf[sizeof(uint32_t)];
814
0
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
815
816
0
  while (left > 0) {
817
    // Check how much is allowed
818
0
    size_t size = left;
819
0
    if (rate_limiter_ != nullptr &&
820
0
        rate_limiter_priority_used != Env::IO_TOTAL) {
821
0
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
822
0
                                         rate_limiter_priority_used, stats_,
823
0
                                         RateLimiter::OpType::kWrite);
824
0
    }
825
826
0
    {
827
0
      IOSTATS_TIMER_GUARD(write_nanos);
828
0
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
829
0
      FileOperationInfo::StartTimePoint start_ts;
830
0
      if (ShouldNotifyListeners()) {
831
0
        start_ts = FileOperationInfo::StartNow();
832
0
      }
833
      // direct writes must be positional
834
0
      if (perform_data_verification_) {
835
0
        Crc32cHandoffChecksumCalculation(src, size, checksum_buf);
836
0
        v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
837
0
        s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
838
0
                                             opts, v_info, nullptr);
839
0
      } else {
840
0
        s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
841
0
                                             opts, nullptr);
842
0
      }
843
844
0
      if (ShouldNotifyListeners()) {
845
0
        auto finish_ts = std::chrono::steady_clock::now();
846
0
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
847
0
        if (!s.ok()) {
848
0
          NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(),
849
0
                          size, write_offset);
850
0
        }
851
0
      }
852
0
      if (!s.ok()) {
853
0
        buf_.Size(file_advance + leftover_tail);
854
0
        set_seen_error(s);
855
0
        return s;
856
0
      }
857
0
    }
858
859
0
    IOSTATS_ADD(bytes_written, size);
860
0
    left -= size;
861
0
    src += size;
862
0
    write_offset += size;
863
0
    uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
864
0
    flushed_size_.store(cur_size + size, std::memory_order_release);
865
0
    assert((next_write_offset_ % alignment) == 0);
866
0
  }
867
868
0
  if (s.ok()) {
869
    // Move the tail to the beginning of the buffer
870
    // This never happens during normal Append but rather during
871
    // explicit call to Flush()/Sync() or Close()
872
0
    buf_.RefitTail(file_advance, leftover_tail);
873
    // This is where we start writing next time which may or not be
874
    // the actual file size on disk. They match if the buffer size
875
    // is a multiple of whole pages otherwise filesize_ is leftover_tail
876
    // behind
877
0
    next_write_offset_ += file_advance;
878
0
  } else {
879
0
    set_seen_error(s);
880
0
  }
881
0
  return s;
882
0
}
883
884
0
IOStatus WritableFileWriter::WriteDirectWithChecksum(const IOOptions& opts) {
885
0
  if (seen_error()) {
886
0
    return GetWriterHasPreviousErrorStatus();
887
0
  }
888
889
0
  assert(use_direct_io());
890
0
  assert(perform_data_verification_ && buffered_data_with_checksum_);
891
0
  IOStatus s;
892
0
  const size_t alignment = buf_.Alignment();
893
0
  assert((next_write_offset_ % alignment) == 0);
894
895
  // Calculate whole page final file advance if all writes succeed
896
0
  const size_t file_advance =
897
0
      TruncateToPageBoundary(alignment, buf_.CurrentSize());
898
899
  // Calculate the leftover tail, we write it here padded with zeros BUT we
900
  // will write it again in the future either on Close() OR when the current
901
  // whole page fills out.
902
0
  const size_t leftover_tail = buf_.CurrentSize() - file_advance;
903
904
  // Round up, pad, and combine the checksum.
905
0
  size_t last_cur_size = buf_.CurrentSize();
906
0
  buf_.PadToAlignmentWith(0);
907
0
  size_t padded_size = buf_.CurrentSize() - last_cur_size;
908
0
  const char* padded_start = buf_.BufferStart() + last_cur_size;
909
0
  uint32_t padded_checksum = crc32c::Value(padded_start, padded_size);
910
0
  buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine(
911
0
      buffered_data_crc32c_checksum_, padded_checksum, padded_size);
912
913
0
  const char* src = buf_.BufferStart();
914
0
  uint64_t write_offset = next_write_offset_;
915
0
  size_t left = buf_.CurrentSize();
916
0
  DataVerificationInfo v_info;
917
0
  char checksum_buf[sizeof(uint32_t)];
918
919
0
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
920
  // Check how much is allowed. Here, we loop until the rate limiter allows to
921
  // write the entire buffer.
922
  // TODO: need to be improved since it sort of defeats the purpose of the rate
923
  // limiter
924
0
  size_t data_size = left;
925
0
  if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
926
0
    while (data_size > 0) {
927
0
      size_t size;
928
0
      size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
929
0
                                         rate_limiter_priority_used, stats_,
930
0
                                         RateLimiter::OpType::kWrite);
931
0
      data_size -= size;
932
0
    }
933
0
  }
934
935
0
  {
936
0
    IOSTATS_TIMER_GUARD(write_nanos);
937
0
    TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
938
0
    FileOperationInfo::StartTimePoint start_ts;
939
0
    if (ShouldNotifyListeners()) {
940
0
      start_ts = FileOperationInfo::StartNow();
941
0
    }
942
    // direct writes must be positional
943
0
    EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
944
0
    v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
945
0
    s = writable_file_->PositionedAppend(Slice(src, left), write_offset, opts,
946
0
                                         v_info, nullptr);
947
948
0
    if (ShouldNotifyListeners()) {
949
0
      auto finish_ts = std::chrono::steady_clock::now();
950
0
      NotifyOnFileWriteFinish(write_offset, left, start_ts, finish_ts, s);
951
0
      if (!s.ok()) {
952
0
        NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(),
953
0
                        left, write_offset);
954
0
      }
955
0
    }
956
0
    if (!s.ok()) {
957
      // In this case, we do not change buffered_data_crc32c_checksum_ because
958
      // it still aligns with the data in the buffer.
959
0
      buf_.Size(file_advance + leftover_tail);
960
0
      buffered_data_crc32c_checksum_ =
961
0
          crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
962
0
      set_seen_error(s);
963
0
      return s;
964
0
    }
965
0
  }
966
967
0
  IOSTATS_ADD(bytes_written, left);
968
0
  assert((next_write_offset_ % alignment) == 0);
969
0
  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
970
0
  flushed_size_.store(cur_size + left, std::memory_order_release);
971
972
0
  if (s.ok()) {
973
    // Move the tail to the beginning of the buffer
974
    // This never happens during normal Append but rather during
975
    // explicit call to Flush()/Sync() or Close(). Also the buffer checksum will
976
    // recalculated accordingly.
977
0
    buf_.RefitTail(file_advance, leftover_tail);
978
    // Adjust the checksum value to align with the data in the buffer
979
0
    buffered_data_crc32c_checksum_ =
980
0
        crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
981
    // This is where we start writing next time which may or not be
982
    // the actual file size on disk. They match if the buffer size
983
    // is a multiple of whole pages otherwise filesize_ is leftover_tail
984
    // behind
985
0
    next_write_offset_ += file_advance;
986
0
  } else {
987
0
    set_seen_error(s);
988
0
  }
989
0
  return s;
990
0
}
991
Env::IOPriority WritableFileWriter::DecideRateLimiterPriority(
992
    Env::IOPriority writable_file_io_priority,
993
29.5M
    Env::IOPriority op_rate_limiter_priority) {
994
29.5M
  if (writable_file_io_priority == Env::IO_TOTAL &&
995
29.2M
      op_rate_limiter_priority == Env::IO_TOTAL) {
996
29.2M
    return Env::IO_TOTAL;
997
29.2M
  } else if (writable_file_io_priority == Env::IO_TOTAL) {
998
0
    return op_rate_limiter_priority;
999
285k
  } else if (op_rate_limiter_priority == Env::IO_TOTAL) {
1000
19.2k
    return writable_file_io_priority;
1001
266k
  } else {
1002
266k
    return op_rate_limiter_priority;
1003
266k
  }
1004
29.5M
}
1005
1006
29.8M
IOOptions WritableFileWriter::FinalizeIOOptions(const IOOptions& opts) const {
1007
29.8M
  Env::IOPriority op_rate_limiter_priority = opts.rate_limiter_priority;
1008
29.8M
  IOOptions io_options(opts);
1009
29.8M
  if (writable_file_.get() != nullptr) {
1010
29.5M
    io_options.rate_limiter_priority =
1011
29.5M
        WritableFileWriter::DecideRateLimiterPriority(
1012
29.5M
            writable_file_->GetIOPriority(), op_rate_limiter_priority);
1013
29.5M
  }
1014
29.8M
  return io_options;
1015
29.8M
}
1016
}  // namespace ROCKSDB_NAMESPACE