Coverage Report

Created: 2026-05-16 07:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/file/random_access_file_reader.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "file/random_access_file_reader.h"
11
12
#include <algorithm>
13
#include <mutex>
14
15
#include "file/file_util.h"
16
#include "monitoring/histogram.h"
17
#include "monitoring/iostats_context_imp.h"
18
#include "port/port.h"
19
#include "table/format.h"
20
#include "test_util/sync_point.h"
21
#include "util/random.h"
22
#include "util/rate_limiter_impl.h"
23
24
namespace ROCKSDB_NAMESPACE {
25
inline Histograms GetFileReadHistograms(Statistics* stats,
26
464k
                                        Env::IOActivity io_activity) {
27
464k
  switch (io_activity) {
28
8.36k
    case Env::IOActivity::kFlush:
29
8.36k
      return Histograms::FILE_READ_FLUSH_MICROS;
30
19.6k
    case Env::IOActivity::kCompaction:
31
19.6k
      return Histograms::FILE_READ_COMPACTION_MICROS;
32
424k
    case Env::IOActivity::kDBOpen:
33
424k
      return Histograms::FILE_READ_DB_OPEN_MICROS;
34
11.5k
    default:
35
11.5k
      break;
36
464k
  }
37
38
11.5k
  if (stats && stats->get_stats_level() > StatsLevel::kExceptDetailedTimers) {
39
0
    switch (io_activity) {
40
0
      case Env::IOActivity::kGet:
41
0
        return Histograms::FILE_READ_GET_MICROS;
42
0
      case Env::IOActivity::kMultiGet:
43
0
        return Histograms::FILE_READ_MULTIGET_MICROS;
44
0
      case Env::IOActivity::kDBIterator:
45
0
        return Histograms::FILE_READ_DB_ITERATOR_MICROS;
46
0
      case Env::IOActivity::kVerifyDBChecksum:
47
0
        return Histograms::FILE_READ_VERIFY_DB_CHECKSUM_MICROS;
48
0
      case Env::IOActivity::kVerifyFileChecksums:
49
0
        return Histograms::FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS;
50
0
      default:
51
0
        break;
52
0
    }
53
0
  }
54
11.5k
  return Histograms::HISTOGRAM_ENUM_MAX;
55
11.5k
}
56
inline void RecordIOStats(Statistics* stats, Temperature file_temperature,
57
458k
                          bool is_last_level, size_t size) {
58
458k
  IOSTATS_ADD(bytes_read, size);
59
  // record for last/non-last level
60
458k
  if (is_last_level) {
61
155k
    RecordTick(stats, LAST_LEVEL_READ_BYTES, size);
62
155k
    RecordTick(stats, LAST_LEVEL_READ_COUNT, 1);
63
302k
  } else {
64
302k
    RecordTick(stats, NON_LAST_LEVEL_READ_BYTES, size);
65
302k
    RecordTick(stats, NON_LAST_LEVEL_READ_COUNT, 1);
66
302k
  }
67
68
  // record for temperature file
69
458k
  switch (file_temperature) {
70
0
    case Temperature::kHot:
71
0
      IOSTATS_ADD(file_io_stats_by_temperature.hot_file_bytes_read, size);
72
0
      IOSTATS_ADD(file_io_stats_by_temperature.hot_file_read_count, 1);
73
0
      RecordTick(stats, HOT_FILE_READ_BYTES, size);
74
0
      RecordTick(stats, HOT_FILE_READ_COUNT, 1);
75
0
      break;
76
0
    case Temperature::kWarm:
77
0
      IOSTATS_ADD(file_io_stats_by_temperature.warm_file_bytes_read, size);
78
0
      IOSTATS_ADD(file_io_stats_by_temperature.warm_file_read_count, 1);
79
0
      RecordTick(stats, WARM_FILE_READ_BYTES, size);
80
0
      RecordTick(stats, WARM_FILE_READ_COUNT, 1);
81
0
      break;
82
0
    case Temperature::kCool:
83
0
      IOSTATS_ADD(file_io_stats_by_temperature.cool_file_bytes_read, size);
84
0
      IOSTATS_ADD(file_io_stats_by_temperature.cool_file_read_count, 1);
85
0
      RecordTick(stats, COOL_FILE_READ_BYTES, size);
86
0
      RecordTick(stats, COOL_FILE_READ_COUNT, 1);
87
0
      break;
88
0
    case Temperature::kCold:
89
0
      IOSTATS_ADD(file_io_stats_by_temperature.cold_file_bytes_read, size);
90
0
      IOSTATS_ADD(file_io_stats_by_temperature.cold_file_read_count, 1);
91
0
      RecordTick(stats, COLD_FILE_READ_BYTES, size);
92
0
      RecordTick(stats, COLD_FILE_READ_COUNT, 1);
93
0
      break;
94
0
    case Temperature::kIce:
95
0
      IOSTATS_ADD(file_io_stats_by_temperature.ice_file_bytes_read, size);
96
0
      IOSTATS_ADD(file_io_stats_by_temperature.ice_file_read_count, 1);
97
0
      RecordTick(stats, ICE_FILE_READ_BYTES, size);
98
0
      RecordTick(stats, ICE_FILE_READ_COUNT, 1);
99
0
      break;
100
457k
    case Temperature::kUnknown:
101
457k
      if (is_last_level) {
102
155k
        IOSTATS_ADD(file_io_stats_by_temperature.unknown_last_level_bytes_read,
103
155k
                    size);
104
155k
        IOSTATS_ADD(file_io_stats_by_temperature.unknown_last_level_read_count,
105
155k
                    1);
106
302k
      } else {
107
302k
        IOSTATS_ADD(
108
302k
            file_io_stats_by_temperature.unknown_non_last_level_bytes_read,
109
302k
            size);
110
302k
        IOSTATS_ADD(
111
302k
            file_io_stats_by_temperature.unknown_non_last_level_read_count, 1);
112
302k
      }
113
457k
      break;
114
0
    default:
115
0
      break;
116
458k
  }
117
458k
}
118
119
IOStatus RandomAccessFileReader::Create(
120
    const std::shared_ptr<FileSystem>& fs, const std::string& fname,
121
    const FileOptions& file_opts,
122
0
    std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) {
123
0
  std::unique_ptr<FSRandomAccessFile> file;
124
0
  IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg);
125
0
  if (io_s.ok()) {
126
0
    reader->reset(new RandomAccessFileReader(std::move(file), fname));
127
0
  }
128
0
  return io_s;
129
0
}
130
131
IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
132
                                      size_t n, Slice* result, char* scratch,
