/src/rocksdb/file/random_access_file_reader.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "file/random_access_file_reader.h" |
11 | | |
12 | | #include <algorithm> |
13 | | #include <mutex> |
14 | | |
15 | | #include "file/file_util.h" |
16 | | #include "monitoring/histogram.h" |
17 | | #include "monitoring/iostats_context_imp.h" |
18 | | #include "port/port.h" |
19 | | #include "table/format.h" |
20 | | #include "test_util/sync_point.h" |
21 | | #include "util/random.h" |
22 | | #include "util/rate_limiter_impl.h" |
23 | | |
24 | | namespace ROCKSDB_NAMESPACE { |
25 | | inline Histograms GetFileReadHistograms(Statistics* stats, |
26 | 464k | Env::IOActivity io_activity) { |
27 | 464k | switch (io_activity) { |
28 | 8.36k | case Env::IOActivity::kFlush: |
29 | 8.36k | return Histograms::FILE_READ_FLUSH_MICROS; |
30 | 19.6k | case Env::IOActivity::kCompaction: |
31 | 19.6k | return Histograms::FILE_READ_COMPACTION_MICROS; |
32 | 424k | case Env::IOActivity::kDBOpen: |
33 | 424k | return Histograms::FILE_READ_DB_OPEN_MICROS; |
34 | 11.5k | default: |
35 | 11.5k | break; |
36 | 464k | } |
37 | | |
38 | 11.5k | if (stats && stats->get_stats_level() > StatsLevel::kExceptDetailedTimers) { |
39 | 0 | switch (io_activity) { |
40 | 0 | case Env::IOActivity::kGet: |
41 | 0 | return Histograms::FILE_READ_GET_MICROS; |
42 | 0 | case Env::IOActivity::kMultiGet: |
43 | 0 | return Histograms::FILE_READ_MULTIGET_MICROS; |
44 | 0 | case Env::IOActivity::kDBIterator: |
45 | 0 | return Histograms::FILE_READ_DB_ITERATOR_MICROS; |
46 | 0 | case Env::IOActivity::kVerifyDBChecksum: |
47 | 0 | return Histograms::FILE_READ_VERIFY_DB_CHECKSUM_MICROS; |
48 | 0 | case Env::IOActivity::kVerifyFileChecksums: |
49 | 0 | return Histograms::FILE_READ_VERIFY_FILE_CHECKSUMS_MICROS; |
50 | 0 | default: |
51 | 0 | break; |
52 | 0 | } |
53 | 0 | } |
54 | 11.5k | return Histograms::HISTOGRAM_ENUM_MAX; |
55 | 11.5k | } |
56 | | inline void RecordIOStats(Statistics* stats, Temperature file_temperature, |
57 | 458k | bool is_last_level, size_t size) { |
58 | 458k | IOSTATS_ADD(bytes_read, size); |
59 | | // record for last/non-last level |
60 | 458k | if (is_last_level) { |
61 | 155k | RecordTick(stats, LAST_LEVEL_READ_BYTES, size); |
62 | 155k | RecordTick(stats, LAST_LEVEL_READ_COUNT, 1); |
63 | 302k | } else { |
64 | 302k | RecordTick(stats, NON_LAST_LEVEL_READ_BYTES, size); |
65 | 302k | RecordTick(stats, NON_LAST_LEVEL_READ_COUNT, 1); |
66 | 302k | } |
67 | | |
68 | | // record for temperature file |
69 | 458k | switch (file_temperature) { |
70 | 0 | case Temperature::kHot: |
71 | 0 | IOSTATS_ADD(file_io_stats_by_temperature.hot_file_bytes_read, size); |
72 | 0 | IOSTATS_ADD(file_io_stats_by_temperature.hot_file_read_count, 1); |
73 | 0 | RecordTick(stats, HOT_FILE_READ_BYTES, size); |
74 | 0 | RecordTick(stats, HOT_FILE_READ_COUNT, 1); |
75 | 0 | break; |
76 | 0 | case Temperature::kWarm: |
77 | 0 | IOSTATS_ADD(file_io_stats_by_temperature.warm_file_bytes_read, size); |
78 | 0 | IOSTATS_ADD(file_io_stats_by_temperature.warm_file_read_count, 1); |
79 | 0 | RecordTick(stats, WARM_FILE_READ_BYTES, size); |
80 | 0 | RecordTick(stats, WARM_FILE_READ_COUNT, 1); |
81 | 0 | break; |
82 | 0 | case Temperature::kCool: |
83 | 0 | IOSTATS_ADD(file_io_stats_by_temperature.cool_file_bytes_read, size); |
84 | 0 | IOSTATS_ADD(file_io_stats_by_temperature.cool_file_read_count, 1); |
85 | 0 | RecordTick(stats, COOL_FILE_READ_BYTES, size); |
86 | 0 | RecordTick(stats, COOL_FILE_READ_COUNT, 1); |
87 | 0 | break; |
88 | 0 | case Temperature::kCold: |
89 | 0 | IOSTATS_ADD(file_io_stats_by_temperature.cold_file_bytes_read, size); |
90 | 0 | IOSTATS_ADD(file_io_stats_by_temperature.cold_file_read_count, 1); |
91 | 0 | RecordTick(stats, COLD_FILE_READ_BYTES, size); |
92 | 0 | RecordTick(stats, COLD_FILE_READ_COUNT, 1); |
93 | 0 | break; |
94 | 0 | case Temperature::kIce: |
95 | 0 | IOSTATS_ADD(file_io_stats_by_temperature.ice_file_bytes_read, size); |
96 | 0 | IOSTATS_ADD(file_io_stats_by_temperature.ice_file_read_count, 1); |
97 | 0 | RecordTick(stats, ICE_FILE_READ_BYTES, size); |
98 | 0 | RecordTick(stats, ICE_FILE_READ_COUNT, 1); |
99 | 0 | break; |
100 | 457k | case Temperature::kUnknown: |
101 | 457k | if (is_last_level) { |
102 | 155k | IOSTATS_ADD(file_io_stats_by_temperature.unknown_last_level_bytes_read, |
103 | 155k | size); |
104 | 155k | IOSTATS_ADD(file_io_stats_by_temperature.unknown_last_level_read_count, |
105 | 155k | 1); |
106 | 302k | } else { |
107 | 302k | IOSTATS_ADD( |
108 | 302k | file_io_stats_by_temperature.unknown_non_last_level_bytes_read, |
109 | 302k | size); |
110 | 302k | IOSTATS_ADD( |
111 | 302k | file_io_stats_by_temperature.unknown_non_last_level_read_count, 1); |
112 | 302k | } |
113 | 457k | break; |
114 | 0 | default: |
115 | 0 | break; |
116 | 458k | } |
117 | 458k | } |
118 | | |
119 | | IOStatus RandomAccessFileReader::Create( |
120 | | const std::shared_ptr<FileSystem>& fs, const std::string& fname, |
121 | | const FileOptions& file_opts, |
122 | 0 | std::unique_ptr<RandomAccessFileReader>* reader, IODebugContext* dbg) { |
123 | 0 | std::unique_ptr<FSRandomAccessFile> file; |
124 | 0 | IOStatus io_s = fs->NewRandomAccessFile(fname, file_opts, &file, dbg); |
125 | 0 | if (io_s.ok()) { |
126 | 0 | reader->reset(new RandomAccessFileReader(std::move(file), fname)); |
127 | 0 | } |
128 | 0 | return io_s; |
129 | 0 | } |
130 | | |
// Synchronously reads up to `n` bytes at `offset` and points `*result` at the
// data read.
//
// Two paths:
//  * Direct I/O with an unaligned request (offset, length, or `scratch`
//    address not a multiple of the file's required alignment): reads the
//    enclosing aligned range into a temporary AlignedBuffer. Then either
//    copies the requested bytes into `scratch` (when `aligned_buf` is null),
//    or hands the aligned buffer to the caller through `*aligned_buf` and
//    points `*result` into it, avoiding the copy.
//  * Otherwise: reads straight into `scratch`. `*result` is based on the data
//    pointer returned by the file, which may not be `scratch` (e.g. mmap).
//
// When `opts.rate_limiter_priority != Env::IO_TOTAL` and a rate limiter is
// configured, the read is split into chunks sized by the granted tokens.
// A chunk that fails, or that returns fewer bytes than requested, ends the
// loop.
//
// On error, `*result` has size 0. Latency is measured with a StopWatch and,
// when configured, added to `file_read_hist_`. Byte counters are updated via
// RecordIOStats, and listeners (if any) are notified for every chunk.
IOStatus RandomAccessFileReader::Read(const IOOptions& opts, uint64_t offset,
                                      size_t n, Slice* result, char* scratch,
                                      AlignedBuf* aligned_buf,
                                      IODebugContext* dbg) const {
  (void)aligned_buf;
  const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;

  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read", nullptr);
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read:IODebugContext",
                           const_cast<void*>(static_cast<void*>(dbg)));

  // To be paranoid: modify scratch a little bit. If the underlying FileSystem
  // returns success without filling the buffer, and `scratch` still holds a
  // previous block, the returned value will then fail its checksum.
  if (n > 0 && scratch != nullptr) {
    // This byte might not change anything for direct I/O case, but it's OK.
    scratch[0]++;
  }

  IOStatus io_s;
  uint64_t elapsed = 0;
  size_t alignment = file_->GetRequiredBufferAlignment();
  bool is_aligned = false;
  if (scratch != nullptr) {
    // Check if offset, length and buffer are aligned.
    // (The bit masks assume `alignment` is a power of two.)
    is_aligned = (offset & (alignment - 1)) == 0 &&
                 (n & (alignment - 1)) == 0 &&
                 (uintptr_t(scratch) & (alignment - 1)) == 0;
  }

  {
    // `elapsed` is only filled in when statistics are enabled.
    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);
    if (use_direct_io() && is_aligned == false) {
      // Widen [offset, offset + n) to the enclosing aligned range.
      size_t aligned_offset =
          TruncateToPageBoundary(alignment, static_cast<size_t>(offset));
      size_t offset_advance = static_cast<size_t>(offset) - aligned_offset;
      size_t read_size =
          Roundup(static_cast<size_t>(offset + n), alignment) - aligned_offset;
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(read_size);
      while (buf.CurrentSize() < read_size) {
        size_t allowed;
        if (rate_limiter_priority != Env::IO_TOTAL &&
            rate_limiter_ != nullptr) {
          allowed = rate_limiter_->RequestToken(
              buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
              rate_limiter_priority, stats_, RateLimiter::OpType::kRead);
        } else {
          // Without rate limiting the whole range is read in one iteration.
          assert(buf.CurrentSize() == 0);
          allowed = read_size;
        }
        Slice tmp;

        FileOperationInfo::StartTimePoint start_ts;
        uint64_t orig_offset = 0;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
          orig_offset = aligned_offset + buf.CurrentSize();
        }

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout. And user reads
          // are not subjected to rate_limiter and should go through only
          // one iteration of this loop, so we don't need to check and adjust
          // the opts.timeout before calling file_->Read
          assert(!opts.timeout.count() || allowed == read_size);
          io_s = file_->Read(aligned_offset + buf.CurrentSize(), allowed, opts,
                             &tmp, buf.Destination(), dbg);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
                                 io_s);
          if (!io_s.ok()) {
            NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
                            tmp.size(), orig_offset);
          }
        }

        buf.Size(buf.CurrentSize() + tmp.size());
        // Stop on error or short read (e.g. EOF).
        if (!io_s.ok() || tmp.size() < allowed) {
          break;
        }
      }
      // Expose only the caller-requested window [offset_advance,
      // offset_advance + n) of the aligned buffer, clipped to what was read.
      size_t res_len = 0;
      if (io_s.ok() && offset_advance < buf.CurrentSize()) {
        res_len = std::min(buf.CurrentSize() - offset_advance, n);
        if (aligned_buf == nullptr) {
          buf.Read(scratch, offset_advance, res_len);
        } else {
          // Transfer the aligned buffer to the caller instead of copying.
          scratch = buf.BufferStart() + offset_advance;
          *aligned_buf = buf.Release();
        }
      }
      *result = Slice(scratch, res_len);
    } else {
      size_t pos = 0;
      const char* res_scratch = nullptr;
      while (pos < n) {
        size_t allowed;
        if (rate_limiter_priority != Env::IO_TOTAL &&
            rate_limiter_ != nullptr) {
          // Exclude time spent waiting on the rate limiter from the latency
          // measured by `sw`.
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStart();
          }
          allowed = rate_limiter_->RequestToken(
              n - pos, (use_direct_io() ? alignment : 0), rate_limiter_priority,
              stats_, RateLimiter::OpType::kRead);
          if (rate_limiter_->IsRateLimited(RateLimiter::OpType::kRead)) {
            sw.DelayStop();
          }
        } else {
          allowed = n;
        }
        Slice tmp_result;

        FileOperationInfo::StartTimePoint start_ts;
        if (ShouldNotifyListeners()) {
          start_ts = FileOperationInfo::StartNow();
        }

        {
          IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
          // Only user reads are expected to specify a timeout. And user reads
          // are not subjected to rate_limiter and should go through only
          // one iteration of this loop, so we don't need to check and adjust
          // the opts.timeout before calling file_->Read
          assert(!opts.timeout.count() || allowed == n);
          io_s = file_->Read(offset + pos, allowed, opts, &tmp_result,
                             scratch + pos, dbg);
        }
        if (ShouldNotifyListeners()) {
          auto finish_ts = FileOperationInfo::FinishNow();
          NotifyOnFileReadFinish(offset + pos, tmp_result.size(), start_ts,
                                 finish_ts, io_s);

          if (!io_s.ok()) {
            NotifyOnIOError(io_s, FileOperationType::kRead, file_name(),
                            tmp_result.size(), offset + pos);
          }
        }
        if (res_scratch == nullptr) {
          // we can't simply use `scratch` because reads of mmap'd files return
          // data in a different buffer.
          res_scratch = tmp_result.data();
        } else {
          // make sure chunks are inserted contiguously into `res_scratch`.
          assert(tmp_result.data() == res_scratch + pos);
        }
        pos += tmp_result.size();
        // Stop on error or short read (e.g. EOF).
        if (!io_s.ok() || tmp_result.size() < allowed) {
          break;
        }
      }
      // Report an empty result on error.
      *result = Slice(res_scratch, io_s.ok() ? pos : 0);
    }
    RecordIOStats(stats_, file_temperature_, is_last_level_, result->size());
    SetPerfLevel(prev_perf_level);
  }
  // `sw` has gone out of scope above, so `elapsed` is final here.
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

