Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/file/writable_file_writer.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "file/writable_file_writer.h"
11
12
#include <algorithm>
13
#include <mutex>
14
15
#include "db/version_edit.h"
16
#include "file/file_util.h"
17
#include "monitoring/histogram.h"
18
#include "monitoring/iostats_context_imp.h"
19
#include "port/port.h"
20
#include "rocksdb/io_status.h"
21
#include "rocksdb/system_clock.h"
22
#include "test_util/sync_point.h"
23
#include "util/crc32c.h"
24
#include "util/random.h"
25
#include "util/rate_limiter_impl.h"
26
27
namespace ROCKSDB_NAMESPACE {
28
inline Histograms GetFileWriteHistograms(Histograms file_writer_hist,
29
5.16M
                                         Env::IOActivity io_activity) {
30
5.16M
  if (file_writer_hist == Histograms::SST_WRITE_MICROS ||
31
5.16M
      file_writer_hist == Histograms::BLOB_DB_BLOB_FILE_WRITE_MICROS) {
32
60.4k
    switch (io_activity) {
33
1.19k
      case Env::IOActivity::kFlush:
34
1.19k
        return Histograms::FILE_WRITE_FLUSH_MICROS;
35
963
      case Env::IOActivity::kCompaction:
36
963
        return Histograms::FILE_WRITE_COMPACTION_MICROS;
37
58.3k
      case Env::IOActivity::kDBOpen:
38
58.3k
        return Histograms::FILE_WRITE_DB_OPEN_MICROS;
39
0
      default:
40
0
        break;
41
60.4k
    }
42
60.4k
  }
43
5.10M
  return Histograms::HISTOGRAM_ENUM_MAX;
44
5.16M
}
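The helper above only refines the SST and blob-file write histograms; every other case falls through to HISTOGRAM_ENUM_MAX, which is used as "no additional histogram". Read directly from the switch, the mapping is:

    // Env::IOActivity::kFlush      -> Histograms::FILE_WRITE_FLUSH_MICROS
    // Env::IOActivity::kCompaction -> Histograms::FILE_WRITE_COMPACTION_MICROS
    // Env::IOActivity::kDBOpen     -> Histograms::FILE_WRITE_DB_OPEN_MICROS
    // any other activity, or any other histogram type
    //                              -> Histograms::HISTOGRAM_ENUM_MAX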
45
46
IOStatus WritableFileWriter::Create(const std::shared_ptr<FileSystem>& fs,
47
                                    const std::string& fname,
48
                                    const FileOptions& file_opts,
49
                                    std::unique_ptr<WritableFileWriter>* writer,
50
0
                                    IODebugContext* dbg) {
51
0
  if (file_opts.use_direct_writes &&
52
0
      0 == file_opts.writable_file_max_buffer_size) {
53
0
    return IOStatus::InvalidArgument(
54
0
        "Direct write requires writable_file_max_buffer_size > 0");
55
0
  }
56
0
  std::unique_ptr<FSWritableFile> file;
57
0
  IOStatus io_s = fs->NewWritableFile(fname, file_opts, &file, dbg);
58
0
  if (io_s.ok()) {
59
0
    writer->reset(new WritableFileWriter(std::move(file), fname, file_opts));
60
0
  }
61
0
  return io_s;
62
0
}
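A minimal caller sketch for Create(), assuming the ROCKSDB_NAMESPACE is open, a hypothetical file name, and no error handling:

    std::unique_ptr<WritableFileWriter> writer;
    FileOptions file_opts;  // defaults: buffered writes, non-zero max buffer size
    IOStatus io_s = WritableFileWriter::Create(FileSystem::Default(),
                                               "/tmp/example.log", file_opts,
                                               &writer, /*dbg=*/nullptr);
    // If file_opts.use_direct_writes were set while
    // file_opts.writable_file_max_buffer_size == 0, Create() would return
    // InvalidArgument instead of opening the file.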
63
64
IOStatus WritableFileWriter::Append(const IOOptions& opts, const Slice& data,
65
5.16M
                                    uint32_t crc32c_checksum) {
66
5.16M
  if (seen_error()) {
67
0
    return GetWriterHasPreviousErrorStatus();
68
0
  }
69
70
5.16M
  StopWatch sw(clock_, stats_, hist_type_,
71
5.16M
               GetFileWriteHistograms(hist_type_, opts.io_activity));
72
73
5.16M
  const IOOptions io_options = FinalizeIOOptions(opts);
74
5.16M
  const char* src = data.data();
75
5.16M
  size_t left = data.size();
76
5.16M
  IOStatus s;
77
5.16M
  pending_sync_ = true;
78
79
5.16M
  TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Append:0", REDUCE_ODDS2);
80
81
  // Calculate the checksum of appended data
82
5.16M
  UpdateFileChecksum(data);
83
84
5.16M
  {
85
5.16M
    IOSTATS_TIMER_GUARD(prepare_write_nanos);
86
5.16M
    TEST_SYNC_POINT("WritableFileWriter::Append:BeforePrepareWrite");
87
5.16M
    writable_file_->PrepareWrite(static_cast<size_t>(GetFileSize()), left,
88
5.16M
                                 io_options, nullptr);
89
5.16M
  }
90
91
  // See whether we need to enlarge the buffer to avoid the flush
92
5.16M
  if (buf_.Capacity() - buf_.CurrentSize() < left) {
93
2.75k
    for (size_t cap = buf_.Capacity();
94
3.83k
         cap < max_buffer_size_;  // There is still room to increase
95
3.78k
         cap *= 2) {
96
      // See whether the next available size is large enough.
97
      // Buffer will never be increased to more than max_buffer_size_.
98
3.78k
      size_t desired_capacity = std::min(cap * 2, max_buffer_size_);
99
3.78k
      if (desired_capacity - buf_.CurrentSize() >= left ||
100
3.78k
          (use_direct_io() && desired_capacity == max_buffer_size_)) {
101
2.70k
        buf_.AllocateNewBuffer(desired_capacity, true);
102
2.70k
        break;
103
2.70k
      }
104
3.78k
    }
105
2.75k
  }
106
107
  // Flush only when buffered I/O
108
5.16M
  if (!use_direct_io() && (buf_.Capacity() - buf_.CurrentSize()) < left) {
109
52
    if (buf_.CurrentSize() > 0) {
110
52
      s = Flush(io_options);
111
52
      if (!s.ok()) {
112
0
        set_seen_error(s);
113
0
        return s;
114
0
      }
115
52
    }
116
52
    assert(buf_.CurrentSize() == 0);
117
52
  }
118
119
5.16M
  if (perform_data_verification_ && buffered_data_with_checksum_ &&
120
5.16M
      crc32c_checksum != 0) {
121
    // Since we want to use the checksum of the input data, we cannot break it
122
    // into several pieces. We will only place it in the buffer when the buffer
123
    // size is large enough. Otherwise, we will write it out directly.
124
0
    if (use_direct_io() || (buf_.Capacity() - buf_.CurrentSize()) >= left) {
125
0
      if ((buf_.Capacity() - buf_.CurrentSize()) >= left) {
126
0
        size_t appended = buf_.Append(src, left);
127
0
        if (appended != left) {
128
0
          s = IOStatus::Corruption("Write buffer append failure");
129
0
        }
130
0
        buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine(
131
0
            buffered_data_crc32c_checksum_, crc32c_checksum, appended);
132
0
      } else {
133
0
        while (left > 0) {
134
0
          size_t appended = buf_.Append(src, left);
135
0
          buffered_data_crc32c_checksum_ =
136
0
              crc32c::Extend(buffered_data_crc32c_checksum_, src, appended);
137
0
          left -= appended;
138
0
          src += appended;
139
140
0
          if (left > 0) {
141
0
            s = Flush(io_options);
142
0
            if (!s.ok()) {
143
0
              break;
144
0
            }
145
0
          }
146
0
        }
147
0
      }
148
0
    } else {
149
0
      assert(buf_.CurrentSize() == 0);
150
0
      buffered_data_crc32c_checksum_ = crc32c_checksum;
151
0
      s = WriteBufferedWithChecksum(io_options, src, left);
152
0
    }
153
5.16M
  } else {
154
    // In this case, either we do not need to do the data verification or
155
    // the caller does not provide the checksum of the data (crc32c_checksum = 0).
156
    //
157
    // With direct I/O on, we never write directly to disk;
158
    // otherwise we simply use the buffer for its original purpose: accumulating
159
    // many small chunks.
160
5.16M
    if (use_direct_io() || (buf_.Capacity() >= left)) {
161
10.3M
      while (left > 0) {
162
5.15M
        size_t appended = buf_.Append(src, left);
163
5.15M
        if (perform_data_verification_ && buffered_data_with_checksum_) {
164
0
          buffered_data_crc32c_checksum_ =
165
0
              crc32c::Extend(buffered_data_crc32c_checksum_, src, appended);
166
0
        }
167
5.15M
        left -= appended;
168
5.15M
        src += appended;
169
170
5.15M
        if (left > 0) {
171
0
          s = Flush(io_options);
172
0
          if (!s.ok()) {
173
0
            break;
174
0
          }
175
0
        }
176
5.15M
      }
177
5.16M
    } else {
178
      // Writing directly to file bypassing the buffer
179
20
      assert(buf_.CurrentSize() == 0);
180
20
      if (perform_data_verification_ && buffered_data_with_checksum_) {
181
0
        buffered_data_crc32c_checksum_ = crc32c::Value(src, left);
182
0
        s = WriteBufferedWithChecksum(io_options, src, left);
183
20
      } else {
184
20
        s = WriteBuffered(io_options, src, left);
185
20
      }
186
20
    }
187
5.16M
  }
188
189
5.16M
  TEST_KILL_RANDOM("WritableFileWriter::Append:1");
190
5.16M
  if (s.ok()) {
191
5.16M
    uint64_t cur_size = filesize_.load(std::memory_order_acquire);
192
5.16M
    filesize_.store(cur_size + data.size(), std::memory_order_release);
193
5.16M
  } else {
194
3
    set_seen_error(s);
195
3
  }
196
5.16M
  return s;
197
5.16M
}
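A standalone sketch of the buffer-growth loop near the top of Append(), with hypothetical sizes; it mirrors only the doubling logic and ignores the direct-I/O clause:

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
      // Hypothetical state: 64 KiB buffer holding 60 KiB, a 100 KiB append,
      // and max_buffer_size_ of 1 MiB.
      std::size_t current_size = 60 << 10, left = 100 << 10;
      std::size_t max_buffer_size = 1 << 20;
      for (std::size_t cap = 64 << 10; cap < max_buffer_size; cap *= 2) {
        std::size_t desired_capacity = std::min(cap * 2, max_buffer_size);
        if (desired_capacity - current_size >= left) {
          // The first doubling (128 KiB) still leaves only 68 KiB free, so the
          // buffer grows to 256 KiB (262144 bytes) before the append proceeds.
          std::printf("grow buffer to %zu bytes\n", desired_capacity);
          break;
        }
      }
      return 0;
    }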
198
199
IOStatus WritableFileWriter::Pad(const IOOptions& opts,
200
0
                                 const size_t pad_bytes) {
201
0
  if (seen_error()) {
202
0
    return GetWriterHasPreviousErrorStatus();
203
0
  }
204
0
  const IOOptions io_options = FinalizeIOOptions(opts);
205
0
  assert(pad_bytes < kDefaultPageSize);
206
0
  size_t left = pad_bytes;
207
0
  size_t cap = buf_.Capacity() - buf_.CurrentSize();
208
209
  // Assume pad_bytes is small compared to buf_ capacity. So we always
210
  // use buf_ rather than writing directly to the file, as
211
  // Append() does in certain cases.
212
0
  while (left) {
213
0
    size_t append_bytes = std::min(cap, left);
214
0
    buf_.PadWith(append_bytes, 0);
215
0
    left -= append_bytes;
216
217
0
    Slice data(buf_.BufferStart() + buf_.CurrentSize() - append_bytes,
218
0
               append_bytes);
219
0
    UpdateFileChecksum(data);
220
0
    if (perform_data_verification_) {
221
0
      buffered_data_crc32c_checksum_ = crc32c::Extend(
222
0
          buffered_data_crc32c_checksum_,
223
0
          buf_.BufferStart() + buf_.CurrentSize() - append_bytes, append_bytes);
224
0
    }
225
226
0
    if (left > 0) {
227
0
      IOStatus s = Flush(io_options);
228
0
      if (!s.ok()) {
229
0
        set_seen_error(s);
230
0
        return s;
231
0
      }
232
0
    }
233
0
    cap = buf_.Capacity() - buf_.CurrentSize();
234
0
  }
235
0
  pending_sync_ = true;
236
0
  uint64_t cur_size = filesize_.load(std::memory_order_acquire);
237
0
  filesize_.store(cur_size + pad_bytes, std::memory_order_release);
238
239
0
  return IOStatus::OK();
240
0
}
241
242
105k
IOStatus WritableFileWriter::Close(const IOOptions& opts) {
243
105k
  IOOptions io_options = FinalizeIOOptions(opts);
244
105k
  if (seen_error()) {
245
0
    IOStatus interim;
246
0
    if (writable_file_.get() != nullptr) {
247
0
      interim = writable_file_->Close(io_options, nullptr);
248
0
      writable_file_.reset();
249
0
    }
250
0
    if (interim.ok()) {
251
0
      return IOStatus::IOError(
252
0
          "File is closed but data not flushed as writer has previous error.");
253
0
    } else {
254
0
      return interim;
255
0
    }
256
0
  }
257
258
  // Do not quit immediately on failure; the file MUST be closed.
259
260
  // It is possible to close the file twice now, as we MUST close it
261
  // in __dtor; simply flushing is not enough.
262
  // Windows, when pre-allocating, does not fill with zeros,
263
  // and with unbuffered access we also set the end of data.
264
105k
  if (writable_file_.get() == nullptr) {
265
44.7k
    return IOStatus::OK();
266
44.7k
  }
267
268
61.1k
  IOStatus s;
269
61.1k
  s = Flush(io_options);  // flush cache to OS
270
271
61.1k
  IOStatus interim;
272
  // In direct I/O mode we write whole pages so
273
  // we need to let the file know where data ends.
274
61.1k
  if (use_direct_io()) {
275
0
    {
276
0
      FileOperationInfo::StartTimePoint start_ts;
277
0
      if (ShouldNotifyListeners()) {
278
0
        start_ts = FileOperationInfo::StartNow();
279
0
      }
280
0
      uint64_t filesz = filesize_.load(std::memory_order_acquire);
281
0
      interim = writable_file_->Truncate(filesz, io_options, nullptr);
282
0
      if (ShouldNotifyListeners()) {
283
0
        auto finish_ts = FileOperationInfo::FinishNow();
284
0
        NotifyOnFileTruncateFinish(start_ts, finish_ts, s);
285
0
        if (!interim.ok()) {
286
0
          NotifyOnIOError(interim, FileOperationType::kTruncate, file_name(),
287
0
                          filesz);
288
0
        }
289
0
      }
290
0
    }
291
0
    if (interim.ok()) {
292
0
      {
293
0
        FileOperationInfo::StartTimePoint start_ts;
294
0
        if (ShouldNotifyListeners()) {
295
0
          start_ts = FileOperationInfo::StartNow();
296
0
        }
297
0
        interim = writable_file_->Fsync(io_options, nullptr);
298
0
        if (ShouldNotifyListeners()) {
299
0
          auto finish_ts = FileOperationInfo::FinishNow();
300
0
          NotifyOnFileSyncFinish(start_ts, finish_ts, s,
301
0
                                 FileOperationType::kFsync);
302
0
          if (!interim.ok()) {
303
0
            NotifyOnIOError(interim, FileOperationType::kFsync, file_name());
304
0
          }
305
0
        }
306
0
      }
307
0
    }
308
0
    if (!interim.ok() && s.ok()) {
309
0
      s = interim;
310
0
    }
311
0
  }
312
313
61.1k
  TEST_KILL_RANDOM("WritableFileWriter::Close:0");
314
61.1k
  {
315
61.1k
    FileOperationInfo::StartTimePoint start_ts;
316
61.1k
    if (ShouldNotifyListeners()) {
317
0
      start_ts = FileOperationInfo::StartNow();
318
0
    }
319
61.1k
    interim = writable_file_->Close(io_options, nullptr);
320
61.1k
    if (ShouldNotifyListeners()) {
321
0
      auto finish_ts = FileOperationInfo::FinishNow();
322
0
      NotifyOnFileCloseFinish(start_ts, finish_ts, s);
323
0
      if (!interim.ok()) {
324
0
        NotifyOnIOError(interim, FileOperationType::kClose, file_name());
325
0
      }
326
0
    }
327
61.1k
  }
328
61.1k
  if (!interim.ok() && s.ok()) {
329
0
    s = interim;
330
0
  }
331
332
61.1k
  writable_file_.reset();
333
61.1k
  TEST_KILL_RANDOM("WritableFileWriter::Close:1");
334
335
61.1k
  if (s.ok()) {
336
61.1k
    if (checksum_generator_ != nullptr && !checksum_finalized_) {
337
0
      checksum_generator_->Finalize();
338
0
      checksum_finalized_ = true;
339
0
    }
340
61.1k
  } else {
341
0
    set_seen_error(s);
342
0
  }
343
344
61.1k
  return s;
345
105k
}
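A typical write path through this class, sketched with a hypothetical payload and all status checks elided; it assumes `writer` came from WritableFileWriter::Create() as above:

    IOOptions opts;
    writer->Append(opts, Slice("key/value payload"), /*crc32c_checksum=*/0);
    writer->Flush(opts);                      // push buf_ to the OS
    writer->Sync(opts, /*use_fsync=*/false);  // Flush() + SyncInternal()
    writer->Close(opts);                      // final Flush(); Truncate()/Fsync()
                                              // under direct I/O; then Close()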
346
347
// write out the cached data to the OS cache, or to storage if direct I/O is
348
// enabled
349
1.42M
IOStatus WritableFileWriter::Flush(const IOOptions& opts) {
350
1.42M
  if (seen_error()) {
351
0
    return GetWriterHasPreviousErrorStatus();
352
0
  }
353
354
1.42M
  const IOOptions io_options = FinalizeIOOptions(opts);
355
356
1.42M
  IOStatus s;
357
1.42M
  TEST_KILL_RANDOM_WITH_WEIGHT("WritableFileWriter::Flush:0", REDUCE_ODDS2);
358
359
1.42M
  if (buf_.CurrentSize() > 0) {
360
1.32M
    if (use_direct_io()) {
361
0
      if (pending_sync_) {
362
0
        if (perform_data_verification_ && buffered_data_with_checksum_) {
363
0
          s = WriteDirectWithChecksum(io_options);
364
0
        } else {
365
0
          s = WriteDirect(io_options);
366
0
        }
367
0
      }
368
1.32M
    } else {
369
1.32M
      if (perform_data_verification_ && buffered_data_with_checksum_) {
370
0
        s = WriteBufferedWithChecksum(io_options, buf_.BufferStart(),
371
0
                                      buf_.CurrentSize());
372
1.32M
      } else {
373
1.32M
        s = WriteBuffered(io_options, buf_.BufferStart(), buf_.CurrentSize());
374
1.32M
      }
375
1.32M
    }
376
1.32M
    if (!s.ok()) {
377
0
      set_seen_error(s);
378
0
      return s;
379
0
    }
380
1.32M
  }
381
382
1.42M
  {
383
1.42M
    FileOperationInfo::StartTimePoint start_ts;
384
1.42M
    if (ShouldNotifyListeners()) {
385
0
      start_ts = FileOperationInfo::StartNow();
386
0
    }
387
1.42M
    s = writable_file_->Flush(io_options, nullptr);
388
1.42M
    if (ShouldNotifyListeners()) {
389
0
      auto finish_ts = std::chrono::steady_clock::now();
390
0
      NotifyOnFileFlushFinish(start_ts, finish_ts, s);
391
0
      if (!s.ok()) {
392
0
        NotifyOnIOError(s, FileOperationType::kFlush, file_name());
393
0
      }
394
0
    }
395
1.42M
  }
396
397
1.42M
  if (!s.ok()) {
398
0
    set_seen_error(s);
399
0
    return s;
400
0
  }
401
402
  // sync OS cache to disk for every bytes_per_sync_
403
  // TODO: give log file and sst file different options (log
404
  // files could be potentially cached in OS for their whole
405
  // lifetime, thus we might not want to flush at all).
406
407
  // We try to avoid syncing the last 1MB of data, for two reasons:
408
  // (1) avoid rewriting the same page that is modified later.
409
  // (2) on older versions of the OS, a write can block while writing out
410
  //     the page.
411
  // XFS does neighbor-page flushing outside of the specified ranges. We
412
  // need to make sure the sync range is far from the write offset.
413
1.42M
  if (!use_direct_io() && bytes_per_sync_) {
414
0
    const uint64_t kBytesNotSyncRange =
415
0
        1024 * 1024;                                // recent 1MB is not synced.
416
0
    const uint64_t kBytesAlignWhenSync = 4 * 1024;  // Align 4KB.
417
0
    uint64_t cur_size = filesize_.load(std::memory_order_acquire);
418
0
    if (cur_size > kBytesNotSyncRange) {
419
0
      uint64_t offset_sync_to = cur_size - kBytesNotSyncRange;
420
0
      offset_sync_to -= offset_sync_to % kBytesAlignWhenSync;
421
0
      assert(offset_sync_to >= last_sync_size_);
422
0
      if (offset_sync_to > 0 &&
423
0
          offset_sync_to - last_sync_size_ >= bytes_per_sync_) {
424
0
        s = RangeSync(io_options, last_sync_size_,
425
0
                      offset_sync_to - last_sync_size_);
426
0
        if (!s.ok()) {
427
0
          set_seen_error(s);
428
0
        }
429
0
        last_sync_size_ = offset_sync_to;
430
0
      }
431
0
    }
432
0
  }
433
434
1.42M
  return s;
435
1.42M
}
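A worked example of the range-sync arithmetic at the end of Flush(), using hypothetical sizes:

    // cur_size            = 10,485,760 (10 MiB written so far)
    // kBytesNotSyncRange  =  1,048,576  ->  offset_sync_to = 9,437,184
    // kBytesAlignWhenSync =      4,096  ->  9,437,184 is already 4 KiB aligned
    // last_sync_size_     =          0,     bytes_per_sync_ = 1,048,576
    // Since 9,437,184 - 0 >= 1,048,576, RangeSync(io_options, 0, 9,437,184) is
    // issued and last_sync_size_ becomes 9,437,184; the most recent 1 MiB of
    // the file is deliberately left unsynced.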
436
437
5.42k
std::string WritableFileWriter::GetFileChecksum() {
438
5.42k
  if (checksum_generator_ != nullptr) {
439
0
    assert(checksum_finalized_);
440
0
    return checksum_generator_->GetChecksum();
441
5.42k
  } else {
442
5.42k
    return kUnknownFileChecksum;
443
5.42k
  }
444
5.42k
}
445
446
5.42k
const char* WritableFileWriter::GetFileChecksumFuncName() const {
447
5.42k
  if (checksum_generator_ != nullptr) {
448
0
    return checksum_generator_->Name();
449
5.42k
  } else {
450
5.42k
    return kUnknownFileChecksumFuncName;
451
5.42k
  }
452
5.42k
}
453
454
IOStatus WritableFileWriter::PrepareIOOptions(const WriteOptions& wo,
455
2.55M
                                              IOOptions& opts) {
456
2.55M
  return PrepareIOFromWriteOptions(wo, opts);
457
2.55M
}
458
459
34.6k
IOStatus WritableFileWriter::Sync(const IOOptions& opts, bool use_fsync) {
460
34.6k
  if (seen_error()) {
461
0
    return GetWriterHasPreviousErrorStatus();
462
0
  }
463
464
34.6k
  IOOptions io_options = FinalizeIOOptions(opts);
465
34.6k
  IOStatus s = Flush(io_options);
466
34.6k
  if (!s.ok()) {
467
0
    set_seen_error(s);
468
0
    return s;
469
0
  }
470
34.6k
  TEST_KILL_RANDOM("WritableFileWriter::Sync:0");
471
34.6k
  if (!use_direct_io() && pending_sync_) {
472
34.6k
    s = SyncInternal(io_options, use_fsync);
473
34.6k
    if (!s.ok()) {
474
0
      set_seen_error(s);
475
0
      return s;
476
0
    }
477
34.6k
  }
478
34.6k
  TEST_KILL_RANDOM("WritableFileWriter::Sync:1");
479
34.6k
  pending_sync_ = false;
480
34.6k
  return IOStatus::OK();
481
34.6k
}
482
483
IOStatus WritableFileWriter::SyncWithoutFlush(const IOOptions& opts,
484
0
                                              bool use_fsync) {
485
0
  if (seen_error()) {
486
0
    return GetWriterHasPreviousErrorStatus();
487
0
  }
488
0
  IOOptions io_options = FinalizeIOOptions(opts);
489
0
  if (!writable_file_->IsSyncThreadSafe()) {
490
0
    return IOStatus::NotSupported(
491
0
        "Can't WritableFileWriter::SyncWithoutFlush() because "
492
0
        "WritableFile::IsSyncThreadSafe() is false");
493
0
  }
494
0
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:1");
495
0
  IOStatus s = SyncInternal(io_options, use_fsync);
496
0
  TEST_SYNC_POINT("WritableFileWriter::SyncWithoutFlush:2");
497
0
  if (!s.ok()) {
498
0
    set_seen_error(s);
499
0
  }
500
0
  return s;
501
0
}
502
503
IOStatus WritableFileWriter::SyncInternal(const IOOptions& opts,
504
34.6k
                                          bool use_fsync) {
505
  // Caller is supposed to check seen_error_
506
34.6k
  IOStatus s;
507
34.6k
  IOSTATS_TIMER_GUARD(fsync_nanos);
508
34.6k
  TEST_SYNC_POINT("WritableFileWriter::SyncInternal:0");
509
34.6k
  auto prev_perf_level = GetPerfLevel();
510
511
34.6k
  IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
512
513
34.6k
  FileOperationInfo::StartTimePoint start_ts;
514
34.6k
  if (ShouldNotifyListeners()) {
515
0
    start_ts = FileOperationInfo::StartNow();
516
0
  }
517
518
34.6k
  if (use_fsync) {
519
11.8k
    s = writable_file_->Fsync(opts, nullptr);
520
22.8k
  } else {
521
22.8k
    s = writable_file_->Sync(opts, nullptr);
522
22.8k
  }
523
34.6k
  if (ShouldNotifyListeners()) {
524
0
    auto finish_ts = std::chrono::steady_clock::now();
525
0
    NotifyOnFileSyncFinish(
526
0
        start_ts, finish_ts, s,
527
0
        use_fsync ? FileOperationType::kFsync : FileOperationType::kSync);
528
0
    if (!s.ok()) {
529
0
      NotifyOnIOError(
530
0
          s, (use_fsync ? FileOperationType::kFsync : FileOperationType::kSync),
531
0
          file_name());
532
0
    }
533
0
  }
534
34.6k
  SetPerfLevel(prev_perf_level);
535
536
  // The caller is responsible for calling set_seen_error(s) if s is not OK.
537
34.6k
  return s;
538
34.6k
}
539
540
IOStatus WritableFileWriter::RangeSync(const IOOptions& opts, uint64_t offset,
541
0
                                       uint64_t nbytes) {
542
0
  if (seen_error()) {
543
0
    return GetWriterHasPreviousErrorStatus();
544
0
  }
545
546
0
  IOSTATS_TIMER_GUARD(range_sync_nanos);
547
0
  TEST_SYNC_POINT("WritableFileWriter::RangeSync:0");
548
0
  FileOperationInfo::StartTimePoint start_ts;
549
0
  if (ShouldNotifyListeners()) {
550
0
    start_ts = FileOperationInfo::StartNow();
551
0
  }
552
0
  IOStatus s = writable_file_->RangeSync(offset, nbytes, opts, nullptr);
553
0
  if (!s.ok()) {
554
0
    set_seen_error(s);
555
0
  }
556
0
  if (ShouldNotifyListeners()) {
557
0
    auto finish_ts = std::chrono::steady_clock::now();
558
0
    NotifyOnFileRangeSyncFinish(offset, nbytes, start_ts, finish_ts, s);
559
0
    if (!s.ok()) {
560
0
      NotifyOnIOError(s, FileOperationType::kRangeSync, file_name(), nbytes,
561
0
                      offset);
562
0
    }
563
0
  }
564
0
  return s;
565
0
}
566
567
// This method writes the specified data to disk and makes use of the rate
568
// limiter if available
569
IOStatus WritableFileWriter::WriteBuffered(const IOOptions& opts,
570
1.32M
                                           const char* data, size_t size) {
571
1.32M
  if (seen_error()) {
572
0
    return GetWriterHasPreviousErrorStatus();
573
0
  }
574
575
1.32M
  IOStatus s;
576
1.32M
  assert(!use_direct_io());
577
1.32M
  const char* src = data;
578
1.32M
  size_t left = size;
579
1.32M
  DataVerificationInfo v_info;
580
1.32M
  char checksum_buf[sizeof(uint32_t)];
581
1.32M
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
582
583
2.64M
  while (left > 0) {
584
1.32M
    size_t allowed = left;
585
1.32M
    if (rate_limiter_ != nullptr &&
586
1.32M
        rate_limiter_priority_used != Env::IO_TOTAL) {
587
0
      allowed = rate_limiter_->RequestToken(left, 0 /* alignment */,
588
0
                                            rate_limiter_priority_used, stats_,
589
0
                                            RateLimiter::OpType::kWrite);
590
0
    }
591
592
1.32M
    {
593
1.32M
      IOSTATS_TIMER_GUARD(write_nanos);
594
1.32M
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
595
596
1.32M
      FileOperationInfo::StartTimePoint start_ts;
597
1.32M
      uint64_t old_size = writable_file_->GetFileSize(opts, nullptr);
598
1.32M
      if (ShouldNotifyListeners()) {
599
0
        start_ts = FileOperationInfo::StartNow();
600
0
        old_size = next_write_offset_;
601
0
      }
602
1.32M
      {
603
1.32M
        auto prev_perf_level = GetPerfLevel();
604
605
1.32M
        IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
606
1.32M
        if (perform_data_verification_) {
607
0
          Crc32cHandoffChecksumCalculation(src, allowed, checksum_buf);
608
0
          v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
609
0
          s = writable_file_->Append(Slice(src, allowed), opts, v_info,
610
0
                                     nullptr);
611
1.32M
        } else {
612
1.32M
          s = writable_file_->Append(Slice(src, allowed), opts, nullptr);
613
1.32M
        }
614
1.32M
        if (!s.ok()) {
615
          // If writable_file_->Append() failed, then the data may or may not
616
          // exist in the underlying memory buffer, OS page cache, remote file
617
          // system's buffer, etc. If WritableFileWriter keeps the data in
618
          // buf_, then a future Close() or write retry may send the data to
619
          // the underlying file again. If the data does exist in the
620
          // underlying buffer and gets written to the file eventually despite
621
          // returning error, the file may end up with two duplicate pieces of
622
          // data. Therefore, clear the buf_ at the WritableFileWriter layer
623
          // and let caller determine error handling.
624
0
          buf_.Size(0);
625
0
          buffered_data_crc32c_checksum_ = 0;
626
0
        }
627
1.32M
        SetPerfLevel(prev_perf_level);
628
1.32M
      }
629
1.32M
      if (ShouldNotifyListeners()) {
630
0
        auto finish_ts = std::chrono::steady_clock::now();
631
0
        NotifyOnFileWriteFinish(old_size, allowed, start_ts, finish_ts, s);
632
0
        if (!s.ok()) {
633
0
          NotifyOnIOError(s, FileOperationType::kAppend, file_name(), allowed,
634
0
                          old_size);
635
0
        }
636
0
      }
637
1.32M
      if (!s.ok()) {
638
0
        set_seen_error(s);
639
0
        return s;
640
0
      }
641
1.32M
    }
642
643
1.32M
    IOSTATS_ADD(bytes_written, allowed);
644
1.32M
    TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
645
646
1.32M
    left -= allowed;
647
1.32M
    src += allowed;
648
1.32M
    uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
649
1.32M
    flushed_size_.store(cur_size + allowed, std::memory_order_release);
650
1.32M
  }
651
1.32M
  buf_.Size(0);
652
1.32M
  buffered_data_crc32c_checksum_ = 0;
653
1.32M
  if (!s.ok()) {
654
0
    set_seen_error(s);
655
0
  }
656
1.32M
  return s;
657
1.32M
}
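When a rate limiter is attached, the loop above may split one buffered payload across several FSWritableFile::Append() calls. A hypothetical trace, assuming the limiter grants at most 1 MiB per RequestToken() call:

    // left = 3 MiB:
    //   iteration 1: allowed = 1 MiB, Append(Slice(src, allowed)), left = 2 MiB
    //   iteration 2: allowed = 1 MiB, Append(...),                 left = 1 MiB
    //   iteration 3: allowed = 1 MiB, Append(...),                 left = 0
    // With no rate limiter, or with rate_limiter_priority == Env::IO_TOTAL,
    // the whole payload goes out in a single Append() call.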
658
659
IOStatus WritableFileWriter::WriteBufferedWithChecksum(const IOOptions& opts,
660
                                                       const char* data,
661
0
                                                       size_t size) {
662
0
  if (seen_error()) {
663
0
    return GetWriterHasPreviousErrorStatus();
664
0
  }
665
666
0
  IOStatus s;
667
0
  assert(!use_direct_io());
668
0
  assert(perform_data_verification_ && buffered_data_with_checksum_);
669
0
  const char* src = data;
670
0
  size_t left = size;
671
0
  DataVerificationInfo v_info;
672
0
  char checksum_buf[sizeof(uint32_t)];
673
0
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
674
  // Check how much is allowed. Here, we loop until the rate limiter allows us to
675
  // write the entire buffer.
676
  // TODO: need to be improved since it sort of defeats the purpose of the rate
677
  // limiter
678
0
  size_t data_size = left;
679
0
  if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
680
0
    while (data_size > 0) {
681
0
      size_t tmp_size;
682
0
      tmp_size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
683
0
                                             rate_limiter_priority_used, stats_,
684
0
                                             RateLimiter::OpType::kWrite);
685
0
      data_size -= tmp_size;
686
0
    }
687
0
  }
688
689
0
  {
690
0
    IOSTATS_TIMER_GUARD(write_nanos);
691
0
    TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
692
693
0
    FileOperationInfo::StartTimePoint start_ts;
694
0
    uint64_t old_size = writable_file_->GetFileSize(opts, nullptr);
695
0
    if (ShouldNotifyListeners()) {
696
0
      start_ts = FileOperationInfo::StartNow();
697
0
      old_size = next_write_offset_;
698
0
    }
699
0
    {
700
0
      auto prev_perf_level = GetPerfLevel();
701
702
0
      IOSTATS_CPU_TIMER_GUARD(cpu_write_nanos, clock_);
703
704
0
      EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
705
0
      v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
706
0
      s = writable_file_->Append(Slice(src, left), opts, v_info, nullptr);
707
0
      SetPerfLevel(prev_perf_level);
708
0
    }
709
0
    if (ShouldNotifyListeners()) {
710
0
      auto finish_ts = std::chrono::steady_clock::now();
711
0
      NotifyOnFileWriteFinish(old_size, left, start_ts, finish_ts, s);
712
0
      if (!s.ok()) {
713
0
        NotifyOnIOError(s, FileOperationType::kAppend, file_name(), left,
714
0
                        old_size);
715
0
      }
716
0
    }
717
0
    if (!s.ok()) {
718
      // If writable_file_->Append() failed, then the data may or may not
719
      // exist in the underlying memory buffer, OS page cache, remote file
720
      // system's buffer, etc. If WritableFileWriter keeps the data in
721
      // buf_, then a future Close() or write retry may send the data to
722
      // the underlying file again. If the data does exist in the
723
      // underlying buffer and gets written to the file eventually despite
724
      // returning error, the file may end up with two duplicate pieces of
725
      // data. Therefore, clear the buf_ at the WritableFileWriter layer
726
      // and let caller determine error handling.
727
0
      buf_.Size(0);
728
0
      buffered_data_crc32c_checksum_ = 0;
729
0
      set_seen_error(s);
730
0
      return s;
731
0
    }
732
0
  }
733
734
0
  IOSTATS_ADD(bytes_written, left);
735
0
  TEST_KILL_RANDOM("WritableFileWriter::WriteBuffered:0");
736
737
  // The buffer write is successful; reset the buffer current size to 0 and reset
738
  // the corresponding checksum value
739
0
  buf_.Size(0);
740
0
  buffered_data_crc32c_checksum_ = 0;
741
0
  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
742
0
  flushed_size_.store(cur_size + left, std::memory_order_release);
743
0
  if (!s.ok()) {
744
0
    set_seen_error(s);
745
0
  }
746
0
  return s;
747
0
}
748
749
5.16M
void WritableFileWriter::UpdateFileChecksum(const Slice& data) {
750
5.16M
  if (checksum_generator_ != nullptr) {
751
0
    checksum_generator_->Update(data.data(), data.size());
752
0
  }
753
5.16M
}
754
755
// Currently, crc32c checksum is used to calculate the checksum value of the
756
// content in the input buffer for handoff. In the future, the checksum might be
757
// calculated from the existing crc32c checksums of the WAL and Manifest
758
// records, or even SST file blocks.
759
// TODO: effectively use the existing checksum of the data being written to
760
// generate the crc32c checksum instead of a raw calculation.
761
void WritableFileWriter::Crc32cHandoffChecksumCalculation(const char* data,
762
                                                          size_t size,
763
0
                                                          char* buf) {
764
0
  uint32_t v_crc32c = crc32c::Extend(0, data, size);
765
0
  EncodeFixed32(buf, v_crc32c);
766
0
}
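The helper above encodes crc32c::Extend(0, data, size) into a 4-byte buffer for handoff. The buffered-checksum paths in Append() lean on the incremental crc32c identities below, stated for buffers a and b of lengths n and m (a||b is their concatenation):

    // crc32c::Extend(crc32c::Value(a, n), b, m)            == crc32c::Value(a||b, n + m)
    // crc32c::Crc32cCombine(crc32c::Value(a, n),
    //                       crc32c::Value(b, m), m)        == crc32c::Value(a||b, n + m)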
767
768
// This flushes the accumulated data in the buffer. If necessary, we pad the
769
// data with zeros to a whole page.
770
// However, during automatic flushes padding would not be necessary.
771
// We always use RateLimiter if available. We move (Refit) any buffer bytes
772
// that are left over the
773
// whole number of pages to be written again on the next flush because we can
774
// only write on aligned
775
// offsets.
776
0
IOStatus WritableFileWriter::WriteDirect(const IOOptions& opts) {
777
0
  if (seen_error()) {
778
0
    assert(false);
779
780
0
    return IOStatus::IOError("Writer has previous error.");
781
0
  }
782
783
0
  assert(use_direct_io());
784
0
  IOStatus s;
785
0
  const size_t alignment = buf_.Alignment();
786
0
  assert((next_write_offset_ % alignment) == 0);
787
788
  // Calculate whole page final file advance if all writes succeed
789
0
  const size_t file_advance =
790
0
      TruncateToPageBoundary(alignment, buf_.CurrentSize());
791
792
  // Calculate the leftover tail; we write it here padded with zeros, BUT we
793
  // will write it again in the future either on Close() OR when the current
794
  // whole page fills out.
795
0
  const size_t leftover_tail = buf_.CurrentSize() - file_advance;
796
797
  // Round up and pad
798
0
  buf_.PadToAlignmentWith(0);
799
800
0
  const char* src = buf_.BufferStart();
801
0
  uint64_t write_offset = next_write_offset_;
802
0
  size_t left = buf_.CurrentSize();
803
0
  DataVerificationInfo v_info;
804
0
  char checksum_buf[sizeof(uint32_t)];
805
0
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
806
807
0
  while (left > 0) {
808
    // Check how much is allowed
809
0
    size_t size = left;
810
0
    if (rate_limiter_ != nullptr &&
811
0
        rate_limiter_priority_used != Env::IO_TOTAL) {
812
0
      size = rate_limiter_->RequestToken(left, buf_.Alignment(),
813
0
                                         rate_limiter_priority_used, stats_,
814
0
                                         RateLimiter::OpType::kWrite);
815
0
    }
816
817
0
    {
818
0
      IOSTATS_TIMER_GUARD(write_nanos);
819
0
      TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
820
0
      FileOperationInfo::StartTimePoint start_ts;
821
0
      if (ShouldNotifyListeners()) {
822
0
        start_ts = FileOperationInfo::StartNow();
823
0
      }
824
      // direct writes must be positional
825
0
      if (perform_data_verification_) {
826
0
        Crc32cHandoffChecksumCalculation(src, size, checksum_buf);
827
0
        v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
828
0
        s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
829
0
                                             opts, v_info, nullptr);
830
0
      } else {
831
0
        s = writable_file_->PositionedAppend(Slice(src, size), write_offset,
832
0
                                             opts, nullptr);
833
0
      }
834
835
0
      if (ShouldNotifyListeners()) {
836
0
        auto finish_ts = std::chrono::steady_clock::now();
837
0
        NotifyOnFileWriteFinish(write_offset, size, start_ts, finish_ts, s);
838
0
        if (!s.ok()) {
839
0
          NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(),
840
0
                          size, write_offset);
841
0
        }
842
0
      }
843
0
      if (!s.ok()) {
844
0
        buf_.Size(file_advance + leftover_tail);
845
0
        set_seen_error(s);
846
0
        return s;
847
0
      }
848
0
    }
849
850
0
    IOSTATS_ADD(bytes_written, size);
851
0
    left -= size;
852
0
    src += size;
853
0
    write_offset += size;
854
0
    uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
855
0
    flushed_size_.store(cur_size + size, std::memory_order_release);
856
0
    assert((next_write_offset_ % alignment) == 0);
857
0
  }
858
859
0
  if (s.ok()) {
860
    // Move the tail to the beginning of the buffer
861
    // This never happens during normal Append but rather during
862
    // explicit call to Flush()/Sync() or Close()
863
0
    buf_.RefitTail(file_advance, leftover_tail);
864
    // This is where we start writing next time, which may or may not be
865
    // the actual file size on disk. They match if the buffer size
866
    // is a multiple of whole pages; otherwise filesize_ is leftover_tail
867
    // behind.
868
0
    next_write_offset_ += file_advance;
869
0
  } else {
870
0
    set_seen_error(s);
871
0
  }
872
0
  return s;
873
0
}
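A worked example of the alignment bookkeeping in WriteDirect(), using hypothetical sizes:

    // alignment          = 4,096 (buf_.Alignment())
    // buf_.CurrentSize() = 10,000 buffered bytes
    // file_advance       = TruncateToPageBoundary(4096, 10000) = 8,192
    // leftover_tail      = 10,000 - 8,192 = 1,808
    // PadToAlignmentWith(0) pads the buffer to 12,288 bytes, all of which is
    // written at the aligned offset. On success, RefitTail(8192, 1808) moves
    // the 1,808 tail bytes to the front of the buffer and next_write_offset_
    // advances by 8,192, so the tail is rewritten at an aligned offset on the
    // next flush.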
874
875
0
IOStatus WritableFileWriter::WriteDirectWithChecksum(const IOOptions& opts) {
876
0
  if (seen_error()) {
877
0
    return GetWriterHasPreviousErrorStatus();
878
0
  }
879
880
0
  assert(use_direct_io());
881
0
  assert(perform_data_verification_ && buffered_data_with_checksum_);
882
0
  IOStatus s;
883
0
  const size_t alignment = buf_.Alignment();
884
0
  assert((next_write_offset_ % alignment) == 0);
885
886
  // Calculate whole page final file advance if all writes succeed
887
0
  const size_t file_advance =
888
0
      TruncateToPageBoundary(alignment, buf_.CurrentSize());
889
890
  // Calculate the leftover tail; we write it here padded with zeros, BUT we
891
  // will write it again in the future either on Close() OR when the current
892
  // whole page fills out.
893
0
  const size_t leftover_tail = buf_.CurrentSize() - file_advance;
894
895
  // Round up, pad, and combine the checksum.
896
0
  size_t last_cur_size = buf_.CurrentSize();
897
0
  buf_.PadToAlignmentWith(0);
898
0
  size_t padded_size = buf_.CurrentSize() - last_cur_size;
899
0
  const char* padded_start = buf_.BufferStart() + last_cur_size;
900
0
  uint32_t padded_checksum = crc32c::Value(padded_start, padded_size);
901
0
  buffered_data_crc32c_checksum_ = crc32c::Crc32cCombine(
902
0
      buffered_data_crc32c_checksum_, padded_checksum, padded_size);
903
904
0
  const char* src = buf_.BufferStart();
905
0
  uint64_t write_offset = next_write_offset_;
906
0
  size_t left = buf_.CurrentSize();
907
0
  DataVerificationInfo v_info;
908
0
  char checksum_buf[sizeof(uint32_t)];
909
910
0
  Env::IOPriority rate_limiter_priority_used = opts.rate_limiter_priority;
911
  // Check how much is allowed. Here, we loop until the rate limiter allows us to
912
  // write the entire buffer.
913
  // TODO: need to be improved since it sort of defeats the purpose of the rate
914
  // limiter
915
0
  size_t data_size = left;
916
0
  if (rate_limiter_ != nullptr && rate_limiter_priority_used != Env::IO_TOTAL) {
917
0
    while (data_size > 0) {
918
0
      size_t size;
919
0
      size = rate_limiter_->RequestToken(data_size, buf_.Alignment(),
920
0
                                         rate_limiter_priority_used, stats_,
921
0
                                         RateLimiter::OpType::kWrite);
922
0
      data_size -= size;
923
0
    }
924
0
  }
925
926
0
  {
927
0
    IOSTATS_TIMER_GUARD(write_nanos);
928
0
    TEST_SYNC_POINT("WritableFileWriter::Flush:BeforeAppend");
929
0
    FileOperationInfo::StartTimePoint start_ts;
930
0
    if (ShouldNotifyListeners()) {
931
0
      start_ts = FileOperationInfo::StartNow();
932
0
    }
933
    // direct writes must be positional
934
0
    EncodeFixed32(checksum_buf, buffered_data_crc32c_checksum_);
935
0
    v_info.checksum = Slice(checksum_buf, sizeof(uint32_t));
936
0
    s = writable_file_->PositionedAppend(Slice(src, left), write_offset, opts,
937
0
                                         v_info, nullptr);
938
939
0
    if (ShouldNotifyListeners()) {
940
0
      auto finish_ts = std::chrono::steady_clock::now();
941
0
      NotifyOnFileWriteFinish(write_offset, left, start_ts, finish_ts, s);
942
0
      if (!s.ok()) {
943
0
        NotifyOnIOError(s, FileOperationType::kPositionedAppend, file_name(),
944
0
                        left, write_offset);
945
0
      }
946
0
    }
947
0
    if (!s.ok()) {
948
      // In this case, we do not change buffered_data_crc32c_checksum_ because
949
      // it still aligns with the data in the buffer.
950
0
      buf_.Size(file_advance + leftover_tail);
951
0
      buffered_data_crc32c_checksum_ =
952
0
          crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
953
0
      set_seen_error(s);
954
0
      return s;
955
0
    }
956
0
  }
957
958
0
  IOSTATS_ADD(bytes_written, left);
959
0
  assert((next_write_offset_ % alignment) == 0);
960
0
  uint64_t cur_size = flushed_size_.load(std::memory_order_acquire);
961
0
  flushed_size_.store(cur_size + left, std::memory_order_release);
962
963
0
  if (s.ok()) {
964
    // Move the tail to the beginning of the buffer
965
    // This never happens during normal Append but rather during
966
    // an explicit call to Flush()/Sync() or Close(). Also the buffer checksum will
967
    // be recalculated accordingly.
968
0
    buf_.RefitTail(file_advance, leftover_tail);
969
    // Adjust the checksum value to align with the data in the buffer
970
0
    buffered_data_crc32c_checksum_ =
971
0
        crc32c::Value(buf_.BufferStart(), buf_.CurrentSize());
972
    // This is where we start writing next time, which may or may not be
973
    // the actual file size on disk. They match if the buffer size
974
    // is a multiple of whole pages; otherwise filesize_ is leftover_tail
975
    // behind.
976
0
    next_write_offset_ += file_advance;
977
0
  } else {
978
0
    set_seen_error(s);
979
0
  }
980
0
  return s;
981
0
}
982
Env::IOPriority WritableFileWriter::DecideRateLimiterPriority(
983
    Env::IOPriority writable_file_io_priority,
984
6.68M
    Env::IOPriority op_rate_limiter_priority) {
985
6.68M
  if (writable_file_io_priority == Env::IO_TOTAL &&
986
6.68M
      op_rate_limiter_priority == Env::IO_TOTAL) {
987
6.60M
    return Env::IO_TOTAL;
988
6.60M
  } else if (writable_file_io_priority == Env::IO_TOTAL) {
989
0
    return op_rate_limiter_priority;
990
82.2k
  } else if (op_rate_limiter_priority == Env::IO_TOTAL) {
991
1.19k
    return writable_file_io_priority;
992
81.0k
  } else {
993
81.0k
    return op_rate_limiter_priority;
994
81.0k
  }
995
6.68M
}
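Env::IO_TOTAL acts as "unset" here, so the resolution above reduces to:

    // writable_file_io_priority | op_rate_limiter_priority | resolved priority
    // --------------------------+--------------------------+----------------------------
    // IO_TOTAL                  | IO_TOTAL                 | IO_TOTAL (no rate limiting)
    // IO_TOTAL                  | other                    | op_rate_limiter_priority
    // other                     | IO_TOTAL                 | writable_file_io_priority
    // other                     | other                    | op_rate_limiter_priority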
996
997
6.73M
IOOptions WritableFileWriter::FinalizeIOOptions(const IOOptions& opts) const {
998
6.73M
  Env::IOPriority op_rate_limiter_priority = opts.rate_limiter_priority;
999
6.73M
  IOOptions io_options(opts);
1000
6.73M
  if (writable_file_.get() != nullptr) {
1001
6.68M
    io_options.rate_limiter_priority =
1002
6.68M
        WritableFileWriter::DecideRateLimiterPriority(
1003
6.68M
            writable_file_->GetIOPriority(), op_rate_limiter_priority);
1004
6.68M
  }
1005
6.73M
  return io_options;
1006
6.73M
}
1007
}  // namespace ROCKSDB_NAMESPACE