133
                                      AlignedBuf* aligned_buf,
134
464k
                                      IODebugContext* dbg) const {
135
464k
  (void)aligned_buf;
136
464k
  const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;
137
138
464k
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
139
464k
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read:IODebugContext",
140
464k
                           const_cast<void*>(static_cast<void*>(dbg)));
141
142
  // To be paranoid: modify scratch a little bit, so in case underlying
143
  // FileSystem doesn't fill the buffer but returns success and `scratch` still
144
  // contains a previous block, returned value will not pass checksum.
145
464k
  if (n > 0 && scratch != nullptr) {
146
    // This byte might not change anything for direct I/O case, but it's OK.
147
463k
    scratch[0]++;
148
463k
  }
149
150
464k
  IOStatus io_s;
151
464k
  uint64_t elapsed = 0;
152
464k
  size_t alignment = file_->GetRequiredBufferAlignment();
153
464k
  bool is_aligned = false;
154
464k
  if (scratch != nullptr) {
155
    // Check if offset, length and buffer are aligned.
156
464k
    is_aligned = (offset & (alignment - 1)) == 0 &&
157
22.0k
                 (n & (alignment - 1)) == 0 &&
158
16
                 (uintptr_t(scratch) & (alignment - 1)) == 0;
159
464k
  }