#ifndef NDEBUG
  auto pair = std::make_pair(&file_name_, &io_s);
  if (offset == 0) {
    TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::BeforeReturn",
                             &pair);
  }
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::Read::AnyOffset", &pair);
#endif
  return io_s;
}
311 | | |
312 | 0 | size_t End(const FSReadRequest& r) { |
313 | 0 | return static_cast<size_t>(r.offset) + r.len; |
314 | 0 | } |
315 | | |
316 | 0 | FSReadRequest Align(const FSReadRequest& r, size_t alignment) { |
317 | 0 | FSReadRequest req; |
318 | 0 | req.offset = static_cast<uint64_t>( |
319 | 0 | TruncateToPageBoundary(alignment, static_cast<size_t>(r.offset))); |
320 | 0 | req.len = Roundup(End(r), alignment) - req.offset; |
321 | 0 | req.scratch = nullptr; |
322 | 0 | return req; |
323 | 0 | } |
324 | | |
325 | 0 | bool TryMerge(FSReadRequest* dest, const FSReadRequest& src) { |
326 | 0 | size_t dest_offset = static_cast<size_t>(dest->offset); |
327 | 0 | size_t src_offset = static_cast<size_t>(src.offset); |
328 | 0 | size_t dest_end = End(*dest); |
329 | 0 | size_t src_end = End(src); |
330 | 0 | if (std::max(dest_offset, src_offset) > std::min(dest_end, src_end)) { |
331 | 0 | return false; |
332 | 0 | } |
333 | 0 | dest->offset = static_cast<uint64_t>(std::min(dest_offset, src_offset)); |
334 | 0 | dest->len = std::max(dest_end, src_end) - dest->offset; |
335 | 0 | return true; |
336 | 0 | } |
337 | | |
// Issues the `num_reqs` requests in `read_reqs` as one FSRandomAccessFile
// MultiRead call. Each request's `result` and `status` are filled in place.
//
// Requirements visible in this function:
//  * `num_reqs > 0`, and requests are sorted by offset (both asserted in
//    debug builds).
//  * In direct I/O mode, `aligned_buf` is dereferenced without a null check.
//
// In direct I/O mode the requests are first aligned and overlapping/adjacent
// ones are merged (see Align/TryMerge). The merged requests are read into a
// single aligned buffer, which is handed to the caller through
// `*aligned_buf`. Each original request's `result` then points into that
// buffer.
//
// If rate limiting applies, the total number of bytes is charged to the rate
// limiter, in GetSingleBurstBytes()-sized pieces, before the I/O is issued.
// The returned status is the one from the underlying MultiRead. Per-request
// failures are reported via each request's `status`.
IOStatus RandomAccessFileReader::MultiRead(const IOOptions& opts,
                                           FSReadRequest* read_reqs,
                                           size_t num_reqs,
                                           AlignedBuf* aligned_buf,
                                           IODebugContext* dbg) const {
  (void)aligned_buf;  // suppress warning of unused variable in LITE mode
  assert(num_reqs > 0);

#ifndef NDEBUG
  for (size_t i = 0; i < num_reqs - 1; ++i) {
    assert(read_reqs[i].offset <= read_reqs[i + 1].offset);
  }
#endif  // !NDEBUG
  const Env::IOPriority rate_limiter_priority = opts.rate_limiter_priority;

  // To be paranoid: modify scratch a little bit. If the underlying FileSystem
  // returns success without filling the buffer, and `scratch` still holds a
  // previous block, the returned value will then fail its checksum.
  // This byte might not change anything for direct I/O case, but it's OK.
  for (size_t i = 0; i < num_reqs; i++) {
    FSReadRequest& r = read_reqs[i];
    if (r.len > 0 && r.scratch != nullptr) {
      r.scratch[0]++;
    }
  }

  IOStatus io_s;
  uint64_t elapsed = 0;
  {
    // `elapsed` is only filled in when statistics are enabled.
    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    auto prev_perf_level = GetPerfLevel();
    IOSTATS_TIMER_GUARD(read_nanos);

    // Requests actually sent to the file system: the caller's requests, or
    // the aligned+merged copies in direct I/O mode.
    FSReadRequest* fs_reqs = read_reqs;
    size_t num_fs_reqs = num_reqs;
    std::vector<FSReadRequest> aligned_reqs;
    if (use_direct_io()) {
      // num_reqs is the max possible size,
      // this can reduce std::vector's internal resize operations.
      aligned_reqs.reserve(num_reqs);
      // Align and merge the read requests.
      size_t alignment = file_->GetRequiredBufferAlignment();
      for (size_t i = 0; i < num_reqs; i++) {
        FSReadRequest r = Align(read_reqs[i], alignment);
        if (i == 0) {
          // head
          aligned_reqs.push_back(std::move(r));

        } else if (!TryMerge(&aligned_reqs.back(), r)) {
          // head + n
          aligned_reqs.push_back(std::move(r));

        } else {
          // unused
          r.status.PermitUncheckedError();
        }
      }
      TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::MultiRead:AlignedReqs",
                               &aligned_reqs);

      // Allocate aligned buffer and let scratch buffers point to it.
      size_t total_len = 0;
      for (const auto& r : aligned_reqs) {
        total_len += r.len;
      }
      AlignedBuffer buf;
      buf.Alignment(alignment);
      buf.AllocateNewBuffer(total_len);
      char* scratch = buf.BufferStart();
      for (auto& r : aligned_reqs) {
        r.scratch = scratch;
        scratch += r.len;
      }

      // Ownership of the backing memory moves to the caller here. The
      // scratch pointers set above stay valid through `*aligned_buf`.
      *aligned_buf = buf.Release();
      fs_reqs = aligned_reqs.data();
      num_fs_reqs = aligned_reqs.size();
    }

    FileOperationInfo::StartTimePoint start_ts;
    if (ShouldNotifyListeners()) {
      start_ts = FileOperationInfo::StartNow();
    }

    {
      IOSTATS_CPU_TIMER_GUARD(cpu_read_nanos, clock_);
      if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
        // TODO: ideally we should call `RateLimiter::RequestToken()` for
        // allowed bytes to multi-read and then consume those bytes by
        // satisfying as many requests in `MultiRead()` as possible, instead of
        // what we do here, which can cause burst when the
        // `total_multi_read_size` is big.
        size_t total_multi_read_size = 0;
        assert(fs_reqs != nullptr);
        for (size_t i = 0; i < num_fs_reqs; ++i) {
          FSReadRequest& req = fs_reqs[i];
          total_multi_read_size += req.len;
        }
        size_t remaining_bytes = total_multi_read_size;
        size_t request_bytes = 0;
        while (remaining_bytes > 0) {
          request_bytes = std::min(
              static_cast<size_t>(rate_limiter_->GetSingleBurstBytes()),
              remaining_bytes);
          rate_limiter_->Request(request_bytes, rate_limiter_priority,
                                 nullptr /* stats */,
                                 RateLimiter::OpType::kRead);
          remaining_bytes -= request_bytes;
        }
      }
      TEST_SYNC_POINT_CALLBACK(
          "RandomAccessFileReader::MultiRead:IODebugContext",
          const_cast<void*>(static_cast<void*>(dbg)));
      io_s = file_->MultiRead(fs_reqs, num_fs_reqs, opts, dbg);
      RecordInHistogram(stats_, MULTIGET_IO_BATCH_SIZE, num_fs_reqs);
    }

    if (use_direct_io()) {
      // Populate results in the unaligned read requests.
      // Both lists are sorted by offset, so a single forward cursor
      // (`aligned_i`) walks the merged requests. It advances when a caller
      // request starts past the current merged request's end.
      size_t aligned_i = 0;
      for (size_t i = 0; i < num_reqs; i++) {
        auto& r = read_reqs[i];
        if (static_cast<size_t>(r.offset) > End(aligned_reqs[aligned_i])) {
          aligned_i++;
        }
        const auto& fs_r = fs_reqs[aligned_i];
        r.status = fs_r.status;
        if (r.status.ok()) {
          uint64_t offset = r.offset - fs_r.offset;
          if (fs_r.result.size() <= offset) {
            // No byte in the read range is returned.
            r.result = Slice();
          } else {
            size_t len = std::min(
                r.len, static_cast<size_t>(fs_r.result.size() - offset));
            r.result = Slice(fs_r.scratch + offset, len);
          }
        } else {
          r.result = Slice();
        }
      }
    }

    for (size_t i = 0; i < num_reqs; ++i) {
      if (ShouldNotifyListeners()) {
        auto finish_ts = FileOperationInfo::FinishNow();
        NotifyOnFileReadFinish(read_reqs[i].offset, read_reqs[i].result.size(),
                               start_ts, finish_ts, read_reqs[i].status);
      }
      if (!read_reqs[i].status.ok()) {
        NotifyOnIOError(read_reqs[i].status, FileOperationType::kRead,
                        file_name(), read_reqs[i].result.size(),
                        read_reqs[i].offset);
      }
      RecordIOStats(stats_, file_temperature_, is_last_level_,
                    read_reqs[i].result.size());
    }
    SetPerfLevel(prev_perf_level);
  }
  // `sw` has gone out of scope above, so `elapsed` is final here.
  if (stats_ != nullptr && file_read_hist_ != nullptr) {
    file_read_hist_->Add(elapsed);
  }

  return io_s;
}
506 | | |
507 | | IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro, |
508 | | IOOptions& opts, |
509 | 915k | IODebugContext* dbg) const { |
510 | 915k | if (clock_ != nullptr) { |
511 | 915k | return PrepareIOFromReadOptions(ro, clock_, opts, dbg); |
512 | 915k | } else { |
513 | 195 | return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts, |
514 | 195 | dbg); |
515 | 195 | } |
516 | 915k | } |
517 | | |
// Notes for when direct_io is enabled:
// Unless req.offset, req.len, req.scratch are all already aligned,
// RandomAccessFileReader will create aligned requests and an aligned buffer
// for the request. User should only provide either req.scratch or
// aligned_buf. If only req.scratch is provided, the result will be copied
// from the allocated aligned buffer to req.scratch. If only aligned_buf is
// provided, it will be set to the aligned buf allocated by
// RandomAccessFileReader, which saves a copy.
//
// Submits `req` asynchronously. A heap-allocated ReadAsyncInfo carries the
// user callback `cb`/`cb_arg`, the start time and, for the direct-I/O
// alignment path, the user's original request fields. It is passed as the
// callback argument to ReadAsyncCallback, which deletes it. If submission
// fails, ReadAsyncCallback is not expected to run, so the info is deleted
// here instead.
//
// READ_ASYNC_MICROS measures only the time spent submitting the request, not
// its completion.
IOStatus RandomAccessFileReader::ReadAsync(
    FSReadRequest& req, const IOOptions& opts,
    std::function<void(FSReadRequest&, void*)> cb, void* cb_arg,
    void** io_handle, IOHandleDeleter* del_fn, AlignedBuf* aligned_buf,
    IODebugContext* dbg) {
  IOStatus s;
  // Test hook: lets tests fail the call before any work is done.
  TEST_SYNC_POINT_CALLBACK("RandomAccessFileReader::ReadAsync:InjectStatus",
                           &s);
  if (!s.ok()) {
    return s;
  }
  // Create a callback and populate info.
  auto read_async_callback =
      std::bind(&RandomAccessFileReader::ReadAsyncCallback, this,
                std::placeholders::_1, std::placeholders::_2);

  // start_time_ is 0 when no clock is configured.
  ReadAsyncInfo* read_async_info = new ReadAsyncInfo(
      cb, cb_arg, (clock_ != nullptr ? clock_->NowMicros() : 0));

  if (ShouldNotifyListeners()) {
    read_async_info->fs_start_ts_ = FileOperationInfo::StartNow();
  }

  // The bit masks assume `alignment` is a power of two.
  size_t alignment = file_->GetRequiredBufferAlignment();
  bool is_aligned = (req.offset & (alignment - 1)) == 0 &&
                    (req.len & (alignment - 1)) == 0 &&
                    (uintptr_t(req.scratch) & (alignment - 1)) == 0;
  read_async_info->is_aligned_ = is_aligned;

  uint64_t elapsed = 0;
  if (use_direct_io() && is_aligned == false) {
    FSReadRequest aligned_req = Align(req, alignment);
    aligned_req.status.PermitUncheckedError();

    // Allocate aligned buffer.
    read_async_info->buf_.Alignment(alignment);
    read_async_info->buf_.AllocateNewBuffer(aligned_req.len);

    // Set rem fields in aligned FSReadRequest.
    aligned_req.scratch = read_async_info->buf_.BufferStart();

    // Set user provided fields to populate back in callback.
    read_async_info->user_scratch_ = req.scratch;
    read_async_info->user_aligned_buf_ = aligned_buf;
    read_async_info->user_len_ = req.len;
    read_async_info->user_offset_ = req.offset;
    read_async_info->user_result_ = req.result;

    assert(read_async_info->buf_.CurrentSize() == 0);

    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    s = file_->ReadAsync(aligned_req, opts, read_async_callback,
                         read_async_info, io_handle, del_fn, dbg);
  } else {
    StopWatch sw(clock_, stats_, hist_type_,
                 GetFileReadHistograms(stats_, opts.io_activity),
                 (stats_ != nullptr) ? &elapsed : nullptr, true /*overwrite*/,
                 true /*delay_enabled*/);
    s = file_->ReadAsync(req, opts, read_async_callback, read_async_info,
                         io_handle, del_fn, dbg);
  }
  // Both StopWatch objects have gone out of scope, so `elapsed` is final.
  RecordTick(stats_, READ_ASYNC_MICROS, elapsed);

// Suppress false positive clang analyzer warnings.
// Memory is not released if file_->ReadAsync returns !s.ok(), because
// ReadAsyncCallback is never called in that case. If ReadAsyncCallback is
// called then ReadAsync should always return IOStatus::OK().
#ifndef __clang_analyzer__
  if (!s.ok()) {
    delete read_async_info;
  }
#endif  // __clang_analyzer__

  return s;
}
603 | | |
604 | | void RandomAccessFileReader::ReadAsyncCallback(FSReadRequest& req, |
605 | 0 | void* cb_arg) { |
606 | 0 | ReadAsyncInfo* read_async_info = static_cast<ReadAsyncInfo*>(cb_arg); |
607 | 0 | assert(read_async_info); |
608 | 0 | assert(read_async_info->cb_); |
609 | |
|
610 | 0 | if (use_direct_io() && read_async_info->is_aligned_ == false) { |
611 | | // Create FSReadRequest with user provided fields. |
612 | 0 | FSReadRequest user_req; |
613 | 0 | user_req.scratch = read_async_info->user_scratch_; |
614 | 0 | user_req.offset = read_async_info->user_offset_; |
615 | 0 | user_req.len = read_async_info->user_len_; |
616 | | |
617 | | // Update results in user_req. |
618 | 0 | user_req.result = req.result; |
619 | 0 | user_req.status = req.status; |
620 | |
|
621 | 0 | read_async_info->buf_.Size(read_async_info->buf_.CurrentSize() + |
622 | 0 | req.result.size()); |
623 | |
|
624 | 0 | size_t offset_advance_len = static_cast<size_t>( |
625 | 0 | /*offset_passed_by_user=*/read_async_info->user_offset_ - |
626 | 0 | /*aligned_offset=*/req.offset); |
627 | |
|
628 | 0 | size_t res_len = 0; |
629 | 0 | if (req.status.ok() && |
630 | 0 | offset_advance_len < read_async_info->buf_.CurrentSize()) { |
631 | 0 | res_len = |
632 | 0 | std::min(read_async_info->buf_.CurrentSize() - offset_advance_len, |
633 | 0 | read_async_info->user_len_); |
634 | 0 | if (read_async_info->user_aligned_buf_ == nullptr) { |
635 | | // Copy the data into user's scratch. |
636 | | // Clang analyzer assumes that it will take use_direct_io() == false in |
637 | | // ReadAsync and use_direct_io() == true in Callback which cannot be true. |
638 | 0 | #ifndef __clang_analyzer__ |
639 | 0 | read_async_info->buf_.Read(user_req.scratch, offset_advance_len, |
640 | 0 | res_len); |
641 | 0 | #endif // __clang_analyzer__ |
642 | 0 | } else { |
643 | | // Set aligned_buf provided by user without additional copy. |
644 | 0 | user_req.scratch = |
645 | 0 | read_async_info->buf_.BufferStart() + offset_advance_len; |
646 | 0 | *read_async_info->user_aligned_buf_ = read_async_info->buf_.Release(); |
647 | 0 | } |
648 | 0 | user_req.result = Slice(user_req.scratch, res_len); |
649 | 0 | } else { |
650 | | // Either req.status is not ok or data was not read. |
651 | 0 | user_req.result = Slice(); |
652 | 0 | } |
653 | 0 | read_async_info->cb_(user_req, read_async_info->cb_arg_); |
654 | 0 | } else { |
655 | 0 | read_async_info->cb_(req, read_async_info->cb_arg_); |
656 | 0 | } |
657 | | |
658 | | // Update stats and notify listeners. |
659 | 0 | if (stats_ != nullptr && file_read_hist_ != nullptr) { |
660 | | // elapsed doesn't take into account delay and overwrite as StopWatch does |
661 | | // in Read. |
662 | 0 | uint64_t elapsed = clock_->NowMicros() - read_async_info->start_time_; |
663 | 0 | file_read_hist_->Add(elapsed); |
664 | 0 | } |
665 | 0 | if (req.status.ok()) { |
666 | 0 | RecordInHistogram(stats_, ASYNC_READ_BYTES, req.result.size()); |
667 | 0 | } else if (!req.status.IsAborted()) { |
668 | 0 | RecordTick(stats_, ASYNC_READ_ERROR_COUNT, 1); |
669 | 0 | } |
670 | 0 | if (ShouldNotifyListeners()) { |
671 | 0 | auto finish_ts = FileOperationInfo::FinishNow(); |
672 | 0 | NotifyOnFileReadFinish(req.offset, req.result.size(), |
673 | 0 | read_async_info->fs_start_ts_, finish_ts, |
674 | 0 | req.status); |
675 | 0 | } |
676 | 0 | if (!req.status.ok()) { |
677 | 0 | NotifyOnIOError(req.status, FileOperationType::kRead, file_name(), |
678 | 0 | req.result.size(), req.offset); |
679 | 0 | } |
680 | 0 | RecordIOStats(stats_, file_temperature_, is_last_level_, req.result.size()); |
681 | 0 | delete read_async_info; |
682 | 0 | } |
683 | | } // namespace ROCKSDB_NAMESPACE |