/src/rocksdb/file/random_access_file_reader.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #pragma once |
11 | | #include <atomic> |
12 | | #include <sstream> |
13 | | #include <string> |
14 | | |
15 | | #include "env/file_system_tracer.h" |
16 | | #include "port/port.h" |
17 | | #include "rocksdb/file_system.h" |
18 | | #include "rocksdb/listener.h" |
19 | | #include "rocksdb/options.h" |
20 | | #include "rocksdb/rate_limiter.h" |
21 | | #include "util/aligned_buffer.h" |
22 | | |
23 | | namespace ROCKSDB_NAMESPACE { |
24 | | class Statistics; |
25 | | class HistogramImpl; |
26 | | class SystemClock; |
27 | | |
28 | | using AlignedBuf = FSAllocationPtr; |
29 | | |
30 | | // Aligns the request r according to alignment and returns the aligned request by value; r is taken by const reference, so the caller's request is not modified. |
31 | | FSReadRequest Align(const FSReadRequest& r, size_t alignment); |
32 | | |
33 | | // Tries to merge src into dest if the two requests overlap. |
34 | | // |
35 | | // Each request is treated as an inclusive interval [offset, offset + len]. |
36 | | // If the intervals overlap, dest's offset and len are updated to represent the |
37 | | // merged interval, and true is returned. |
38 | | // Otherwise, nothing is changed and false is returned. |
39 | | bool TryMerge(FSReadRequest* dest, const FSReadRequest& src); |
40 | | |
41 | | // RandomAccessFileReader is a wrapper on top of FSRandomAccessFile (held in file_). It is |
42 | | // responsible for handling buffered and direct reads appropriately, rate limiting compaction reads, |
43 | | // notifying any interested listeners on the completion of a read, and updating IO stats. |
44 | | // The class is non-copyable; the constructor keeps only listeners whose ShouldBeNotifiedOnFileIO() returns true. |
45 | | // NOTE(review): the constructor applies std::move to the const reference `_file_name`, so file_name_ is copied, not moved. |
46 | | // NOTE(review): std::vector, std::function and std::for_each are used here without <vector>, <functional> or <algorithm> in this header's own include list. |
47 | | class RandomAccessFileReader { |
48 | | private: |
49 | | void NotifyOnFileReadFinish( |
50 | | uint64_t offset, size_t length, |
51 | | const FileOperationInfo::StartTimePoint& start_ts, |
52 | | const FileOperationInfo::FinishTimePoint& finish_ts, |
53 | 0 | const Status& status) const { |
54 | 0 | FileOperationInfo info(FileOperationType::kRead, file_name_, start_ts, |
55 | 0 | finish_ts, status, file_temperature_); |
56 | 0 | info.offset = offset; |
57 | 0 | info.length = length; |
58 | |

59 | 0 | for (auto& listener : listeners_) { |
60 | 0 | listener->OnFileReadFinish(info); |
61 | 0 | } |
62 | 0 | info.status.PermitUncheckedError(); |
63 | 0 | } |
64 | | |
65 | | void NotifyOnIOError(const IOStatus& io_status, FileOperationType operation, |
66 | | const std::string& file_path, size_t length, |
67 | 0 | uint64_t offset) const { |
68 | 0 | if (listeners_.empty()) { |
69 | 0 | return; |
70 | 0 | } |
71 | 0 | IOErrorInfo io_error_info(io_status, operation, file_path, length, offset); |
72 | |

73 | 0 | for (auto& listener : listeners_) { |
74 | 0 | listener->OnIOError(io_error_info); |
75 | 0 | } |
76 | 0 | io_status.PermitUncheckedError(); |
77 | 0 | } |
78 | | |
79 | 661k | bool ShouldNotifyListeners() const { return !listeners_.empty(); } |
80 | | |
81 | | FSRandomAccessFilePtr file_; |
82 | | std::string file_name_; |
83 | | SystemClock* clock_; |
84 | | Statistics* stats_; |
85 | | uint32_t hist_type_; |
86 | | HistogramImpl* file_read_hist_; |
87 | | RateLimiter* rate_limiter_; |
88 | | std::vector<std::shared_ptr<EventListener>> listeners_; |
89 | | const Temperature file_temperature_; |
90 | | const bool is_last_level_; |
91 | | |
92 | | struct ReadAsyncInfo { |
93 | | ReadAsyncInfo(std::function<void(FSReadRequest&, void*)> cb, void* cb_arg, |
94 | | uint64_t start_time) |
95 | 0 | : cb_(cb), |
96 | 0 | cb_arg_(cb_arg), |
97 | 0 | start_time_(start_time), |
98 | 0 | user_scratch_(nullptr), |
99 | 0 | user_aligned_buf_(nullptr), |
100 | 0 | user_offset_(0), |
101 | 0 | user_len_(0), |
102 | 0 | is_aligned_(false) {} |
103 | | |
104 | | std::function<void(FSReadRequest&, void*)> cb_; |
105 | | void* cb_arg_; |
106 | | uint64_t start_time_; |
107 | | FileOperationInfo::StartTimePoint fs_start_ts_; |
108 | | // The fields below store the parameters passed by the caller in case of direct_io; the constructor initializes them to null/zero. |
109 | | char* user_scratch_; |
110 | | AlignedBuf* user_aligned_buf_; |
111 | | uint64_t user_offset_; |
112 | | size_t user_len_; |
113 | | Slice user_result_; |
114 | | // Aligned buffer used in case of direct_io. |
115 | | AlignedBuffer buf_; |
116 | | bool is_aligned_; |
117 | | }; |
118 | | |
119 | | public: |
120 | | explicit RandomAccessFileReader( |
121 | | std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name, |
122 | | SystemClock* clock = nullptr, |
123 | | const std::shared_ptr<IOTracer>& io_tracer = nullptr, |
124 | | Statistics* stats = nullptr, |
125 | | uint32_t hist_type = Histograms::HISTOGRAM_ENUM_MAX, |
126 | | HistogramImpl* file_read_hist = nullptr, |
127 | | RateLimiter* rate_limiter = nullptr, |
128 | | const std::vector<std::shared_ptr<EventListener>>& listeners = {}, |
129 | | Temperature file_temperature = Temperature::kUnknown, |
130 | | bool is_last_level = false) |
131 | 78.0k | : file_(std::move(raf), io_tracer, _file_name), |
132 | 78.0k | file_name_(std::move(_file_name)), |
133 | 78.0k | clock_(clock), |
134 | 78.0k | stats_(stats), |
135 | 78.0k | hist_type_(hist_type), |
136 | 78.0k | file_read_hist_(file_read_hist), |
137 | 78.0k | rate_limiter_(rate_limiter), |
138 | 78.0k | listeners_(), |
139 | 78.0k | file_temperature_(file_temperature), |
140 | 78.0k | is_last_level_(is_last_level) { |
141 | 78.0k | std::for_each(listeners.begin(), listeners.end(), |
142 | 78.0k | [this](const std::shared_ptr<EventListener>& e) { |
143 | 0 | if (e->ShouldBeNotifiedOnFileIO()) { |
144 | 0 | listeners_.emplace_back(e); |
145 | 0 | } |
146 | 0 | }); |
147 | 78.0k | } |
148 | | |
149 | | static IOStatus Create(const std::shared_ptr<FileSystem>& fs, |
150 | | const std::string& fname, const FileOptions& file_opts, |
151 | | std::unique_ptr<RandomAccessFileReader>* reader, |
152 | | IODebugContext* dbg); |
153 | | RandomAccessFileReader(const RandomAccessFileReader&) = delete; |
154 | | RandomAccessFileReader& operator=(const RandomAccessFileReader&) = delete; |
155 | | |
156 | | // In non-direct IO mode: |
157 | | // 1. if using mmap, the result is stored in a buffer other than scratch; |
158 | | // 2. if not using mmap, the result is stored in the buffer starting at scratch. |
159 | | // |
160 | | // In direct IO mode, an aligned buffer is allocated internally: |
161 | | // 1. if aligned_buf is null, the results are copied to the buffer |
162 | | // starting at scratch; |
163 | | // 2. otherwise, scratch is not used and may be null; aligned_buf owns |
164 | | // the internally allocated buffer on return, and the result refers to a |
165 | | // region in aligned_buf. |
166 | | IOStatus Read(const IOOptions& opts, uint64_t offset, size_t n, Slice* result, |
167 | | char* scratch, AlignedBuf* aligned_buf, |
168 | | IODebugContext* dbg = nullptr) const; |
169 | | |
170 | | // REQUIRES: |
171 | | // num_reqs > 0, reqs do not overlap, and offsets in reqs are increasing. |
172 | | // In non-direct IO mode, aligned_buf should be null. |
173 | | // In direct IO mode, aligned_buf stores the aligned buffer allocated inside |
174 | | // MultiRead, and the result Slices in reqs refer to aligned_buf. |
175 | | IOStatus MultiRead(const IOOptions& opts, FSReadRequest* reqs, |
176 | | size_t num_reqs, AlignedBuf* aligned_buf, |
177 | | IODebugContext* dbg = nullptr) const; |
178 | | |
179 | | IOStatus Prefetch(const IOOptions& opts, uint64_t offset, size_t n, |
180 | 88.6k | IODebugContext* dbg = nullptr) const { |
181 | 88.6k | return file_->Prefetch(offset, n, opts, dbg); |
182 | 88.6k | } |
183 | | |
184 | 78.0k | FSRandomAccessFile* file() { return file_.get(); } |
185 | | |
186 | 254k | const std::string& file_name() const { return file_name_; } |
187 | | |
188 | 748k | bool use_direct_io() const { return file_->use_direct_io(); } |
189 | | |
190 | | IOStatus PrepareIOOptions(const ReadOptions& ro, IOOptions& opts, |
191 | | IODebugContext* dbg = nullptr) const; |
192 | | |
193 | | IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts, |
194 | | std::function<void(FSReadRequest&, void*)> cb, |
195 | | void* cb_arg, void** io_handle, IOHandleDeleter* del_fn, |
196 | | AlignedBuf* aligned_buf, IODebugContext* dbg = nullptr); |
197 | | |
198 | | void ReadAsyncCallback(FSReadRequest& req, void* cb_arg); |
199 | | }; |
200 | | } // namespace ROCKSDB_NAMESPACE |