160
161
464k
  {
162
464k
    StopWatch sw(clock_, stats_, hist_type_,
163
464k
                 GetFileReadHistograms(stats_, opts.io_activity),
164
464k
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
165
464k
                 true /*delay_enabled*/);
166
464k
    auto prev_perf_level = GetPerfLevel();
167
464k
    IOSTATS_TIMER_GUARD(read_nanos);
168
464k
    if (use_direct_io() && is_aligned == false) {
169
0
      size_t aligned_offset =
170
0
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
171
0
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
172
0
      size_t read_size =
173
0
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
174
0
      AlignedBuffer buf;
175
0
      buf.Alignment(alignment);
176
0
      buf.AllocateNewBuffer(read_size);
177
0
      while (buf.CurrentSize() < read_size) {
178
0
        size_t allowed;
179
0
        if (rate_limiter_priority != Env::IO_TOTAL &&
180
0
            rate_limiter_ != nullptr) {
181
0
          allowed = rate_limiter_->RequestToken(
182
0
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
183
0
              rate_limiter_priority, stats_, RateLimiter::OpType::kRead);
184
0
        } else {
185
0
          assert(buf.CurrentSize() == 0);
186
0
          allowed = read_size;
187
0
        }
188
0
        Slice tmp;
189
190
0
        FileOperationInfo::StartTimePoint start_ts;
191
0
        uint64_t orig_offset = 0;
192
0
        if (ShouldNotifyListeners()) {
193
0
          start_ts = FileOperationInfo::StartNow();
194
0
          orig_offset = aligned_offset + buf.CurrentSize();
195
0
        }
196
197
0
        {
198
0
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
199
          // Only user reads are expected to specify a timeout. And user reads
200
          // are not subjected to rate_limiter and should go through only
201
          // one iteration of this loop, so we don't need to check and adjust
202
          // the opts.timeout before calling file_->Read
203
0
          assert(!opts.timeout.count() || allowed == read_size);
204
0
          io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
205
0
                             &tmp, buf.Destination(), dbg);
206
0
        }
207
0
        if (ShouldNotifyListeners()) {
208
0
          auto finish_ts = FileOperationInfo::FinishNow();
209
0
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
210
0
                                 io_s);
211
0
          if (!io_s.ok()) {
212
0
            NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
213
0
                            tmp.size(), orig_offset);
214
0
          }
215
0
        }
216
217
0
        buf.Size(buf.CurrentSize() + tmp.size());
218
0
        if (!io_s.ok() || tmp.size() < allowed) {
219
0
          break;
220
0
        }
221
0
      }
222
0
      size_t res_len = 0;
223
0
      if (io_s.ok() && offset_advance < buf.CurrentSize()) {
224
0
        res_len = std::min(buf.CurrentSize() - offset_advance, n);
225
0
        if (aligned_buf == nullptr) {
226
0
          buf.Read(scratch, offset_advance, res_len);
227
0
        } else {
228
0
          scratch = buf.BufferStart() + offset_advance;
229
0
          *aligned_buf = buf.Release();
230
0
        }
231
0
      }
232
0
      *result = Slice(scratch, res_len);
233
464k
    } else {
234
464k
      size_t pos = 0;
235
464k
      const char* res_scratch = nullptr;
236
929k
      while (pos < n) {
237
464k
        size_t allowed;
238
464k
        if (rate_limiter_priority != Env::IO_TOTAL &&
239
25.2k
            rate_limiter_ != nullptr) {
240
0
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
241
0
            sw.DelayStart();
242
0
          }
243
0
          allowed = rate_limiter_->RequestToken(
244
0
              n - pos, (use_direct_io() ? alignment : 0), rate_limiter_priority,
245
0
              stats_, RateLimiter::OpType::kRead);
246
0
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
247
0
            sw.DelayStop();
248
0
          }
249
464k
        } else {
250
464k
          allowed = n;
251
464k
        }
252
464k
        Slice tmp_result;
253
254
464k
        FileOperationInfo::StartTimePoint start_ts;
255
464k
        if (ShouldNotifyListeners()) {
256
0
          start_ts = FileOperationInfo::StartNow();
257
0
        }
258
259
464k
        {
260
464k
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
261
          // Only user reads are expected to specify a timeout. And user reads
262
          // are not subjected to rate_limiter and should go through only
263
          // one iteration of this loop, so we don't need to check and adjust
264
          // the opts.timeout before calling file_->Read
265
464k
          assert(!opts.timeout.count() || allowed == n);
266
464k
          io_s = file_->Read(offset + pos, allowed, opts, &tmp_result,
267
464k
                             scratch + pos, dbg);
268
464k
        }
269
464k
        if (ShouldNotifyListeners()) {
270
0
          auto finish_ts = FileOperationInfo::FinishNow();
271
0
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
272
0
                                 finish_ts, io_s);
273
274
0
          if (!io_s.ok()) {
275
0
            NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
276
0
                            tmp_result.size(), offset + pos);
277
0
          }
278
0
        }
