/src/rocksdb/file/sequence_file_reader.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "file/sequence_file_reader.h" |
11 | | |
12 | | #include <algorithm> |
13 | | #include <mutex> |
14 | | |
15 | | #include "file/read_write_util.h" |
16 | | #include "monitoring/histogram.h" |
17 | | #include "monitoring/iostats_context_imp.h" |
18 | | #include "port/port.h" |
19 | | #include "rocksdb/file_system.h" |
20 | | #include "test_util/sync_point.h" |
21 | | #include "util/aligned_buffer.h" |
22 | | #include "util/random.h" |
23 | | #include "util/rate_limiter_impl.h" |
24 | | |
25 | | namespace ROCKSDB_NAMESPACE { |
26 | | IOStatus SequentialFileReader::Create( |
27 | | const std::shared_ptr<FileSystem>& fs, const std::string& fname, |
28 | | const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader, |
29 | 0 | IODebugContext* dbg, RateLimiter* rate_limiter) { |
30 | 0 | std::unique_ptr<FSSequentialFile> file; |
31 | 0 | IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg); |
32 | 0 | if (io_s.ok()) { |
33 | 0 | reader->reset(new SequentialFileReader(std::move(file), fname, nullptr, {}, |
34 | 0 | rate_limiter)); |
35 | 0 | } |
36 | 0 | return io_s; |
37 | 0 | } |
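A minimal usage sketch of the factory above (the path is illustrative; FileSystem::Default() is the stock filesystem in rocksdb/file_system.h):

    std::shared_ptr<FileSystem> fs = FileSystem::Default();
    std::unique_ptr<SequentialFileReader> reader;
    IOStatus io_s = SequentialFileReader::Create(
        fs, "/tmp/example.log", FileOptions(), &reader, nullptr /* dbg */,
        nullptr /* rate_limiter */);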
38 | | |
39 | | IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch, |
40 | 41.4k | Env::IOPriority rate_limiter_priority) { |
41 | 41.4k | IOStatus io_s; |
42 | 41.4k | IOOptions io_opts; |
43 | 41.4k | io_opts.rate_limiter_priority = rate_limiter_priority; |
44 | 41.4k | io_opts.verify_and_reconstruct_read = verify_and_reconstruct_read_; |
45 | 41.4k | if (use_direct_io()) { |
46 | | // |
47 | | // |-offset_advance-|---bytes returned--| |
48 | | // |----------------------buf size-------------------------| |
49 | | //    |                |                   |                  | |
50 | | // aligned           offset          offset + n         Roundup(offset + n, |
51 | | // offset                                                      alignment) |
52 | | // |
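// A worked example with illustrative values (not from the source): for
// alignment = 4096, offset = 5000 and n = 3000, the code below computes
// aligned_offset = 4096, offset_advance = 904 and
// size = Roundup(8000, 4096) - 4096 = 4096, so a single aligned 4 KiB
// read covers the requested range [5000, 8000).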
53 | 0 | size_t offset = offset_.fetch_add(n); |
54 | 0 | size_t alignment = file_->GetRequiredBufferAlignment(); |
55 | 0 | size_t aligned_offset = TruncateToPageBoundary(alignment, offset); |
56 | 0 | size_t offset_advance = offset - aligned_offset; |
57 | 0 | size_t size = Roundup(offset + n, alignment) - aligned_offset; |
58 | 0 | size_t r = 0; |
59 | 0 | AlignedBuffer buf; |
60 | 0 | buf.Alignment(alignment); |
61 | 0 | buf.AllocateNewBuffer(size); |
62 | |
63 | 0 | while (buf.CurrentSize() < size) { |
64 | 0 | size_t allowed; |
65 | 0 | if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) { |
66 | 0 | allowed = rate_limiter_->RequestToken( |
67 | 0 | buf.Capacity() - buf.CurrentSize(), buf.Alignment(), |
68 | 0 | rate_limiter_priority, nullptr /* stats */, |
69 | 0 | RateLimiter::OpType::kRead); |
70 | 0 | } else { |
71 | 0 | assert(buf.CurrentSize() == 0); |
72 | 0 | allowed = size; |
73 | 0 | } |
74 | |
75 | 0 | Slice tmp; |
76 | 0 | uint64_t orig_offset = 0; |
77 | 0 | FileOperationInfo::StartTimePoint start_ts; |
78 | 0 | if (ShouldNotifyListeners()) { |
79 | 0 | orig_offset = aligned_offset + buf.CurrentSize(); |
80 | 0 | start_ts = FileOperationInfo::StartNow(); |
81 | 0 | } |
82 | 0 | io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed, |
83 | 0 | io_opts, &tmp, buf.Destination(), |
84 | 0 | nullptr /* dbg */); |
85 | 0 | if (ShouldNotifyListeners()) { |
86 | 0 | auto finish_ts = FileOperationInfo::FinishNow(); |
87 | 0 | NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts, |
88 | 0 | io_s); |
89 | 0 | } |
90 | 0 | buf.Size(buf.CurrentSize() + tmp.size()); |
91 | 0 | if (!io_s.ok() || tmp.size() < allowed) { |
92 | 0 | break; |
93 | 0 | } |
94 | 0 | } |
95 | |
96 | 0 | if (io_s.ok() && offset_advance < buf.CurrentSize()) { |
97 | 0 | r = buf.Read(scratch, offset_advance, |
98 | 0 | std::min(buf.CurrentSize() - offset_advance, n)); |
99 | 0 | } |
100 | 0 | *result = Slice(scratch, r); |
101 | 41.4k | } else { |
102 | | // To be paranoid, modify scratch a little bit, so that if the underlying |
103 | | // FileSystem doesn't fill the buffer but returns success and `scratch` |
104 | | // still contains a previous block, the returned value will not pass the |
105 | | // checksum. |
106 | | // It's hard to find a useful byte to modify in the direct I/O case, so we skip it. |
107 | 41.4k | if (n > 0 && scratch != nullptr) { |
108 | 41.4k | scratch[0]++; |
109 | 41.4k | } |
110 | | |
111 | 41.4k | size_t read = 0; |
112 | 54.1k | while (read < n) { |
113 | 41.4k | size_t allowed; |
114 | 41.4k | if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) { |
115 | 0 | allowed = rate_limiter_->RequestToken( |
116 | 0 | n - read, 0 /* alignment */, rate_limiter_priority, |
117 | 0 | nullptr /* stats */, RateLimiter::OpType::kRead); |
118 | 41.4k | } else { |
119 | 41.4k | allowed = n; |
120 | 41.4k | } |
121 | 41.4k | FileOperationInfo::StartTimePoint start_ts; |
122 | 41.4k | if (ShouldNotifyListeners()) { |
123 | 0 | start_ts = FileOperationInfo::StartNow(); |
124 | 0 | } |
125 | 41.4k | Slice tmp; |
126 | 41.4k | io_s = file_->Read(allowed, io_opts, &tmp, scratch + read, |
127 | 41.4k | nullptr /* dbg */); |
128 | 41.4k | if (ShouldNotifyListeners()) { |
129 | 0 | auto finish_ts = FileOperationInfo::FinishNow(); |
130 | 0 | size_t offset = offset_.fetch_add(tmp.size()); |
131 | 0 | NotifyOnFileReadFinish(offset, tmp.size(), start_ts, finish_ts, io_s); |
132 | 0 | } |
133 | 41.4k | read += tmp.size(); |
134 | 41.4k | if (!io_s.ok() || tmp.size() < allowed) { |
135 | 28.7k | break; |
136 | 28.7k | } |
137 | 41.4k | } |
138 | 41.4k | *result = Slice(scratch, read); |
139 | 41.4k | } |
140 | 41.4k | IOSTATS_ADD(bytes_read, result->size()); |
141 | 41.4k | return io_s; |
142 | 41.4k | } |
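Continuing the sketch after Create above, a hedged example of driving Read with a rate-limiter priority (buffer size and priority are illustrative):

    Slice result;
    char scratch[4096];
    IOStatus io_s = reader->Read(sizeof(scratch), &result, scratch, Env::IO_LOW);
    // result points into scratch; result.size() < sizeof(scratch) indicates
    // EOF (or an error, so check io_s as well).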
143 | | |
144 | 0 | IOStatus SequentialFileReader::Skip(uint64_t n) { |
145 | 0 | if (use_direct_io()) { |
146 | 0 | offset_ += static_cast<size_t>(n); |
147 | 0 | return IOStatus::OK(); |
148 | 0 | } |
149 | 0 | return file_->Skip(n); |
150 | 0 | } |
151 | | |
152 | | namespace { |
153 | | // This class wraps a SequentialFile, exposing the same API, with the |
154 | | // difference of being able to prefetch up to readahead_size bytes and then |
155 | | // serve them from memory, avoiding the entire round-trip if, for example, |
156 | | // the data for the file is actually remote. |
157 | | class ReadaheadSequentialFile : public FSSequentialFile { |
158 | | public: |
159 | | ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file, |
160 | | size_t readahead_size) |
161 | | : file_(std::move(file)), |
162 | | alignment_(file_->GetRequiredBufferAlignment()), |
163 | | readahead_size_(Roundup(readahead_size, alignment_)), |
164 | | buffer_(), |
165 | | buffer_offset_(0), |
166 | 11.8k | read_offset_(0) { |
167 | 11.8k | buffer_.Alignment(alignment_); |
168 | 11.8k | buffer_.AllocateNewBuffer(readahead_size_); |
169 | 11.8k | } |
170 | | |
171 | | ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete; |
172 | | |
173 | | ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete; |
174 | | |
175 | | IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch, |
176 | 12.5k | IODebugContext* dbg) override { |
177 | 12.5k | std::unique_lock<std::mutex> lk(lock_); |
178 | | |
179 | 12.5k | size_t cached_len = 0; |
180 | | // Check if there is a cache hit, meaning that [offset, offset + n) is |
181 | | // either completely or partially in the buffer. If it's completely |
182 | | // cached, including the end-of-file case where offset + n is greater |
183 | | // than EOF, then return. |
184 | 12.5k | if (TryReadFromCache(n, &cached_len, scratch) && |
185 | 12.5k | (cached_len == n || buffer_.CurrentSize() < readahead_size_)) { |
186 | | // We read exactly what we needed, or we hit end of file - return. |
187 | 736 | *result = Slice(scratch, cached_len); |
188 | 736 | return IOStatus::OK(); |
189 | 736 | } |
190 | 11.8k | n -= cached_len; |
191 | | |
192 | 11.8k | IOStatus s; |
193 | | // Read-ahead only makes sense if we have some slack left after this read |
194 | 11.8k | if (n + alignment_ >= readahead_size_) { |
195 | 0 | s = file_->Read(n, opts, result, scratch + cached_len, dbg); |
196 | 0 | if (s.ok()) { |
197 | 0 | read_offset_ += result->size(); |
198 | 0 | *result = Slice(scratch, cached_len + result->size()); |
199 | 0 | } |
200 | 0 | buffer_.Clear(); |
201 | 0 | return s; |
202 | 0 | } |
203 | | |
204 | 11.8k | s = ReadIntoBuffer(readahead_size_, opts, dbg); |
205 | 11.8k | if (s.ok()) { |
206 | | // The data we need is now in cache, so we can safely read it |
207 | 11.8k | size_t remaining_len; |
208 | 11.8k | TryReadFromCache(n, &remaining_len, scratch + cached_len); |
209 | 11.8k | *result = Slice(scratch, cached_len + remaining_len); |
210 | 11.8k | } |
211 | 11.8k | return s; |
212 | 11.8k | } |
213 | | |
214 | 0 | IOStatus Skip(uint64_t n) override { |
215 | 0 | std::unique_lock<std::mutex> lk(lock_); |
216 | 0 | IOStatus s = IOStatus::OK(); |
217 | | // First check if we need to skip already cached data |
218 | 0 | if (buffer_.CurrentSize() > 0) { |
219 | | // Do we need to skip beyond cached data? |
220 | 0 | if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) { |
221 | | // Yes. Skip whatever is in memory and adjust the offset accordingly |
222 | 0 | n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_; |
223 | 0 | read_offset_ = buffer_offset_ + buffer_.CurrentSize(); |
224 | 0 | } else { |
225 | | // No. The section to be skipped is entirely in cache. |
226 | 0 | read_offset_ += n; |
227 | 0 | n = 0; |
228 | 0 | } |
229 | 0 | } |
230 | 0 | if (n > 0) { |
231 | | // We still need to skip more, so call the file API for skipping |
232 | 0 | s = file_->Skip(n); |
233 | 0 | if (s.ok()) { |
234 | 0 | read_offset_ += n; |
235 | 0 | } |
236 | 0 | buffer_.Clear(); |
237 | 0 | } |
238 | 0 | return s; |
239 | 0 | } |
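// Illustrative walk-through of Skip above (values assumed): with
// buffer_offset_ = 4096, buffer_.CurrentSize() = 4096 and
// read_offset_ = 5000, Skip(2000) is served entirely from cache
// (read_offset_ becomes 7000), whereas Skip(8000) consumes the 3192
// cached bytes and forwards the remaining 4808 bytes to file_->Skip().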
240 | | |
241 | | IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts, |
242 | | Slice* result, char* scratch, |
243 | 0 | IODebugContext* dbg) override { |
244 | 0 | return file_->PositionedRead(offset, n, opts, result, scratch, dbg); |
245 | 0 | } |
246 | | |
247 | 0 | IOStatus InvalidateCache(size_t offset, size_t length) override { |
248 | 0 | std::unique_lock<std::mutex> lk(lock_); |
249 | 0 | buffer_.Clear(); |
250 | 0 | return file_->InvalidateCache(offset, length); |
251 | 0 | } |
252 | | |
253 | 12.5k | bool use_direct_io() const override { return file_->use_direct_io(); } |
254 | | |
255 | | private: |
256 | | // Tries to read n bytes from buffer_. If anything was read from the cache, |
257 | | // sets cached_len to the number of bytes actually read, copies that many |
258 | | // bytes to scratch, and returns true. |
259 | | // If nothing was read, sets cached_len to 0 and returns false. |
260 | 24.3k | bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) { |
261 | 24.3k | if (read_offset_ < buffer_offset_ || |
262 | 24.3k | read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) { |
263 | 11.8k | *cached_len = 0; |
264 | 11.8k | return false; |
265 | 11.8k | } |
266 | 12.5k | uint64_t offset_in_buffer = read_offset_ - buffer_offset_; |
267 | 12.5k | *cached_len = std::min( |
268 | 12.5k | buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n); |
269 | 12.5k | memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len); |
270 | 12.5k | read_offset_ += *cached_len; |
271 | 12.5k | return true; |
272 | 24.3k | } |
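// Illustrative walk-through of TryReadFromCache above (values assumed):
// with buffer_offset_ = 4096, buffer_.CurrentSize() = 4096 and
// read_offset_ = 5000, a request for n = 2000 gives
// offset_in_buffer = 904 and cached_len = min(4096 - 904, 2000) = 2000,
// i.e. a full cache hit.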
273 | | |
274 | | // Reads into buffer_ the next n bytes from file_. |
275 | | // May actually read fewer bytes if EOF was reached. |
276 | | // Returns the status of the read operation on the file. |
277 | | IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts, |
278 | 11.8k | IODebugContext* dbg) { |
279 | 11.8k | if (n > buffer_.Capacity()) { |
280 | 0 | n = buffer_.Capacity(); |
281 | 0 | } |
282 | 11.8k | assert(IsFileSectorAligned(n, alignment_)); |
283 | 11.8k | Slice result; |
284 | 11.8k | IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg); |
285 | 11.8k | if (s.ok()) { |
286 | 11.8k | buffer_offset_ = read_offset_; |
287 | 11.8k | buffer_.Size(result.size()); |
288 | 11.8k | assert(result.size() == 0 || buffer_.BufferStart() == result.data()); |
289 | 11.8k | } |
290 | 11.8k | return s; |
291 | 11.8k | } |
292 | | |
293 | | const std::unique_ptr<FSSequentialFile> file_; |
294 | | const size_t alignment_; |
295 | | const size_t readahead_size_; |
296 | | |
297 | | std::mutex lock_; |
298 | | // The buffer storing the prefetched data |
299 | | AlignedBuffer buffer_; |
300 | | // The offset in file_, corresponding to data stored in buffer_ |
301 | | uint64_t buffer_offset_; |
302 | | // The offset up to which data was read from file_. In fact, it can be larger |
303 | | // than the actual file size, since the file_->Skip(n) call doesn't return the |
304 | | // actual number of bytes that were skipped, which can be less than n. |
305 | | // This is not a problem since read_offset_ is monotonically increasing and |
306 | | // its only use is to figure out if next piece of data should be read from |
307 | | // buffer_ or file_ directly. |
308 | | uint64_t read_offset_; |
309 | | }; |
310 | | } // namespace |
311 | | |
312 | | std::unique_ptr<FSSequentialFile> |
313 | | SequentialFileReader::NewReadaheadSequentialFile( |
314 | 28.7k | std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) { |
315 | 28.7k | if (file->GetRequiredBufferAlignment() >= readahead_size) { |
316 | | // Short-circuit and return the original file if readahead_size is too |
317 | | // small to be useful for prefetching. |
318 | 16.9k | return std::move(file); |
319 | 16.9k | } |
320 | 11.8k | std::unique_ptr<FSSequentialFile> result( |
321 | 11.8k | new ReadaheadSequentialFile(std::move(file), readahead_size)); |
322 | 11.8k | return result; |
323 | 28.7k | } |
324 | | } // namespace ROCKSDB_NAMESPACE |
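A sketch of how the readahead factory behaves. Note that in RocksDB this helper is normally exercised through SequentialFileReader's constructor rather than called directly, so treat the direct call below as an assumption about accessibility; the path and sizes are illustrative:

    std::unique_ptr<FSSequentialFile> raw;
    IOStatus io_s = FileSystem::Default()->NewSequentialFile(
        "/tmp/example.log", FileOptions(), &raw, nullptr /* dbg */);
    if (io_s.ok()) {
      // With readahead_size = 64 KiB and a typical 4 KiB alignment, this
      // wraps the file in a ReadaheadSequentialFile; if readahead_size were
      // <= the file's required alignment, the original file would be
      // returned unchanged.
      std::unique_ptr<FSSequentialFile> ra =
          SequentialFileReader::NewReadaheadSequentialFile(std::move(raw),
                                                           64 * 1024);
    }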