Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/file/random_access_file_reader.h
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#pragma once
11
#include <atomic>
12
#include <sstream>
13
#include <string>
14
15
#include "env/file_system_tracer.h"
16
#include "port/port.h"
17
#include "rocksdb/file_system.h"
18
#include "rocksdb/listener.h"
19
#include "rocksdb/options.h"
20
#include "rocksdb/rate_limiter.h"
21
#include "util/aligned_buffer.h"
22
23
namespace ROCKSDB_NAMESPACE {
24
class Statistics;
25
class HistogramImpl;
26
class SystemClock;
27
28
using AlignedBuf = FSAllocationPtr;
29
30
// Align the request r according to alignment and return the aligned result.
31
FSReadRequest Align(const FSReadRequest& r, size_t alignment);
32
33
// Try to merge src into dest if the two requests overlap.
34
//
35
// Each request represents an inclusive interval [offset, offset + len].
36
// If the intervals overlap, update dest's offset and len to represent the
37
// merged interval, and return true.
38
// Otherwise, do nothing and return false.
39
bool TryMerge(FSReadRequest* dest, const FSReadRequest& src);
40
41
// RandomAccessFileReader is a wrapper on top of FSRandomAccessFile. It is
42
// responsible for:
43
// - Handling Buffered and Direct reads appropriately.
44
// - Rate limiting compaction reads.
45
// - Notifying any interested listeners on the completion of a read.
46
// - Updating IO stats.
47
class RandomAccessFileReader {
48
 private:
49
  // Reports a finished read to every listener in listeners_.
  //
  // Builds a FileOperationInfo from the operation type (kRead), this
  // reader's file_name_ and file_temperature_, and the caller-supplied
  // start/finish timestamps and status; fills in the offset and length of
  // the read; then passes that object to OnFileReadFinish() on each listener.
  // The last statement calls PermitUncheckedError() on the status held by
  // `info` -- presumably so the status may go uninspected afterwards
  // (NOTE(review): confirm against Status's checking rules).
  void NotifyOnFileReadFinish(
50
      uint64_t offset, size_t length,
51
      const FileOperationInfo::StartTimePoint& start_ts,
52
      const FileOperationInfo::FinishTimePoint& finish_ts,
53
0
      const Status& status) const {
54
0
    FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts,
55
0
                           finish_ts, status, file_temperature_);
56
0
    info.offset = offset;
57
0
    info.length = length;
58
59
0
    for (auto& listener : listeners_) {
60
0
      listener->OnFileReadFinish(info);
61
0
    }
62
0
    info.status.PermitUncheckedError();
63
0
  }
64
65
  // Reports an IO error to every listener in listeners_.
  //
  // Returns immediately when no listener is registered; in that case
  // nothing is built and PermitUncheckedError() is not called. Otherwise the
  // arguments are wrapped in an IOErrorInfo, which is passed to OnIOError()
  // on each listener, and io_status.PermitUncheckedError() is called last.
  void NotifyOnIOError(const IOStatus& io_status, FileOperationType operation,
66
                       const std::string& file_path, size_t length,
67
0
                       uint64_t offset) const {
68
0
    if (listeners_.empty()) {
69
0
      return;
70
0
    }
71
0
    IOErrorInfo io_error_info(io_status, operation, file_path, length, offset);
72
73
0
    for (auto& listener : listeners_) {
74
0
      listener->OnIOError(io_error_info);
75
0
    }
76
0
    io_status.PermitUncheckedError();
77
0
  }
78
79
661k
  // Returns true when listeners_ holds at least one listener.
  bool ShouldNotifyListeners() const { return !listeners_.empty(); }
80
81
  // Reader state. Every member is set in the constructor's initializer
  // list; listeners_ starts empty and is then filled with only those
  // listeners whose ShouldBeNotifiedOnFileIO() returns true. file_name_ and
  // file_temperature_ are the values NotifyOnFileReadFinish() reports.
  // NOTE(review): the raw pointers (clock_, stats_, file_read_hist_,
  // rate_limiter_) look non-owning -- confirm against the callers.
  FSRandomAccessFilePtr file_;
82
  std::string file_name_;
83
  SystemClock* clock_;
84
  Statistics* stats_;
85
  uint32_t hist_type_;
86
  HistogramImpl* file_read_hist_;
87
  RateLimiter* rate_limiter_;
88
  std::vector<std::shared_ptr<EventListener>> listeners_;
89
  const Temperature file_temperature_;
90
  const bool is_last_level_;
91
92
  // Bookkeeping for one asynchronous read request.
  //
  // Holds the completion callback (cb_) and its opaque argument (cb_arg_),
  // the start time given to the constructor, and -- for direct IO -- the
  // caller's original request parameters together with an aligned buffer.
  // The constructor sets the user_* pointers/sizes to null/zero and
  // is_aligned_ to false; fs_start_ts_, user_result_ and buf_ are
  // default-constructed. `cb` is received by value, so it is moved into cb_
  // rather than copied a second time.
  // NOTE(review): presumably created by ReadAsync() and consumed by
  // ReadAsyncCallback() -- confirm in the implementation file.
  struct ReadAsyncInfo {
93
    ReadAsyncInfo(std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
94
                  uint64_t start_time)
95
0
        : cb_(std::move(cb)),
96
0
          cb_arg_(cb_arg),
97
0
          start_time_(start_time),
98
0
          user_scratch_(nullptr),
99
0
          user_aligned_buf_(nullptr),
100
0
          user_offset_(0),
101
0
          user_len_(0),
102
0
          is_aligned_(false) {}
103
104
    std::function<void(FSReadRequest&, void*)> cb_;
105
    void* cb_arg_;
106
    uint64_t start_time_;
107
    FileOperationInfo::StartTimePoint fs_start_ts_;
108
    // The fields below store the caller's parameters in case of direct_io.
109
    char* user_scratch_;
110
    AlignedBuf* user_aligned_buf_;
111
    uint64_t user_offset_;
112
    size_t user_len_;
113
    Slice user_result_;
114
    // Used in case of direct_io
115
    AlignedBuffer buf_;
116
    bool is_aligned_;
117
  };
118
119
 public:
120
  // Constructs a reader around `raf`.
  //
  // `raf` is moved into file_, which is also given `io_tracer` and the file
  // name. The remaining arguments are stored unchanged in the matching
  // members. From `listeners`, only the entries whose
  // ShouldBeNotifiedOnFileIO() returns true are appended to listeners_.
  //
  // `_file_name` is a const reference, so it is plainly copied into
  // file_name_; wrapping it in std::move would still copy.
  explicit RandomAccessFileReader(
121
      std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
122
      SystemClock* clock = nullptr,
123
      const std::shared_ptr<IOTracer>& io_tracer = nullptr,
124
      Statistics* stats = nullptr,
125
      uint32_t hist_type = Histograms::HISTOGRAM_ENUM_MAX,
126
      HistogramImpl* file_read_hist = nullptr,
127
      RateLimiter* rate_limiter = nullptr,
128
      const std::vector<std::shared_ptr<EventListener>>& listeners = {},
129
      Temperature file_temperature = Temperature::kUnknown,
130
      bool is_last_level = false)
131
78.0k
      : file_(std::move(raf), io_tracer, _file_name),
132
78.0k
        file_name_(_file_name),
133
78.0k
        clock_(clock),
134
78.0k
        stats_(stats),
135
78.0k
        hist_type_(hist_type),
136
78.0k
        file_read_hist_(file_read_hist),
137
78.0k
        rate_limiter_(rate_limiter),
138
78.0k
        listeners_(),
139
78.0k
        file_temperature_(file_temperature),
140
78.0k
        is_last_level_(is_last_level) {
141
78.0k
    std::for_each(listeners.begin(), listeners.end(),
142
78.0k
                  [this](const std::shared_ptr<EventListener>& e) {
143
0
                    if (e->ShouldBeNotifiedOnFileIO()) {
144
0
                      listeners_.emplace_back(e);
145
0
                    }
146
0
                  });
147
78.0k
  }
148
149
  static IOStatus Create(const std::shared_ptr<FileSystem>& fs,
150
                         const std::string& fname, const FileOptions& file_opts,
151
                         std::unique_ptr<RandomAccessFileReader>* reader,
152
                         IODebugContext* dbg);
153
  RandomAccessFileReader(const RandomAccessFileReader&) = delete;
154
  RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete;
155
156
  // In non-direct IO mode,
157
  // 1. if using mmap, result is stored in a buffer other than scratch;
158
  // 2. if not using mmap, result is stored in the buffer starting from scratch.
159
  //
160
  // In direct IO mode, an aligned buffer is allocated internally.
161
  // 1. If aligned_buf is null, then results are copied to the buffer
162
  // starting from scratch;
163
  // 2. Otherwise, scratch is not used and can be null, the aligned_buf owns
164
  // the internally allocated buffer on return, and the result refers to a
165
  // region in aligned_buf.
166
  IOStatus Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result,
167
                char* scratch, AlignedBuf* aligned_buf,
168
                IODebugContext* dbg = nullptr) const;