279
464k
        if (res_scratch == nullptr) {
280
          // we can't simply use `scratch` because reads of mmap'd files return
281
          // data in a different buffer.
282
464k
          res_scratch = tmp_result.data();
283
18.4E
        } else {
284
          // make sure chunks are inserted contiguously into `res_scratch`.
285
18.4E
          assert(tmp_result.data() == res_scratch + pos);
286
18.4E
        }
287
464k
        pos += tmp_result.size();
288
464k
        if (!io_s.ok() || tmp_result.size() < allowed) {
289
0
          break;
290
0
        }
291
464k
      }
292
464k
      *result = Slice(res_scratch, io_s.ok() ? pos : 0);
293
464k
    }
294
464k
    RecordIOStats(stats_, file_temperature_, is_last_level_, result->size());
295
464k
    SetPerfLevel(prev_perf_level);
296
464k
  }
297
464k
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
298
0
    file_read_hist_->Add(elapsed);
299
0
  }
300
301
#ifndef NDEBUG
302
  auto pair = std::make_pair(&file_name_, &io_s);
303
  if (offset == 0) {
304
    TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::BeforeReturn",
305
                             &pair);
306
  }
307
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::AnyOffset", &pair);
308
#endif
309
464k
  return io_s;
310
464k
}
311
312
0
size_t End(const FSReadRequest& r) {
313
0
  return static_cast<size_t>(r.offset) + r.len;
314
0
}
315
316
0
FSReadRequest Align(const FSReadRequest& r, size_t alignment) {
317
0
  FSReadRequest req;
318
0
  req.offset = static_cast<uint64_t>(
319
0
      TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset)));
320
0
  req.len = Roundup(End(r), alignment) - req.offset;
321
0
  req.scratch = nullptr;
322
0
  return req;
323
0
}
324
325
0
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) {
326
0
  size_t dest_offset = static_cast<size_t>(dest->offset);
327
0
  size_t src_offset = static_cast<size_t>(src.offset);
328
0
  size_t dest_end = End(*dest);
329
0
  size_t src_end = End(src);
330
0
  if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) {
331
0
    return false;
332
0
  }
333
0
  dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset));
334
0
  dest->len = std::max(dest_end, src_end) - dest->offset;
335
0
  return true;
336
0
}
337
338
IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
339
                                           FSReadRequest* read_reqs,
340
                                           size_t num_reqs,
341
                                           AlignedBuf* aligned_buf,
