Coverage Report

Created: 2025-07-23 07:17

/src/rocksdb/file/writable_file_writer.cc

Line | Count | Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "file/writable_file_writer.h"
11
12
#include <algorithm>
13
#include <mutex>
14
15
#include "db/version_edit.h"
16
#include "file/file_util.h"
17
#include "monitoring/histogram.h"
18
#include "monitoring/iostats_context_imp.h"
19
#include "port/port.h"
20
#include "rocksdb/io_status.h"
21
#include "rocksdb/system_clock.h"
22
#include "test_util/sync_point.h"
23
#include "util/crc32c.h"
24
#include "util/random.h"
25
#include "util/rate_limiter_impl.h"
26
27
namespace ROCKSDB_NAMESPACE {
28
inline Histograms GetFileWriteHistograms(Histograms file_writer_hist,
29
31.2M
                                         Env::IOActivity io_activity) {
30
31.2M
  if (file_writer_hist == Histograms::SST_WRITE_MICROS ||
31
31.2M
      file_writer_hist == Histograms::BLOB_DB_BLOB_FILE_WRITE_MICROS) {
32
228k
    switch (io_activity) {
33
20.1k
      case Env::IOActivity::kFlush:
34
20.1k
        return Histograms::FILE_WRITE_FLUSH_MICROS;
35
15.0k
      case Env::IOActivity::kCompaction:
36
15.0k
        return Histograms::FILE_WRITE_COMPACTION_MICROS;
37
192k
      case Env::IOActivity::kDBOpen:
38
192k
        return Histograms::FILE_WRITE_DB_OPEN_MICROS;
39
0
      default:
40
0
        break;
41
228k
    }
42
228k
  }
43
31.0M
  return Histograms::HISTOGRAM_ENUM_MAX;
44
31.2M
}
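Read as a worked example, the mapping above resolves as follows (values follow directly from the switch; WAL_FILE_SYNC_MICROS merely stands in for "any histogram other than the two SST/blob write histograms"):

// GetFileWriteHistograms(SST_WRITE_MICROS, Env::IOActivity::kFlush)
//   -> FILE_WRITE_FLUSH_MICROS
// GetFileWriteHistograms(SST_WRITE_MICROS, Env::IOActivity::kCompaction)
//   -> FILE_WRITE_COMPACTION_MICROS
// GetFileWriteHistograms(BLOB_DB_BLOB_FILE_WRITE_MICROS, Env::IOActivity::kDBOpen)
//   -> FILE_WRITE_DB_OPEN_MICROS
// GetFileWriteHistograms(WAL_FILE_SYNC_MICROS, <any activity>)
//   -> HISTOGRAM_ENUM_MAX (i.e. no per-activity histogram)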
45
46
IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
47
                                    const std::string& fname,
48
                                    const FileOptions& file_opts,
49
                                    std::unique_ptr<WritableFileWriter>* writer,
50
0
                                    IODebugContext* dbg) {
51
0
  if (file_opts.use_direct_writes &&
52
0
      0 == file_opts.writable_file_max_buffer_size) {
53
0
    return IOStatus::InvalidArgument(
54
0
        "Direct write requires writable_file_max_buffer_size > 0");
55
0
  }
56
0
  std::unique_ptr<FSWritableFile> file;
57
0
  IOStatus io_s = fs->NewWritableFile(fname, file_opts, &file, dbg);
58
0
  if (io_s.ok()) {
59
0
    writer->reset(new WritableFileWriter(std::move(file), fname, file_opts));
60
0
  }
61
0
  return io_s;
62
0
}
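For context, a minimal sketch of how Create() might be invoked (illustration only, not part of the instrumented file above; the default FileSystem, the helper name OpenWriterForExample, and the file path are assumptions):

#include <memory>

#include "file/writable_file_writer.h"
#include "rocksdb/file_system.h"

namespace ROCKSDB_NAMESPACE {

IOStatus OpenWriterForExample(std::unique_ptr<WritableFileWriter>* writer) {
  std::shared_ptr<FileSystem> fs = FileSystem::Default();
  FileOptions file_opts;
  // Create() rejects use_direct_writes combined with a zero
  // writable_file_max_buffer_size, so the defaults are left untouched here.
  return WritableFileWriter::Create(fs, "/tmp/example_output.bin", file_opts,
                                    writer, /*dbg=*/nullptr);
}

}  // namespace ROCKSDB_NAMESPACE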
63
64
IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
65
31.2M
                                    uint32_t crc32c_checksum) {
66
31.2M
  if (seen_error()) {
67
0
    return GetWriterHasPreviousErrorStatus();
68
0
  }
69
70
31.2M
  StopWatch sw(clock_, stats_, hist_type_,
71
31.2M
               GetFileWriteHistograms(hist_type_, opts.io_activity));
72
73
31.2M
  const IOOptions io_options = FinalizeIOOptions(opts);
74
31.2M
  const char* src = data.data();
75
31.2M
  size_t left = data.size();
76
31.2M
  IOStatus s;
77
31.2M
  pending_sync_ = true;
78
79
31.2M
  TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Append:0", REDUCE_ODDS2);
80
81
  // Calculate the checksum of appended data
82
31.2M
  UpdateFileChecksum(data);
83
84
31.2M
  {
85
31.2M
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
86
31.2M
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
87
31.2M
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
88
31.2M
                                 io_options, nullptr);
89
31.2M
  }
90
91
  // See whether we need to enlarge the buffer to avoid the flush
92
31.2M
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
93
1.85k
    for (size_t cap = buf_.Capacity();
94
2.41k
         cap < max_buffer_size_;  // There is still room to increase
95
2.37k
         cap *= 2) {
96
      // See whether the next available size is large enough.
97
      // Buffer will never be increased to more than max_buffer_size_.
98
2.37k
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
99
2.37k
      if (desired_capacity - buf_.CurrentSize() >= left ||
100
2.37k
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
101
1.80k
        buf_.AllocateNewBuffer(desired_capacity, true);
102
1.80k
        break;
103
1.80k
      }
104
2.37k
    }
105
1.85k
  }
106
107
  // Flush only when buffered I/O
108
31.2M
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
109
44
    if (buf_.CurrentSize() > 0) {
110
44
      if (!buffered_data_with_checksum_) {
111
        // If we're not calculating checksum of buffered data, fill the
112
        // buffer before flushing so that the writes are aligned. This will
113
        // benefit file system performance.
114
44
        size_t appended = buf_.Append(src, left);
115
44
        left -= appended;
116
44
        src += appended;
117
44
      }
118
44
      s = Flush(io_options);
119
44
      if (!s.ok()) {
120
0
        set_seen_error(s);
121
0
        return s;
122
0
      }
123
44
    }
124
44
    assert(buf_.CurrentSize() == 0);
125
44
  }
126
127
31.2M
  if (perform_data_verification_ && buffered_data_with_checksum_ &&
128
31.2M
      crc32c_checksum != 0) {
129
    // Since we want to use the checksum of the input data, we cannot break it
130
    // into several pieces. We will only write them in the buffer when buffer
131
    // size is enough. Otherwise, we will directly write it down.
132
0
    if (use_direct_io() || (buf_.Capacity() - buf_.CurrentSize()) >= left) {
133
0
      if ((buf_.Capacity() - buf_.CurrentSize()) >= left) {
134
0
        size_t appended = buf_.Append(src, left);
135
0
        if (appended != left) {
136
0
          s = IOStatus::Corruption("Write buffer append failure");
137
0
        }
138
0
        buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine(
139
0
            buffered_data_crc32c_checksum_, crc32c_checksum, appended);
140
0
      } else {
141
0
        while (left > 0) {
142
0
          size_t appended = buf_.Append(src, left);
143
0
          buffered_data_crc32c_checksum_ =
144
0
              crc32c::Extend(buffered_data_crc32c_checksum_, src, appended);
145
0
          left -= appended;
146
0
          src += appended;
147
148
0
          if (left > 0) {
149
0
            s = Flush(io_options);
150
0
            if (!s.ok()) {
151
0
              break;
152
0
            }
153
0
          }
154
0
        }
155
0
      }
156
0
    } else {
157
0
      assert(buf_.CurrentSize() == 0);
158
0
      buffered_data_crc32c_checksum_ = crc32c_checksum;
159
0
      s = WriteBufferedWithChecksum(io_options, src, left);
160
0
    }
161
31.2M
  } else {
162
    // In this case, either we do not need to do the data verification or
163
    // caller does not provide the checksum of the data (crc32c_checksum = 0).
164
    //
165
    // With direct I/O on, we never write directly to disk from here;
166
    // otherwise the buffer simply serves its original purpose of
167
    // accumulating many small chunks.
168
31.2M
    if (use_direct_io() || (buf_.Capacity() >= left)) {
169
62.5M
      while (left > 0) {
170
31.2M
        size_t appended = buf_.Append(src, left);
171
31.2M
        if (perform_data_verification_ && buffered_data_with_checksum_) {
172
0
          buffered_data_crc32c_checksum_ =
173
0
              crc32c::Extend(buffered_data_crc32c_checksum_, src, appended);
174
0
        }
175
31.2M
        left -= appended;
176
31.2M
        src += appended;
177
178
31.2M
        if (left > 0) {
179
0
          s = Flush(io_options);
180
0
          if (!s.ok()) {
181
0
            break;
182
0
          }
183
0
        }
184
31.2M
      }
185
31.2M
    } else {
186
      // Writing directly to file bypassing the buffer
187
166
      assert(buf_.CurrentSize() == 0);
188
166
      if (perform_data_verification_ && buffered_data_with_checksum_) {
189
0
        buffered_data_crc32c_checksum_ = crc32c::Value(src, left);
190
0
        s = WriteBufferedWithChecksum(io_options, src, left);
191
166
      } else {
192
166
        s = WriteBuffered(io_options, src, left);
193
166
      }
194
166
    }
195
31.2M
  }
196
197
31.2M
  TEST_KILL_RANDOM("WritableFileWriter::Append:1");
198
31.2M
  if (s.ok()) {
199
31.2M
    uint64_t cur_size = filesize_.load(std::memory_order_acquire);
200
31.2M
    filesize_.store(cur_size + data.size(), std::memory_order_release);
201
31.2M
  } else {
202
101
    set_seen_error(s);
203
101
  }
204
31.2M
  return s;
205
31.2M
}
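The buffer-growth loop in Append() doubles the capacity until either the pending bytes fit or max_buffer_size_ is reached. A worked example with assumed sizes (64 KB initial capacity, 60 KB already buffered, 100 KB incoming, 1 MB maximum):

// cap = 64 KB:  desired = min(128 KB, 1 MB) = 128 KB; 128 KB - 60 KB = 68 KB
//               < 100 KB and we are not yet at the direct-I/O max -> keep going
// cap = 128 KB: desired = min(256 KB, 1 MB) = 256 KB; 256 KB - 60 KB = 196 KB
//               >= 100 KB -> AllocateNewBuffer(256 KB, /*copy_data=*/true)
// The incoming 100 KB then fits behind the 60 KB already buffered, so this
// Append() does not trigger a flush.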
206
207
IOStatus WritableFileWriter::Pad(const IOOptions& opts,
208
0
                                 const size_t pad_bytes) {
209
0
  if (seen_error()) {
210
0
    return GetWriterHasPreviousErrorStatus();
211
0
  }
212
0
  const IOOptions io_options = FinalizeIOOptions(opts);
213
0
  assert(pad_bytes < kDefaultPageSize);
214
0
  size_t left = pad_bytes;
215
0
  size_t cap = buf_.Capacity() - buf_.CurrentSize();
216
217
  // Assume pad_bytes is small compared to buf_ capacity. So we always
218
  // use buf_ rather than write directly to file in certain cases like
219
  // Append() does.
220
0
  while (left) {
221
0
    size_t append_bytes = std::min(cap, left);
222
0
    buf_.PadWith(append_bytes, 0);
223
0
    left -= append_bytes;
224
225
0
    Slice data(buf_.BufferStart() + buf_.CurrentSize() - append_bytes,
226
0
               append_bytes);
227
0
    UpdateFileChecksum(data);
228
0
    if (perform_data_verification_) {
229
0
      buffered_data_crc32c_checksum_ = crc32c::Extend(
230
0
          buffered_data_crc32c_checksum_,
231
0
          buf_.BufferStart() + buf_.CurrentSize() - append_bytes, append_bytes);
232
0
    }
233
234
0
    if (left > 0) {
235
0
      IOStatus s = Flush(io_options);
236
0
      if (!s.ok()) {
237
0
        set_seen_error(s);
238
0
        return s;
239
0
      }
240
0
    }
241
0
    cap = buf_.Capacity() - buf_.CurrentSize();
242
0
  }
243
0
  pending_sync_ = true;
244
0
  uint64_t cur_size = filesize_.load(std::memory_order_acquire);
245
0
  filesize_.store(cur_size + pad_bytes, std::memory_order_release);
246
247
0
  return IOStatus::OK();
248
0
}
249
250
580k
IOStatus WritableFileWriter::Close(const IOOptions& opts) {
251
580k
  IOOptions io_options = FinalizeIOOptions(opts);
252
580k
  if (seen_error()) {
253
0
    IOStatus interim;
254
0
    if (writable_file_.get() != nullptr) {
255
0
      interim = writable_file_->Close(io_options, nullptr);
256
0
      writable_file_.reset();
257
0
    }
258
0
    if (interim.ok()) {
259
0
      return IOStatus::IOError(
260
0
          "File is closed but data not flushed as writer has previous error.");
261
0
    } else {
262
0
      return interim;
263
0
    }
264
0
  }
265
266
  // Do not quit immediately on failure; the file MUST be closed.
267
268
  // It is now possible to close the file twice since we MUST close it
269
  // in the destructor (__dtor); simply flushing is not enough.
270
  // On Windows, pre-allocation does not fill with zeros, and with
271
  // unbuffered access we also need to set the end of data.
272
580k
  if (writable_file_.get() == nullptr) {
273
255k
    return IOStatus::OK();
274
255k
  }
275
276
324k
  IOStatus s;
277
324k
  s = Flush(io_options);  // flush cache to OS
278
279
324k
  IOStatus interim;
280
  // In direct I/O mode we write whole pages so
281
  // we need to let the file know where data ends.
282
324k
  if (use_direct_io()) {
283
0
    {
284
0
      FileOperationInfo::StartTimePoint start_ts;
285
0
      if (ShouldNotifyListeners()) {
286
0
        start_ts = FileOperationInfo::StartNow();
287
0
      }
288
0
      uint64_t filesz = filesize_.load(std::memory_order_acquire);
289
0
      interim = writable_file_->Truncate(filesz, io_options, nullptr);
290
0
      if (ShouldNotifyListeners()) {
291
0
        auto finish_ts = FileOperationInfo::FinishNow();
292
0
        NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
293
0
        if (!interim.ok()) {
294
0
          NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(),
295
0
                          filesz);
296
0
        }
297
0
      }
298
0
    }
299
0
    if (interim.ok()) {
300
0
      {
301
0
        FileOperationInfo::StartTimePoint start_ts;
302
0
        if (ShouldNotifyListeners()) {
303
0
          start_ts = FileOperationInfo::StartNow();
304
0
        }
305
0
        interim = writable_file_->Fsync(io_options, nullptr);
306
0
        if (ShouldNotifyListeners()) {
307
0
          auto finish_ts = FileOperationInfo::FinishNow();
308
0
          NotifyOnFileSyncFinish(start_ts, finish_ts, s,
309
0
                                 FileOperationType::kFsync);
310
0
          if (!interim.ok()) {
311
0
            NotifyOnIOError(interim, FileOperationType::kFsync, file_name());
312
0
          }
313
0
        }
314
0
      }
315
0
    }
316
0
    if (!interim.ok() && s.ok()) {
317
0
      s = interim;
318
0
    }
319
0
  }
320
321
324k
  TEST_KILL_RANDOM("WritableFileWriter::Close:0");
322
324k
  {
323
324k
    FileOperationInfo::StartTimePoint start_ts;
324
324k
    if (ShouldNotifyListeners()) {
325
0
      start_ts = FileOperationInfo::StartNow();
326
0
    }
327
324k
    interim = writable_file_->Close(io_options, nullptr);
328
324k
    if (ShouldNotifyListeners()) {
329
0
      auto finish_ts = FileOperationInfo::FinishNow();
330
0
      NotifyOnFileCloseFinish(start_ts, finish_ts, s);
331
0
      if (!interim.ok()) {
332
0
        NotifyOnIOError(interim, FileOperationType::kClose, file_name());
333
0
      }
334
0
    }
335
324k
  }
336
324k
  if (!interim.ok() && s.ok()) {
337
0
    s = interim;
338
0
  }
339
340
324k
  writable_file_.reset();
341
324k
  TEST_KILL_RANDOM("WritableFileWriter::Close:1");
342
343
324k
  if (s.ok()) {
344
324k
    if (checksum_generator_ != nullptr && !checksum_finalized_) {
345
0
      checksum_generator_->Finalize();
346
0
      checksum_finalized_ = true;
347
0
    }
348
18.4E
  } else {
349
18.4E
    set_seen_error(s);
350
18.4E
  }
351
352
324k
  return s;
353
580k
}
354
355
// write out the cached data to the OS cache or storage if direct I/O
356
// is enabled.
357
2.71M
IOStatus WritableFileWriter::Flush(const IOOptions& opts) {
358
2.71M
  if (seen_error()) {
359
0
    return GetWriterHasPreviousErrorStatus();
360
0
  }
361
362
2.71M
  const IOOptions io_options = FinalizeIOOptions(opts);
363
364
2.71M
  IOStatus s;
365
2.71M
  TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2);
366
367
2.71M
  if (buf_.CurrentSize() > 0) {
368
2.14M
    if (use_direct_io()) {
369
0
      if (pending_sync_) {
370
0
        if (perform_data_verification_ && buffered_data_with_checksum_) {
371
0
          s = WriteDirectWithChecksum(io_options);
372
0
        } else {
373
0
          s = WriteDirect(io_options);
374
0
        }
375
0
      }
376
2.14M
    } else {
377
2.14M
      if (perform_data_verification_ && buffered_data_with_checksum_) {
378
0
        s = WriteBufferedWithChecksum(io_options, buf_.BufferStart(),
379
0
                                      buf_.CurrentSize());
380
2.14M
      } else {
381
2.14M
        s = WriteBuffered(io_options, buf_.BufferStart(), buf_.CurrentSize());
382
2.14M
      }
383
2.14M
    }
384
2.14M
    if (!s.ok()) {
385
0
      set_seen_error(s);
386
0
      return s;
387
0
    }
388
2.14M
  }
389
390
2.71M
  {
391
2.71M
    FileOperationInfo::StartTimePoint start_ts;
392
2.71M
    if (ShouldNotifyListeners()) {
393
0
      start_ts = FileOperationInfo::StartNow();
394
0
    }
395
2.71M
    s = writable_file_->Flush(io_options, nullptr);
396
2.71M
    if (ShouldNotifyListeners()) {
397
0
      auto finish_ts = std::chrono::steady_clock::now();
398
0
      NotifyOnFileFlushFinish(start_ts, finish_ts, s);
399
0
      if (!s.ok()) {
400
0
        NotifyOnIOError(s, FileOperationType::kFlush, file_name());
401
0
      }
402
0
    }
403
2.71M
  }
404
405
2.71M
  if (!s.ok()) {
406
0
    set_seen_error(s);
407
0
    return s;
408
0
  }
409
410
  // sync OS cache to disk for every bytes_per_sync_
411
  // TODO: give log files and sst files different options (log
412
  // files could potentially be cached in the OS for their whole
413
  // lifetime, thus we might not want to flush at all).
414
415
  // We try to avoid syncing the last 1MB of data, for two reasons:
416
  // (1) to avoid rewriting a page that is modified again later, and
417
  // (2) because on older OS versions a write can block while the page
418
  //     is being written out.
419
  // Xfs does neighbor page flushing outside of the specified ranges. We
420
  // need to make sure sync range is far from the write offset.
421
2.71M
  if (!use_direct_io() && bytes_per_sync_) {
422
0
    const uint64_t kBytesNotSyncRange =
423
0
        1024 * 1024;                                // recent 1MB is not synced.
424
0
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
425
0
    uint64_t cur_size = filesize_.load(std::memory_order_acquire);
426
0
    if (cur_size > kBytesNotSyncRange) {
427
0
      uint64_t offset_sync_to = cur_size - kBytesNotSyncRange;
428
0
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
429
0
      assert(offset_sync_to >= last_sync_size_);
430
0
      if (offset_sync_to > 0 &&
431
0
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
432
0
        s = RangeSync(io_options, last_sync_size_,
433
0
                      offset_sync_to - last_sync_size_);
434
0
        if (!s.ok()) {
435
0
          set_seen_error(s);
436
0
        }
437
0
        last_sync_size_ = offset_sync_to;
438
0
      }
439
0
    }
440
0
  }
441
442
2.71M
  return s;
443
2.71M
}
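To make the range-sync arithmetic in Flush() concrete, a worked example with assumed values (bytes_per_sync_ = 512 KB, current file size 10,000,000 bytes, last_sync_size_ = 0):

// offset_sync_to = 10,000,000 - 1,048,576 (kBytesNotSyncRange) = 8,951,424
// align down to 4 KB: 8,951,424 - (8,951,424 % 4,096) = 8,949,760
// 8,949,760 - last_sync_size_ (0) >= 524,288 (bytes_per_sync_)
//   -> RangeSync(last_sync_size_ = 0, nbytes = 8,949,760)
//   -> last_sync_size_ becomes 8,949,760
// The most recent ~1 MB of the file is deliberately left unsynced.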
444
445
23.9k
std::string WritableFileWriter::GetFileChecksum() {
446
23.9k
  if (checksum_generator_ != nullptr) {
447
0
    assert(checksum_finalized_);
448
0
    return checksum_generator_->GetChecksum();
449
23.9k
  } else {
450
23.9k
    return kUnknownFileChecksum;
451
23.9k
  }
452
23.9k
}
453
454
23.9k
const char* WritableFileWriter::GetFileChecksumFuncName() const {
455
23.9k
  if (checksum_generator_ != nullptr) {
456
0
    return checksum_generator_->Name();
457
23.9k
  } else {
458
23.9k
    return kUnknownFileChecksumFuncName;
459
23.9k
  }
460
23.9k
}
461
462
IOStatus WritableFileWriter::PrepareIOOptions(const WriteOptions& wo,
463
3.63M
                                              IOOptions& opts) {
464
3.63M
  return PrepareIOFromWriteOptions(wo, opts);
465
3.63M
}
466
467
247k
IOStatus WritableFileWriter::Sync(const IOOptions& opts, bool use_fsync) {
468
247k
  if (seen_error()) {
469
0
    return GetWriterHasPreviousErrorStatus();
470
0
  }
471
472
247k
  IOOptions io_options = FinalizeIOOptions(opts);
473
247k
  IOStatus s = Flush(io_options);
474
247k
  if (!s.ok()) {
475
0
    set_seen_error(s);
476
0
    return s;
477
0
  }
478
247k
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0");
479
247k
  if (!use_direct_io() && pending_sync_) {
480
247k
    s = SyncInternal(io_options, use_fsync);
481
247k
    if (!s.ok()) {
482
0
      set_seen_error(s);
483
0
      return s;
484
0
    }
485
247k
  }
486
247k
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1");
487
247k
  pending_sync_ = false;
488
247k
  return IOStatus::OK();
489
247k
}
490
491
IOStatus WritableFileWriter::SyncWithoutFlush(const IOOptions& opts,
492
0
                                              bool use_fsync) {
493
0
  if (seen_error()) {
494
0
    return GetWriterHasPreviousErrorStatus();
495
0
  }
496
0
  IOOptions io_options = FinalizeIOOptions(opts);
497
0
  if (!writable_file_->IsSyncThreadSafe()) {
498
0
    return IOStatus::NotSupported(
499
0
        "Can't WritableFileWriter::SyncWithoutFlush() because "
500
0
        "WritableFile::IsSyncThreadSafe() is false");
501
0
  }
502
0
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
503
0
  IOStatus s = SyncInternal(io_options, use_fsync);
504
0
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
505
0
  if (!s.ok()) {
506
0
    set_seen_error(s);
507
0
  }
508
0
  return s;
509
0
}
510
511
IOStatus WritableFileWriter::SyncInternal(const IOOptions& opts,
512
247k
                                          bool use_fsync) {
513
  // Caller is supposed to check seen_error_
514
247k
  IOStatus s;
515
247k
  IOSTATS_TIMER_GUARD(fsync_nanos);
516
247k
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
517
247k
  auto prev_perf_level = GetPerfLevel();
518
519
247k
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
520
521
247k
  FileOperationInfo::StartTimePoint start_ts;
522
247k
  if (ShouldNotifyListeners()) {
523
0
    start_ts = FileOperationInfo::StartNow();
524
0
  }
525
526
247k
  if (use_fsync) {
527
102k
    s = writable_file_->Fsync(opts, nullptr);
528
145k
  } else {
529
145k
    s = writable_file_->Sync(opts, nullptr);
530
145k
  }
531
247k
  if (ShouldNotifyListeners()) {
532
0
    auto finish_ts = std::chrono::steady_clock::now();
533
0
    NotifyOnFileSyncFinish(
534
0
        start_ts, finish_ts, s,
535
0
        use_fsync ? FileOperationType::kFsync : FileOperationType::kSync);
536
0
    if (!s.ok()) {
537
0
      NotifyOnIOError(
538
0
          s, (use_fsync ? FileOperationType::kFsync : FileOperationType::kSync),
539
0
          file_name());
540
0
    }
541
0
  }
542
247k
  SetPerfLevel(prev_perf_level);
543
544
  // The caller will be responsible to call set_seen_error(s) if s is not OK.
545
247k
  return s;
546
247k
}
547
548
IOStatus WritableFileWriter::RangeSync(const IOOptions& opts, uint64_t offset,
549
0
                                       uint64_t nbytes) {
550
0
  if (seen_error()) {
551
0
    return GetWriterHasPreviousErrorStatus();
552
0
  }
553
554
0
  IOSTATS_TIMER_GUARD(range_sync_nanos);
555
0
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
556
0
  FileOperationInfo::StartTimePoint start_ts;
557
0
  if (ShouldNotifyListeners()) {
558
0
    start_ts = FileOperationInfo::StartNow();
559
0
  }
560
0
  IOStatus s = writable_file_->RangeSync(offset, nbytes, opts, nullptr);
561
0
  if (!s.ok()) {
562
0
    set_seen_error(s);
563
0
  }
564
0
  if (ShouldNotifyListeners()) {
565
0
    auto finish_ts = std::chrono::steady_clock::now();
566
0
    NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts, s);
567
0
    if (!s.ok()) {
568
0
      NotifyOnIOError(s, FileOperationType::kRangeSync, file_name(), nbytes,
569
0
                      offset);
570
0
    }
571
0
  }
572
0
  return s;
573
0
}
574
575
// This method writes to disk the specified data and makes use of the rate
576
// limiter if available
577
IOStatus WritableFileWriter::WriteBuffered(const IOOptions& opts,
578
2.14M
                                           const char* data, size_t size) {
579
2.14M
  if (seen_error()) {
580
0
    return GetWriterHasPreviousErrorStatus();
581
0
  }
582
583
2.14M
  IOStatus s;
584
2.14M
  assert(!use_direct_io());
585
2.14M
  const char* src = data;
586
2.14M
  size_t left = size;
587
2.14M
  DataVerificationInfo v_info;
588
2.14M
  char checksum_buf[sizeof(uint32_t)];
589
2.14M
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
590
591
4.29M
  while (left > 0) {
592
2.14M
    size_t allowed = left;
593
2.14M
    if (rate_limiter_ != nullptr &&
594
2.14M
        rate_limiter_priority_used != Env::IO_TOTAL) {
595
0
      allowed = rate_limiter_->RequestToken(left, 0 /* alignment */,
596
0
                                            rate_limiter_priority_used, stats_,
597
0
                                            RateLimiter::OpType::kWrite);
598
0
    }
599
600
2.14M
    {
601
2.14M
      IOSTATS_TIMER_GUARD(write_nanos);
602
2.14M
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
603
604
2.14M
      FileOperationInfo::StartTimePoint start_ts;
605
2.14M
      uint64_t old_size = writable_file_->GetFileSize(opts, nullptr);
606
2.14M
      if (ShouldNotifyListeners()) {
607
0
        start_ts = FileOperationInfo::StartNow();
608
0
        old_size = next_write_offset_;
609
0
      }
610
2.14M
      {
611
2.14M
        auto prev_perf_level = GetPerfLevel();
612
613
2.14M
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
614
2.14M
        if (perform_data_verification_) {
615
0
          Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf);
616
0
          v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
617
0
          s = writable_file_->Append(Slice(src, allowed), opts, v_info,
618
0
                                     nullptr);
619
2.14M
        } else {
620
2.14M
          s = writable_file_->Append(Slice(src, allowed), opts, nullptr);
621
2.14M
        }
622
2.14M
        if (!s.ok()) {
623
          // If writable_file_->Append() failed, then the data may or may not
624
          // exist in the underlying memory buffer, OS page cache, remote file
625
          // system's buffer, etc. If WritableFileWriter keeps the data in
626
          // buf_, then a future Close() or write retry may send the data to
627
          // the underlying file again. If the data does exist in the
628
          // underlying buffer and gets written to the file eventually despite
629
          // returning error, the file may end up with two duplicate pieces of
630
          // data. Therefore, clear the buf_ at the WritableFileWriter layer
631
          // and let caller determine error handling.
632
0
          buf_.Size(0);
633
0
          buffered_data_crc32c_checksum_ = 0;
634
0
        }
635
2.14M
        SetPerfLevel(prev_perf_level);
636
2.14M
      }
637
2.14M
      if (ShouldNotifyListeners()) {
638
0
        auto finish_ts = std::chrono::steady_clock::now();
639
0
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
640
0
        if (!s.ok()) {
641
0
          NotifyOnIOError(s, FileOperationType::kAppend, file_name(), allowed,
642
0
                          old_size);
643
0
        }
644
0
      }
645
2.14M
      if (!s.ok()) {
646
0
        set_seen_error(s);
647
0
        return s;
648
0
      }
649
2.14M
    }
650
651
2.14M
    IOSTATS_ADD(bytes_written, allowed);
652
2.14M
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
653
654
2.14M
    left -= allowed;
655
2.14M
    src += allowed;
656
2.14M
    uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
657
2.14M
    flushed_size_.store(cur_size + allowed, std::memory_order_release);
658
2.14M
  }
659
2.14M
  buf_.Size(0);
660
2.14M
  buffered_data_crc32c_checksum_ = 0;
661
2.14M
  if (!s.ok()) {
662
0
    set_seen_error(s);
663
0
  }
664
2.14M
  return s;
665
2.14M
}
666
667
IOStatus WritableFileWriter::WriteBufferedWithChecksum(const IOOptions& opts,
668
                                                       const char* data,
669
0
                                                       size_t size) {
670
0
  if (seen_error()) {
671
0
    return GetWriterHasPreviousErrorStatus();
672
0
  }
673
674
0
  IOStatus s;
675
0
  assert(!use_direct_io());
676
0
  assert(perform_data_verification_ && buffered_data_with_checksum_);
677
0
  const char* src = data;
678
0
  size_t left = size;
679
0
  DataVerificationInfo v_info;
680
0
  char checksum_buf[sizeof(uint32_t)];
681
0
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
682
  // Check how much is allowed. Here, we loop until the rate limiter allows to
683
  // write the entire buffer.
684
  // TODO: need to be improved since it sort of defeats the purpose of the rate
685
  // limiter
686
0
  size_t data_size = left;
687
0
  if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
688
0
    while (data_size > 0) {
689
0
      size_t tmp_size;
690
0
      tmp_size =
691
0
          rate_limiter_->RequestToken(data_size, 0, rate_limiter_priority_used,
692
0
                                      stats_, RateLimiter::OpType::kWrite);
693
0
      data_size -= tmp_size;
694
0
    }
695
0
  }
696
697
0
  {
698
0
    IOSTATS_TIMER_GUARD(write_nanos);
699
0
    TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
700
701
0
    FileOperationInfo::StartTimePoint start_ts;
702
0
    uint64_t old_size = writable_file_->GetFileSize(opts, nullptr);
703
0
    if (ShouldNotifyListeners()) {
704
0
      start_ts = FileOperationInfo::StartNow();
705
0
      old_size = next_write_offset_;
706
0
    }
707
0
    {
708
0
      auto prev_perf_level = GetPerfLevel();
709
710
0
      IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
711
712
0
      EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
713
0
      v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
714
0
      s = writable_file_->Append(Slice(src, left), opts, v_info, nullptr);
715
0
      SetPerfLevel(prev_perf_level);
716
0
    }
717
0
    if (ShouldNotifyListeners()) {
718
0
      auto finish_ts = std::chrono::steady_clock::now();
719
0
      NotifyOnFileWriteFinish(old_size, left, start_ts, finish_ts, s);
720
0
      if (!s.ok()) {
721
0
        NotifyOnIOError(s, FileOperationType::kAppend, file_name(), left,
722
0
                        old_size);
723
0
      }
724
0
    }
725
0
    if (!s.ok()) {
726
      // If writable_file_->Append() failed, then the data may or may not
727
      // exist in the underlying memory buffer, OS page cache, remote file
728
      // system's buffer, etc. If WritableFileWriter keeps the data in
729
      // buf_, then a future Close() or write retry may send the data to
730
      // the underlying file again. If the data does exist in the
731
      // underlying buffer and gets written to the file eventually despite
732
      // returning error, the file may end up with two duplicate pieces of
733
      // data. Therefore, clear the buf_ at the WritableFileWriter layer
734
      // and let caller determine error handling.
735
0
      buf_.Size(0);
736
0
      buffered_data_crc32c_checksum_ = 0;
737
0
      set_seen_error(s);
738
0
      return s;
739
0
    }
740
0
  }
741
742
0
  IOSTATS_ADD(bytes_written, left);
743
0
  TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
744
745
  // Buffer write is successful, reset the buffer current size to 0 and reset
746
  // the corresponding checksum value
747
0
  buf_.Size(0);
748
0
  buffered_data_crc32c_checksum_ = 0;
749
0
  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
750
0
  flushed_size_.store(cur_size + left, std::memory_order_release);
751
0
  if (!s.ok()) {
752
0
    set_seen_error(s);
753
0
  }
754
0
  return s;
755
0
}
756
757
31.2M
void WritableFileWriter::UpdateFileChecksum(const Slice& data) {
758
31.2M
  if (checksum_generator_ != nullptr) {
759
0
    checksum_generator_->Update(data.data(), data.size());
760
0
  }
761
31.2M
}
762
763
// Currently, crc32c checksum is used to calculate the checksum value of the
764
// content in the input buffer for handoff. In the future, the checksum might be
765
// calculated from the existing crc32c checksums of the WAL and Manifest
766
// records, or even SST file blocks.
767
// TODO: effectively use the existing checksum of the data being written to
768
// generate the crc32c checksum instead of a raw calculation.
769
void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
770
                                                          size_t size,
771
0
                                                          char* buf) {
772
0
  uint32_t v_crc32c = crc32c::Extend(0, data, size);
773
0
  EncodeFixed32(buf, v_crc32c);
774
0
}
775
776
// This flushes the accumulated data in the buffer. We pad data with zeros if
777
// necessary to the whole page.
778
// However, during automatic flushes padding would not be necessary.
779
// We always use RateLimiter if available. We move (Refit) any buffer bytes
780
// that are left over the
781
// whole number of pages to be written again on the next flush because we can
782
// only write on aligned
783
// offsets.
784
0
IOStatus WritableFileWriter::WriteDirect(const IOOptions& opts) {
785
0
  if (seen_error()) {
786
0
    assert(false);
787
788
0
    return IOStatus::IOError("Writer has previous error.");
789
0
  }
790
791
0
  assert(use_direct_io());
792
0
  IOStatus s;
793
0
  const size_t alignment = buf_.Alignment();
794
0
  assert((next_write_offset_ % alignment) == 0);
795
796
  // Calculate whole page final file advance if all writes succeed
797
0
  const size_t file_advance =
798
0
      TruncateToPageBoundary(alignment, buf_.CurrentSize());
799
800
  // Calculate the leftover tail, we write it here padded with zeros BUT we
801
  // will write it again in the future either on Close() OR when the current
802
  // whole page fills out.
803
0
  const size_t leftover_tail = buf_.CurrentSize() - file_advance;
804
805
  // Round up and pad
806
0
  buf_.PadToAlignmentWith(0);
807
808
0
  const char* src = buf_.BufferStart();
809
0
  uint64_t write_offset = next_write_offset_;
810
0
  size_t left = buf_.CurrentSize();
811
0
  DataVerificationInfo v_info;
812
0
  char checksum_buf[sizeof(uint32_t)];
813
0
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
814
815
0
  while (left > 0) {
816
    // Check how much is allowed
817
0
    size_t size = left;
818
0
    if (rate_limiter_ != nullptr &&
819
0
        rate_limiter_priority_used != Env::IO_TOTAL) {
820
0
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
821
0
                                         rate_limiter_priority_used, stats_,
822
0
                                         RateLimiter::OpType::kWrite);
823
0
    }
824
825
0
    {
826
0
      IOSTATS_TIMER_GUARD(write_nanos);
827
0
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
828
0
      FileOperationInfo::StartTimePoint start_ts;
829
0
      if (ShouldNotifyListeners()) {
830
0
        start_ts = FileOperationInfo::StartNow();
831
0
      }
832
      // direct writes must be positional
833
0
      if (perform_data_verification_) {
834
0
        Crc32cHandoffChecksumCalculation(src, size, checksum_buf);
835
0
        v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
836
0
        s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
837
0
                                             opts, v_info, nullptr);
838
0
      } else {
839
0
        s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
840
0
                                             opts, nullptr);
841
0
      }
842
843
0
      if (ShouldNotifyListeners()) {
844
0
        auto finish_ts = std::chrono::steady_clock::now();
845
0
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
846
0
        if (!s.ok()) {
847
0
          NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(),
848
0
                          size, write_offset);
849
0
        }
850
0
      }
851
0
      if (!s.ok()) {
852
0
        buf_.Size(file_advance + leftover_tail);
853
0
        set_seen_error(s);
854
0
        return s;
855
0
      }
856
0
    }
857
858
0
    IOSTATS_ADD(bytes_written, size);
859
0
    left -= size;
860
0
    src += size;
861
0
    write_offset += size;
862
0
    uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
863
0
    flushed_size_.store(cur_size + size, std::memory_order_release);
864
0
    assert((next_write_offset_ % alignment) == 0);
865
0
  }
866
867
0
  if (s.ok()) {
868
    // Move the tail to the beginning of the buffer
869
    // This never happens during normal Append but rather during
870
    // explicit call to Flush()/Sync() or Close()
871
0
    buf_.RefitTail(file_advance, leftover_tail);
872
    // This is where we start writing next time, which may or may not be
873
    // the actual file size on disk. They match if the buffer size
874
    // is a multiple of whole pages; otherwise filesize_ is leftover_tail
875
    // behind
876
0
    next_write_offset_ += file_advance;
877
0
  } else {
878
0
    set_seen_error(s);
879
0
  }
880
0
  return s;
881
0
}
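A worked example of the alignment bookkeeping in WriteDirect(), with assumed values (alignment 4,096 bytes, 9,000 bytes currently buffered):

// file_advance  = TruncateToPageBoundary(4096, 9000) = 8,192
// leftover_tail = 9,000 - 8,192 = 808
// PadToAlignmentWith(0) grows the buffer to 12,288 bytes (three full pages),
// and all 12,288 bytes are written at next_write_offset_.
// On success, RefitTail(8192, 808) moves the 808 tail bytes to the front of
// the buffer and next_write_offset_ advances by only 8,192, so the tail is
// rewritten (page-aligned) by the next flush or Close().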
882
883
0
IOStatus WritableFileWriter::WriteDirectWithChecksum(const IOOptions& opts) {
884
0
  if (seen_error()) {
885
0
    return GetWriterHasPreviousErrorStatus();
886
0
  }
887
888
0
  assert(use_direct_io());
889
0
  assert(perform_data_verification_ && buffered_data_with_checksum_);
890
0
  IOStatus s;
891
0
  const size_t alignment = buf_.Alignment();
892
0
  assert((next_write_offset_ % alignment) == 0);
893
894
  // Calculate whole page final file advance if all writes succeed
895
0
  const size_t file_advance =
896
0
      TruncateToPageBoundary(alignment, buf_.CurrentSize());
897
898
  // Calculate the leftover tail, we write it here padded with zeros BUT we
899
  // will write it again in the future either on Close() OR when the current
900
  // whole page fills out.
901
0
  const size_t leftover_tail = buf_.CurrentSize() - file_advance;
902
903
  // Round up, pad, and combine the checksum.
904
0
  size_t last_cur_size = buf_.CurrentSize();
905
0
  buf_.PadToAlignmentWith(0);
906
0
  size_t padded_size = buf_.CurrentSize() - last_cur_size;
907
0
  const char* padded_start = buf_.BufferStart() + last_cur_size;
908
0
  uint32_t padded_checksum = crc32c::Value(padded_start, padded_size);
909
0
  buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine(
910
0
      buffered_data_crc32c_checksum_, padded_checksum, padded_size);
911
912
0
  const char* src = buf_.BufferStart();
913
0
  uint64_t write_offset = next_write_offset_;
914
0
  size_t left = buf_.CurrentSize();
915
0
  DataVerificationInfo v_info;
916
0
  char checksum_buf[sizeof(uint32_t)];
917
918
0
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
919
  // Check how much is allowed. Here, we loop until the rate limiter allows to
920
  // write the entire buffer.
921
  // TODO: need to be improved since it sort of defeats the purpose of the rate
922
  // limiter
923
0
  size_t data_size = left;
924
0
  if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
925
0
    while (data_size > 0) {
926
0
      size_t size;
927
0
      size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
928
0
                                         rate_limiter_priority_used, stats_,
929
0
                                         RateLimiter::OpType::kWrite);
930
0
      data_size -= size;
931
0
    }
932
0
  }
933
934
0
  {
935
0
    IOSTATS_TIMER_GUARD(write_nanos);
936
0
    TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
937
0
    FileOperationInfo::StartTimePoint start_ts;
938
0
    if (ShouldNotifyListeners()) {
939
0
      start_ts = FileOperationInfo::StartNow();
940
0
    }
941
    // direct writes must be positional
942
0
    EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
943
0
    v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
944
0
    s = writable_file_->PositionedAppend(Slice(src, left), write_offset, opts,
945
0
                                         v_info, nullptr);
946
947
0
    if (ShouldNotifyListeners()) {
948
0
      auto finish_ts = std::chrono::steady_clock::now();
949
0
      NotifyOnFileWriteFinish(write_offset, left, start_ts, finish_ts, s);
950
0
      if (!s.ok()) {
951
0
        NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(),
952
0
                        left, write_offset);
953
0
      }
954
0
    }
955
0
    if (!s.ok()) {
956
      // In this case, we do not change buffered_data_crc32c_checksum_ because
957
      // it still aligns with the data in the buffer.
958
0
      buf_.Size(file_advance + leftover_tail);
959
0
      buffered_data_crc32c_checksum_ =
960
0
          crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
961
0
      set_seen_error(s);
962
0
      return s;
963
0
    }
964
0
  }
965
966
0
  IOSTATS_ADD(bytes_written, left);
967
0
  assert((next_write_offset_ % alignment) == 0);
968
0
  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
969
0
  flushed_size_.store(cur_size + left, std::memory_order_release);
970
971
0
  if (s.ok()) {
972
    // Move the tail to the beginning of the buffer
973
    // This never happens during normal Append but rather during
974
    // explicit call to Flush()/Sync() or Close(). Also the buffer checksum will
975
    // be recalculated accordingly.
976
0
    buf_.RefitTail(file_advance, leftover_tail);
977
    // Adjust the checksum value to align with the data in the buffer
978
0
    buffered_data_crc32c_checksum_ =
979
0
        crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
980
    // This is where we start writing next time, which may or may not be
981
    // the actual file size on disk. They match if the buffer size
982
    // is a multiple of whole pages; otherwise filesize_ is leftover_tail
983
    // behind
984
0
    next_write_offset_ += file_advance;
985
0
  } else {
986
0
    set_seen_error(s);
987
0
  }
988
0
  return s;
989
0
}
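The checksum maintenance above (and the analogous code in Append()) relies on the composition property of CRC32C. In sketch form, with D1 the bytes already covered by buffered_data_crc32c_checksum_ and D2 the newly appended or padded bytes:

// crc32c::Crc32cCombine(crc32c::Value(D1), crc32c::Value(D2), |D2|)
//     == crc32c::Value(D1 concatenated with D2)
// so the buffer's checksum can be extended without rescanning D1, and after a
// partial failure it is simply recomputed from whatever remains in buf_.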
990
Env::IOPriority WritableFileWriter::DecideRateLimiterPriority(
991
    Env::IOPriority writable_file_io_priority,
992
34.5M
    Env::IOPriority op_rate_limiter_priority) {
993
34.5M
  if (writable_file_io_priority == Env::IO_TOTAL &&
994
34.5M
      op_rate_limiter_priority == Env::IO_TOTAL) {
995
34.2M
    return Env::IO_TOTAL;
996
34.2M
  } else if (writable_file_io_priority == Env::IO_TOTAL) {
997
0
    return op_rate_limiter_priority;
998
325k
  } else if (op_rate_limiter_priority == Env::IO_TOTAL) {
999
19.0k
    return writable_file_io_priority;
1000
306k
  } else {
1001
306k
    return op_rate_limiter_priority;
1002
306k
  }
1003
34.5M
}
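The priority resolution above reduces to a small table (a restatement of the branches, not additional behavior):

//   file-level priority   op-level priority   result
//   IO_TOTAL              IO_TOTAL            IO_TOTAL  (no rate limiting)
//   IO_TOTAL              e.g. IO_LOW         op-level priority
//   e.g. IO_HIGH          IO_TOTAL            file-level priority
//   e.g. IO_HIGH          e.g. IO_LOW         op-level priority wins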
1004
1005
34.8M
IOOptions WritableFileWriter::FinalizeIOOptions(const IOOptions& opts) const {
1006
34.8M
  Env::IOPriority op_rate_limiter_priority = opts.rate_limiter_priority;
1007
34.8M
  IOOptions io_options(opts);
1008
34.8M
  if (writable_file_.get() != nullptr) {
1009
34.5M
    io_options.rate_limiter_priority =
1010
34.5M
        WritableFileWriter::DecideRateLimiterPriority(
1011
34.5M
            writable_file_->GetIOPriority(), op_rate_limiter_priority);
1012
34.5M
  }
1013
34.8M
  return io_options;
1014
34.8M
}
1015
}  // namespace ROCKSDB_NAMESPACE