169
170
  // REQUIRES:
171
  // num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing.
172
  // In non-direct IO mode, aligned_buf should be null;
173
  // In direct IO mode, aligned_buf stores the aligned buffer allocated inside
174
  // MultiRead, the result Slices in reqs refer to aligned_buf.
175
  IOStatus MultiRead(const IOOptions& opts, FSReadRequest* reqs,
176
                     size_t num_reqs, AlignedBuf* aligned_buf,
177
                     IODebugContext* dbg = nullptr) const;
178
179
  // Forwards a prefetch request for (offset, n) to the underlying file,
  // passing `opts` and `dbg` along, and returns the IOStatus it reports.
  IOStatus Prefetch(const IOOptions& opts, uint64_t offset, size_t n,
180
88.6k
                    IODebugContext* dbg = nullptr) const {
181
88.6k
    return file_->Prefetch(offset, n, opts, dbg);
182
88.6k
  }
183
184
78.0k
  // Returns the FSRandomAccessFile pointer obtained from file_.get().
  FSRandomAccessFile* file() { return file_.get(); }
185
186
254k
  // Returns a reference to the stored file name (file_name_).
  const std::string& file_name() const { return file_name_; }
187
188
748k
  // Reports whether direct IO is in use, as answered by the underlying file.
  bool use_direct_io() const { return file_->use_direct_io(); }
189
190
  IOStatus PrepareIOOptions(const ReadOptions& ro, IOOptions& opts,
191
                            IODebugContext* dbg = nullptr) const;
192
193
  IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
194
                     std::function<void(FSReadRequest&, void*)> cb,
195
                     void* cb_arg, void** io_handle, IOHandleDeleter* del_fn,
196
                     AlignedBuf* aligned_buf, IODebugContext* dbg = nullptr);
197
198
  void ReadAsyncCallback(FSReadRequest& req, void* cb_arg);
199
};
200
}  // namespace ROCKSDB_NAMESPACE