342
0
                                           IODebugContext* dbg) const {
343
0
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
344
0
  assert(num_reqs > 0);
345
346
#ifndef NDEBUG
347
  for (size_t i = 0; i < num_reqs - 1; ++i) {
348
    assert(read_reqs[i].offset <= read_reqs[i + 1].offset);
349
  }
350
#endif  // !NDEBUG
351
0
  const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;
352
353
  // To be paranoid: modify scratch a little bit, so in case underlying
354
  // FileSystem doesn't fill the buffer but returns success and `scratch` still
355
  // contains a previous block, returned value will not pass checksum.
356
  // This byte might not change anything for direct I/O case, but it's OK.
357
0
  for (size_t i = 0; i < num_reqs; i++) {
358
0
    FSReadRequest& r = read_reqs[i];
359
0
    if (r.len > 0 && r.scratch != nullptr) {
360
0
      r.scratch[0]++;
361
0
    }
362
0
  }
363
364
0
  IOStatus io_s;
365
0
  uint64_t elapsed = 0;
366
0
  {
367
0
    StopWatch sw(clock_, stats_, hist_type_,
368
0
                 GetFileReadHistograms(stats_, opts.io_activity),
369
0
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
370
0
                 true /*delay_enabled*/);
371
0
    auto prev_perf_level = GetPerfLevel();
372
0
    IOSTATS_TIMER_GUARD(read_nanos);
373
374
0
    FSReadRequest* fs_reqs = read_reqs;
375
0
    size_t num_fs_reqs = num_reqs;
376
0
    std::vector<FSReadRequest> aligned_reqs;
377
0
    if (use_direct_io()) {
378
      // num_reqs is the max possible size,
379
      // this can reduce std::vector's internal resize operations.
380
0
      aligned_reqs.reserve(num_reqs);
381
      // Align and merge the read requests.
382
0
      size_t alignment = file_->GetRequiredBufferAlignment();
383
0
      for (size_t i = 0; i < num_reqs; i++) {
384
0
        FSReadRequest r = Align(read_reqs[i], alignment);
385
0
        if (i == 0) {
386
          // head
387
0
          aligned_reqs.push_back(std::move(r));
388
389
0
        } else if (!TryMerge(&aligned_reqs.back(), r)) {
390
          // head + n
391
0
          aligned_reqs.push_back(std::move(r));
392
393
0
        } else {
394
          // unused
395
0
          r.status.PermitUncheckedError();
396
0
        }
397
0
      }
398
0
      TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
399
0
                               &aligned_reqs);
400
401
      // Allocate aligned buffer and let scratch buffers point to it.
402
0
      size_t total_len = 0;
403
0
      for (const auto& r : aligned_reqs) {
404
0
        total_len += r.len;
405
0
      }
406
0
      AlignedBuffer buf;
407
0
      buf.Alignment(alignment);
408
0
      buf.AllocateNewBuffer(total_len);
409
0
      char* scratch = buf.BufferStart();
410
0
      for (auto& r : aligned_reqs) {
411
0
        r.scratch = scratch;
412
0
        scratch += r.len;
413
0
      }
414
415
0
      *aligned_buf = buf.Release();
416
0
      fs_reqs = aligned_reqs.data();
417
0
      num_fs_reqs = aligned_reqs.size();
418
0
    }
419
420
0
    FileOperationInfo::StartTimePoint start_ts;
421
0
    if (ShouldNotifyListeners()) {
422
0
      start_ts = FileOperationInfo::StartNow();
423
0
    }
424
425
0
    {
426
0
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
427
0
      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
428
        // TODO: ideally we should call `RateLimiter::RequestToken()` for
429
        // allowed bytes to multi-read and then consume those bytes by
430
        // satisfying as many requests in `MultiRead()` as possible, instead of
431
        // what we do here, which can cause burst when the
432
        // `total_multi_read_size` is big.
433
0
        size_t total_multi_read_size = 0;
434
0
        assert(fs_reqs != nullptr);
435
0
        for (size_t i = 0; i < num_fs_reqs; ++i) {
436
0
          FSReadRequest& req = fs_reqs[i];
437
0
          total_multi_read_size += req.len;
438
0
        }
439
0
        size_t remaining_bytes = total_multi_read_size;
440
0
        size_t request_bytes = 0;
441
0
        while (remaining_bytes > 0) {
442
0
          request_bytes = std::min(
443
0
              static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()),
444
0
              remaining_bytes);
445
0
          rate_limiter_->Request(request_bytes, rate_limiter_priority,
446
0
                                 nullptr /* stats */,
447
0
                                 RateLimiter::OpType::kRead);
448
0
          remaining_bytes -= request_bytes;
449
0
        }
450
0
      }
451
0
      TEST_SYNC_POINT_CALLBACK(
452
0
          "RandomAccessFileReader::MultiRead:IODebugContext",
453
0
          const_cast<void*>(static_cast<void*>(dbg)));
454
0
      io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, dbg);
455
0
      RecordInHistogram(stats_, MULTIGET_IO_BATCH_SIZE, num_fs_reqs);
456
0
    }
457
458
0
    if (use_direct_io()) {
459
      // Populate results in the unaligned read requests.
460
0
      size_t aligned_i = 0;
461
0
      for (size_t i = 0; i < num_reqs; i++) {
462
0
        auto& r = read_reqs[i];
463
0
        if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
464
0
          aligned_i++;
465
0
        }
466
0
        const auto& fs_r = fs_reqs[aligned_i];
467
0
        r.status = fs_r.status;
468
0
        if (r.status.ok()) {
469
0
          uint64_t offset = r.offset - fs_r.offset;
470
0
          if (fs_r.result.size() <= offset) {
471
            // No byte in the read range is returned.
472
0
            r.result = Slice();
473
0
          } else {
474
0
            size_t len = std::min(
475
0
                r.len, static_cast<size_t>(fs_r.result.size() - offset));
476
0
            r.result = Slice(fs_r.scratch + offset, len);
477
0
          }
478
0
        } else {
479
0
          r.result = Slice();
480
0
        }
481
0
      }
482
0
    }
483
484
0
    for (size_t i = 0; i < num_reqs; ++i) {
485
0
      if (ShouldNotifyListeners()) {
486
0
        auto finish_ts = FileOperationInfo::FinishNow();
487
0
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
488
0
                               start_ts, finish_ts, read_reqs[i].status);
489
0
      }
490
0
      if (!read_reqs[i].status.ok()) {
491
0
        NotifyOnIOError(read_reqs[i].status, FileOperationType::kRead,
492
0
                        file_name(), read_reqs[i].result.size(),
493
0
                        read_reqs[i].offset);
494
0
      }
495
0
      RecordIOStats(stats_, file_temperature_, is_last_level_,
496
0
                    read_reqs[i].result.size());
497
0
    }
498
0
    SetPerfLevel(prev_perf_level);
499
0
  }
500
0
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
501
0
    file_read_hist_->Add(elapsed);
502
0
  }
503
504
0
  return io_s;
505
0
}
506
507
IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
508
                                                  IOOptions& opts,
509
915k
                                                  IODebugContext* dbg) const {
510
915k
  if (clock_ != nullptr) {
511
915k
    return PrepareIOFromReadOptions(ro, clock_, opts, dbg);
512
915k
  } else {
513
195
    return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts,
514
195
                                    dbg);
515
195
  }
516
915k
}
517
518
// Notes for when direct_io is enabled:
519
// Unless req.offset, req.len, req.scratch are all already aligned,
520
// RandomAccessFileReader will create aligned requests and aligned buffer for
521
// the request. User should only provide either req.scratch or aligned_buf. If
522
// only req.scratch is provided, result will be copied from allocated aligned
523
// buffer to req.scratch. If only aligned_buf is provided, it will be set to
524
// the aligned buf allocated by RandomAccessFileReader, which saves a copy.
525
IOStatus RandomAccessFileReader::ReadAsync(
526
    FSReadRequest& req, const IOOptions& opts,
527
    std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
528
    void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf,
529
0
    IODebugContext* dbg) {
530
0
  IOStatus s;
531
0
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::ReadAsync:InjectStatus",
532
0
                           &s);
533
0
  if (!s.ok()) {
534
0
    return s;
535
0
  }
536
  // Create a callback and populate info.
537
0
  auto read_async_callback =
538
0
      std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
539
0
                std::placeholders::_1, std::placeholders::_2);
540
541
0
  ReadAsyncInfo* read_async_info = new ReadAsyncInfo(
542
0
      cb, cb_arg, (clock_ != nullptr ? clock_->NowMicros() : 0));
543
544
0
  if (ShouldNotifyListeners()) {
545
0
    read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
546
0
  }
547
548
0
  size_t alignment = file_->GetRequiredBufferAlignment();
549
0
  bool is_aligned = (req.offset & (alignment - 1)) == 0 &&
550
0
                    (req.len & (alignment - 1)) == 0 &&
551
0
                    (uintptr_t(req.scratch) & (alignment - 1)) == 0;
552
0
  read_async_info->is_aligned_ = is_aligned;
553
554
0
  uint64_t elapsed = 0;
555
0
  if (use_direct_io() && is_aligned == false) {
556
0
    FSReadRequest aligned_req = Align(req, alignment);
557
0
    aligned_req.status.PermitUncheckedError();
558
559
    // Allocate aligned buffer.
560
0
    read_async_info->buf_.Alignment(alignment);
561
0
    read_async_info->buf_.AllocateNewBuffer(aligned_req.len);
562
563
    // Set remaining fields in aligned FSReadRequest.
564
0
    aligned_req.scratch = read_async_info->buf_.BufferStart();
565
566
    // Set user provided fields to populate back in callback.
567
0
    read_async_info->user_scratch_ = req.scratch;
568
0
    read_async_info->user_aligned_buf_ = aligned_buf;
569
0
    read_async_info->user_len_ = req.len;
570
0
    read_async_info->user_offset_ = req.offset;
571
0
    read_async_info->user_result_ = req.result;
572
573
0
    assert(read_async_info->buf_.CurrentSize() == 0);
574
575
0
    StopWatch sw(clock_, stats_, hist_type_,
576
0
                 GetFileReadHistograms(stats_, opts.io_activity),
577
0
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
578
0
                 true /*delay_enabled*/);
579
0
    s = file_->ReadAsync(aligned_req, opts, read_async_callback,
580
0
                         read_async_info, io_handle, del_fn, dbg);
581
0
  } else {
582
0
    StopWatch sw(clock_, stats_, hist_type_,
583
0
                 GetFileReadHistograms(stats_, opts.io_activity),
584
0
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
585
0
                 true /*delay_enabled*/);
586
0
    s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
587
0
                         io_handle, del_fn, dbg);
588
0
  }
589
0
  RecordTick(stats_, READ_ASYNC_MICROS, elapsed);
590
591
// Suppress false positive clang analyzer warnings.
592
// Memory is not released if file_->ReadAsync returns !s.ok(), because
593
// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
594
// called then ReadAsync should always return IOStatus::OK().
595
0
#ifndef __clang_analyzer__
596
0
  if (!s.ok()) {
597
0
    delete read_async_info;
598
0
  }
599
0
#endif  // __clang_analyzer__
600
601
0
  return s;
602
0
}
603
604
void RandomAccessFileReader::ReadAsyncCallback(FSReadRequest& req,
605
0
                                               void* cb_arg) {
606
0
  ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg);
607
0
  assert(read_async_info);
608
0
  assert(read_async_info->cb_);
609
610
0
  if (use_direct_io() && read_async_info->is_aligned_ == false) {
611
    // Create FSReadRequest with user provided fields.
612
0
    FSReadRequest user_req;
613
0
    user_req.scratch = read_async_info->user_scratch_;
614
0
    user_req.offset = read_async_info->user_offset_;
615
0
    user_req.len = read_async_info->user_len_;
616
617
    // Update results in user_req.
618
0
    user_req.result = req.result;
619
0
    user_req.status = req.status;
620
621
0
    read_async_info->buf_.Size(read_async_info->buf_.CurrentSize() +
622
0
                               req.result.size());
623
624
0
    size_t offset_advance_len = static_cast<size_t>(
625
0
        /*offset_passed_by_user=*/read_async_info->user_offset_ -
626
0
        /*aligned_offset=*/req.offset);
627
628
0
    size_t res_len = 0;
629
0
    if (req.status.ok() &&
630
0
        offset_advance_len < read_async_info->buf_.CurrentSize()) {
631
0
      res_len =
632
0
          std::min(read_async_info->buf_.CurrentSize() - offset_advance_len,
633
0
                   read_async_info->user_len_);
634
0
      if (read_async_info->user_aligned_buf_ == nullptr) {
635
        // Copy the data into user's scratch.
636
// Clang analyzer assumes that it will take use_direct_io() == false in
637
// ReadAsync and use_direct_io() == true in Callback which cannot be true.
638
0
#ifndef __clang_analyzer__
639
0
        read_async_info->buf_.Read(user_req.scratch, offset_advance_len,
640
0
                                   res_len);
641
0
#endif  // __clang_analyzer__
642
0
      } else {
643
        // Set aligned_buf provided by user without additional copy.
644
0
        user_req.scratch =
645
0
            read_async_info->buf_.BufferStart() + offset_advance_len;
646
0
        *read_async_info->user_aligned_buf_ = read_async_info->buf_.Release();
647
0
      }
648
0
      user_req.result = Slice(user_req.scratch, res_len);
649
0
    } else {
650
      // Either req.status is not ok or data was not read.
651
0
      user_req.result = Slice();
652
0
    }
653
0
    read_async_info->cb_(user_req, read_async_info->cb_arg_);
654
0
  } else {
655
0
    read_async_info->cb_(req, read_async_info->cb_arg_);
656
0
  }
657
658
  // Update stats and notify listeners.
659
0
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
660
    // elapsed doesn't take into account delay and overwrite as StopWatch does
661
    // in Read.
662
0
    uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_;
663
0
    file_read_hist_->Add(elapsed);
664
0
  }
665
0
  if (req.status.ok()) {
666
0
    RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size());
667
0
  } else if (!req.status.IsAborted()) {
668
0
    RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1);
669
0
  }
670
0
  if (ShouldNotifyListeners()) {
671
0
    auto finish_ts = FileOperationInfo::FinishNow();
672
0
    NotifyOnFileReadFinish(req.offset, req.result.size(),
673
0
                           read_async_info->fs_start_ts_, finish_ts,
674
0
                           req.status);
675
0
  }
676
0
  if (!req.status.ok()) {
677
0
    NotifyOnIOError(req.status, FileOperationType::kRead, file_name(),
678
0
                    req.result.size(), req.offset);
679
0
  }
680
0
  RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size());
681
0
  delete read_async_info;
682
0
}
683
}  // namespace ROCKSDB_NAMESPACE