/src/rocksdb/util/io_dispatcher_imp.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
7 | | // This source code is licensed under both the GPLv2 (found in the |
8 | | // COPYING file in the root directory) and Apache 2.0 License |
9 | | // (found in the LICENSE.Apache file in the root directory). |
10 | | // |
11 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
12 | | // Use of this source code is governed by a BSD-style license that can be |
13 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
14 | | |
15 | | #include "util/io_dispatcher_imp.h" |
16 | | |
17 | | #include <deque> |
18 | | #include <memory> |
19 | | #include <unordered_map> |
20 | | #include <unordered_set> |
21 | | #include <vector> |
22 | | |
23 | | #include "file/random_access_file_reader.h" |
24 | | #include "monitoring/statistics_impl.h" |
25 | | #include "port/port.h" |
26 | | #include "rocksdb/file_system.h" |
27 | | #include "rocksdb/io_dispatcher.h" |
28 | | #include "rocksdb/options.h" |
29 | | #include "rocksdb/status.h" |
30 | | #include "table/block_based/block_based_table_reader.h" |
31 | | #include "table/block_based/cachable_entry.h" |
32 | | #include "table/block_based/reader_common.h" |
33 | | #include "table/format.h" |
34 | | #include "test_util/sync_point.h" |
35 | | #include "util/mutexlock.h" |
36 | | |
37 | | namespace ROCKSDB_NAMESPACE { |
38 | | |
// IODispatcherImplData is the callback interface through which a ReadSet
// returns prefetch-memory accounting to the dispatcher that granted it. It is
// defined in this file, ahead of the ReadSet methods, because those methods
// are the only code that calls into it.
struct IODispatcherImplData {
  virtual ~IODispatcherImplData() = default;

  // Gives `bytes` of previously granted prefetch-memory budget back to the
  // dispatcher.
  virtual void ReleaseMemory(size_t bytes) = 0;
};
46 | | |
47 | | // Helper function to create and pin a block from a buffer |
48 | | // Used by both ReadSet::PollAndProcessAsyncIO and IODispatcherImpl::Impl |
49 | | static Status CreateAndPinBlockFromBuffer( |
50 | | const std::shared_ptr<IOJob>& job, const BlockHandle& block, |
51 | | uint64_t buffer_start_offset, const Slice& buffer_data, |
52 | 0 | CachableEntry<Block>& pinned_block_entry) { |
53 | 0 | auto* rep = job->table->get_rep(); |
54 | | |
55 | | // Get decompressor |
56 | 0 | UnownedPtr<Decompressor> decompressor = rep->decompressor.get(); |
57 | 0 | CachableEntry<DecompressorDict> cached_dict; |
58 | |
|
59 | 0 | if (rep->uncompression_dict_reader) { |
60 | 0 | Status s = rep->uncompression_dict_reader->GetOrReadUncompressionDictionary( |
61 | 0 | nullptr, job->job_options.read_options, nullptr, nullptr, &cached_dict); |
62 | 0 | if (!s.ok()) { |
63 | 0 | return s; |
64 | 0 | } |
65 | 0 | if (cached_dict.GetValue()) { |
66 | 0 | decompressor = cached_dict.GetValue()->decompressor_.get(); |
67 | 0 | } |
68 | 0 | } |
69 | | |
70 | | // Create block from buffer data |
71 | 0 | const auto block_size_with_trailer = |
72 | 0 | BlockBasedTable::BlockSizeWithTrailer(block); |
73 | 0 | const auto block_offset_in_buffer = block.offset() - buffer_start_offset; |
74 | |
|
75 | 0 | CacheAllocationPtr data = AllocateBlock( |
76 | 0 | block_size_with_trailer, GetMemoryAllocator(rep->table_options)); |
77 | 0 | memcpy(data.get(), buffer_data.data() + block_offset_in_buffer, |
78 | 0 | block_size_with_trailer); |
79 | 0 | BlockContents tmp_contents(std::move(data), block.size()); |
80 | |
|
81 | | #ifndef NDEBUG |
82 | | tmp_contents.has_trailer = rep->footer.GetBlockTrailerSize() > 0; |
83 | | #endif |
84 | |
|
85 | 0 | return job->table->CreateAndPinBlockInCache<Block_kData>( |
86 | 0 | job->job_options.read_options, block, decompressor, &tmp_contents, |
87 | 0 | &pinned_block_entry.As<Block_kData>()); |
88 | 0 | } |
89 | | |
90 | | // State for async IO operations (implementation detail) |
91 | | struct AsyncIOState { |
92 | 0 | AsyncIOState() : offset(static_cast<uint64_t>(-1)) {} |
93 | 0 | ~AsyncIOState() { read_req.status.PermitUncheckedError(); } |
94 | | |
95 | | AsyncIOState(const AsyncIOState&) = delete; |
96 | | AsyncIOState& operator=(const AsyncIOState&) = delete; |
97 | | AsyncIOState(AsyncIOState&&) = default; |
98 | | AsyncIOState& operator=(AsyncIOState&&) = default; |
99 | | |
100 | | std::unique_ptr<char[]> buf; |
101 | | AlignedBuf aligned_buf; |
102 | | void* io_handle = nullptr; |
103 | | IOHandleDeleter del_fn; |
104 | | uint64_t offset; |
105 | | std::vector<size_t> block_indices; |
106 | | std::vector<BlockHandle> blocks; |
107 | | FSReadRequest read_req; |
108 | | }; |
109 | | |
110 | | // ReadSet destructor - clean up IO handles |
111 | | // Must call AbortIO before deleting handles to avoid use-after-free when |
112 | | // io_uring completions arrive for deleted handles. |
113 | 0 | ReadSet::~ReadSet() { |
114 | | // Release memory for any blocks still pinned |
115 | | // Note: block_sizes_[i] is only set for async IO reads where memory |
116 | | // limiting applies. For sync reads, block_sizes_ remains 0, so this |
117 | | // loop is effectively a no-op for sync reads. |
118 | 0 | if (auto dispatcher_data = dispatcher_data_.lock()) { |
119 | 0 | for (size_t i = 0; i < block_sizes_.size(); ++i) { |
120 | 0 | if (block_sizes_[i] > 0 && pinned_blocks_[i].GetValue()) { |
121 | 0 | dispatcher_data->ReleaseMemory(block_sizes_[i]); |
122 | 0 | } |
123 | 0 | } |
124 | 0 | } |
125 | |
|
126 | 0 | if (async_io_map_.empty()) { |
127 | 0 | return; |
128 | 0 | } |
129 | | |
130 | | // Collect unique pending IO handles (multiple block indices may share the |
131 | | // same async_state due to coalescing) |
132 | 0 | std::vector<void*> pending_handles; |
133 | 0 | std::unordered_set<void*> seen_handles; |
134 | 0 | for (auto& pair : async_io_map_) { |
135 | 0 | auto& async_state = pair.second; |
136 | 0 | if (async_state->io_handle != nullptr && |
137 | 0 | seen_handles.find(async_state->io_handle) == seen_handles.end()) { |
138 | 0 | pending_handles.push_back(async_state->io_handle); |
139 | 0 | seen_handles.insert(async_state->io_handle); |
140 | 0 | } |
141 | 0 | } |
142 | | |
143 | | // Abort all pending IO operations before deleting handles |
144 | 0 | if (!pending_handles.empty() && fs_) { |
145 | | // AbortIO cancels pending requests and waits for completions |
146 | 0 | IOStatus s = fs_->AbortIO(pending_handles); |
147 | 0 | (void)s; // Ignore errors in destructor |
148 | 0 | } |
149 | | |
150 | | // Now safe to delete the handles |
151 | 0 | for (auto& pair : async_io_map_) { |
152 | 0 | auto& async_state = pair.second; |
153 | 0 | if (async_state->io_handle != nullptr && async_state->del_fn != nullptr) { |
154 | 0 | async_state->del_fn(async_state->io_handle); |
155 | 0 | async_state->io_handle = nullptr; |
156 | 0 | } |
157 | 0 | } |
158 | 0 | } |
159 | | |
160 | | // Main Read() method - transparently handles cache, async IO, and sync reads |
161 | 0 | Status ReadSet::ReadIndex(size_t block_index, CachableEntry<Block>* out) { |
162 | | // Bounds check |
163 | 0 | if (block_index >= pinned_blocks_.size()) { |
164 | 0 | return Status::InvalidArgument("Block index out of range"); |
165 | 0 | } |
166 | | |
167 | | // Case 1: Block is already available (from cache or sync read during |
168 | | // SubmitJob) |
169 | 0 | if (pinned_blocks_[block_index].GetValue()) { |
170 | 0 | *out = std::move(pinned_blocks_[block_index]); |
171 | | // Release memory accounting for prefetched blocks. After moving the value |
172 | | // out, ReleaseBlock() and the destructor check pinned_blocks_.GetValue() |
173 | | // which will be null, so they won't release memory again. |
174 | 0 | if (block_index < block_sizes_.size() && block_sizes_[block_index] > 0) { |
175 | 0 | if (auto dispatcher_data = dispatcher_data_.lock()) { |
176 | 0 | dispatcher_data->ReleaseMemory(block_sizes_[block_index]); |
177 | 0 | } |
178 | 0 | block_sizes_[block_index] = 0; |
179 | 0 | } |
180 | | // Note: Statistics for this block were already counted during SubmitJob |
181 | | // (either as cache hit or sync read) |
182 | 0 | return Status::OK(); |
183 | 0 | } |
184 | | |
185 | | // Case 2: Block has async IO in progress - poll and process |
186 | 0 | if (job_->job_options.read_options.async_io) { |
187 | 0 | auto it = async_io_map_.find(block_index); |
188 | 0 | if (it != async_io_map_.end()) { |
189 | | // Get the number of blocks in this coalesced async request BEFORE polling |
190 | | // (since PollAndProcessAsyncIO will remove entries from the map) |
191 | 0 | size_t num_blocks_in_request = it->second->block_indices.size(); |
192 | |
|
193 | 0 | if (Status s = PollAndProcessAsyncIO(it->second); !s.ok()) { |
194 | 0 | return s; |
195 | 0 | } |
196 | | // Count all blocks that were read in this async request |
197 | 0 | num_async_reads_ += num_blocks_in_request; |
198 | | |
199 | | // After polling, the block should be in pinned_blocks_ |
200 | 0 | if (pinned_blocks_[block_index].GetValue()) { |
201 | 0 | *out = std::move(pinned_blocks_[block_index]); |
202 | | // Release memory accounting (same as case 1 above) |
203 | 0 | if (block_index < block_sizes_.size() && |
204 | 0 | block_sizes_[block_index] > 0) { |
205 | 0 | if (auto dispatcher_data = dispatcher_data_.lock()) { |
206 | 0 | dispatcher_data->ReleaseMemory(block_sizes_[block_index]); |
207 | 0 | } |
208 | 0 | block_sizes_[block_index] = 0; |
209 | 0 | } |
210 | 0 | return Status::OK(); |
211 | 0 | } |
212 | | |
213 | 0 | return Status::IOError("Failed to process async IO result"); |
214 | 0 | } |
215 | 0 | } |
216 | | |
217 | | // Case 3: Block needs synchronous read (pending or never-dispatched blocks). |
218 | | // No ReleaseMemory() needed here because blocks reaching this path never had |
219 | | // TryAcquireMemory() called -- they were either pending prefetch or skipped |
220 | | // during SubmitJob. block_sizes_[block_index] may be > 0 (set during |
221 | | // SubmitJob for all uncached blocks) but that does not imply memory was |
222 | | // acquired. |
223 | 0 | RemoveFromPending(block_index); |
224 | |
|
225 | 0 | Status s = SyncRead(block_index); |
226 | 0 | if (s.ok()) { |
227 | 0 | *out = std::move(pinned_blocks_[block_index]); |
228 | 0 | num_sync_reads_++; |
229 | 0 | } |
230 | 0 | return s; |
231 | 0 | } |
232 | | |
233 | 0 | Status ReadSet::ReadOffset(size_t offset, CachableEntry<Block>* out) { |
234 | 0 | if (sorted_block_indices_.empty()) { |
235 | 0 | return Status::InvalidArgument("ReadSet not initialized"); |
236 | 0 | } |
237 | | |
238 | | // Use binary search on the sorted index to find the block containing offset. |
239 | | // sorted_block_indices_ contains original indices sorted by block offset. |
240 | 0 | const auto& block_handles = job_->block_handles; |
241 | | |
242 | | // Binary search for the first block whose offset is > offset, then back up |
243 | 0 | auto it = std::upper_bound(sorted_block_indices_.begin(), |
244 | 0 | sorted_block_indices_.end(), offset, |
245 | 0 | [&block_handles](size_t off, size_t idx) { |
246 | 0 | return off < block_handles[idx].offset(); |
247 | 0 | }); |
248 | | |
249 | | // If it == begin(), offset is before all blocks |
250 | 0 | if (it == sorted_block_indices_.begin()) { |
251 | 0 | return Status::InvalidArgument("Offset not found in any block"); |
252 | 0 | } |
253 | | |
254 | | // Back up to the candidate block (largest offset <= our offset) |
255 | 0 | --it; |
256 | 0 | size_t candidate_idx = *it; |
257 | 0 | const auto& handle = block_handles[candidate_idx]; |
258 | | |
259 | | // Check if offset falls within this block |
260 | 0 | if (offset >= handle.offset() && offset < (handle.offset() + handle.size())) { |
261 | 0 | return ReadIndex(candidate_idx, out); |
262 | 0 | } |
263 | | |
264 | 0 | return Status::InvalidArgument("Offset not found in any block"); |
265 | 0 | } |
266 | | |
267 | 0 | void ReadSet::ReleaseBlock(size_t block_index) { |
268 | 0 | if (block_index >= pinned_blocks_.size()) { |
269 | 0 | return; |
270 | 0 | } |
271 | | |
272 | | // Remove from pending if applicable |
273 | 0 | RemoveFromPending(block_index); |
274 | | |
275 | | // Release memory BEFORE unpinning |
276 | | // Note: block_sizes_[idx] is only set for async IO reads where memory |
277 | | // limiting applies. For sync reads, block_sizes_ remains 0, so this |
278 | | // check implicitly skips ReleaseMemory for sync reads. |
279 | 0 | if (pinned_blocks_[block_index].GetValue() && |
280 | 0 | block_index < block_sizes_.size() && block_sizes_[block_index] > 0) { |
281 | 0 | if (auto dispatcher_data = dispatcher_data_.lock()) { |
282 | 0 | dispatcher_data->ReleaseMemory(block_sizes_[block_index]); |
283 | 0 | } |
284 | 0 | block_sizes_[block_index] = 0; // Prevent double-release |
285 | 0 | } |
286 | | |
287 | | // Unpin the block from cache |
288 | 0 | pinned_blocks_[block_index].Reset(); |
289 | | // Clean up any pending async IO for this block |
290 | 0 | async_io_map_.erase(block_index); |
291 | 0 | } |
292 | | |
293 | 0 | bool ReadSet::IsBlockAvailable(size_t block_index) const { |
294 | 0 | if (block_index >= pinned_blocks_.size()) { |
295 | 0 | return false; |
296 | 0 | } |
297 | | // Block is available if it hasn't been released (still has a value or |
298 | | // has pending async IO) |
299 | 0 | return pinned_blocks_[block_index].GetValue() != nullptr || |
300 | 0 | async_io_map_.find(block_index) != async_io_map_.end(); |
301 | 0 | } |
302 | | |
303 | | // Poll and process async IO for a specific block |
304 | | Status ReadSet::PollAndProcessAsyncIO( |
305 | 0 | const std::shared_ptr<AsyncIOState>& async_state) { |
306 | 0 | auto* rep = job_->table->get_rep(); |
307 | | |
308 | | // Poll for IO completion using FileSystem Poll API |
309 | 0 | std::vector<void*> io_handles = {async_state->io_handle}; |
310 | 0 | IOStatus io_s = rep->ioptions.env->GetFileSystem()->Poll(io_handles, 1); |
311 | 0 | if (!io_s.ok()) { |
312 | 0 | return io_s; |
313 | 0 | } |
314 | | |
315 | | // Check for read errors |
316 | 0 | if (!async_state->read_req.status.ok()) { |
317 | 0 | return async_state->read_req.status; |
318 | 0 | } |
319 | | |
320 | | // Use the result slice from the callback which has been correctly set |
321 | | // with any necessary alignment adjustments for direct IO |
322 | 0 | const Slice& buffer_data = async_state->read_req.result; |
323 | | |
324 | | // Process all blocks in this async request |
325 | 0 | for (size_t i = 0; i < async_state->block_indices.size(); ++i) { |
326 | 0 | const size_t idx = async_state->block_indices[i]; |
327 | 0 | const auto& block_handle = async_state->blocks[i]; |
328 | |
|
329 | 0 | Status s = |
330 | 0 | CreateAndPinBlockFromBuffer(job_, block_handle, async_state->offset, |
331 | 0 | buffer_data, pinned_blocks_[idx]); |
332 | 0 | if (!s.ok()) { |
333 | 0 | return s; |
334 | 0 | } |
335 | 0 | } |
336 | | |
337 | | // Clean up IO handle |
338 | 0 | if (async_state->io_handle != nullptr && async_state->del_fn != nullptr) { |
339 | 0 | async_state->del_fn(async_state->io_handle); |
340 | 0 | async_state->io_handle = nullptr; |
341 | 0 | } |
342 | | |
343 | | // Remove from map - all blocks in this request have been processed |
344 | | // Store indices in a temporary vector to avoid iterator invalidation |
345 | 0 | std::vector<size_t> indices_to_remove = async_state->block_indices; |
346 | 0 | for (const auto idx : indices_to_remove) { |
347 | 0 | async_io_map_.erase(idx); |
348 | 0 | } |
349 | |
|
350 | 0 | return Status::OK(); |
351 | 0 | } |
352 | | |
353 | | // Perform synchronous read for a specific block |
354 | | // This performs a direct synchronous read from disk when the block is not in |
355 | | // cache |
356 | 0 | Status ReadSet::SyncRead(size_t block_index) { |
357 | 0 | const auto& block_handle = job_->block_handles[block_index]; |
358 | 0 | auto* rep = job_->table->get_rep(); |
359 | | |
360 | | // Get dictionary-aware decompressor if available |
361 | 0 | UnownedPtr<Decompressor> decompressor = rep->decompressor.get(); |
362 | 0 | CachableEntry<DecompressorDict> cached_dict; |
363 | 0 | if (rep->uncompression_dict_reader) { |
364 | 0 | Status s = rep->uncompression_dict_reader->GetOrReadUncompressionDictionary( |
365 | 0 | nullptr, job_->job_options.read_options, nullptr, nullptr, |
366 | 0 | &cached_dict); |
367 | 0 | if (!s.ok()) { |
368 | 0 | return s; |
369 | 0 | } |
370 | 0 | if (cached_dict.GetValue()) { |
371 | 0 | decompressor = cached_dict.GetValue()->decompressor_.get(); |
372 | 0 | } |
373 | 0 | } |
374 | | |
375 | 0 | return job_->table->RetrieveBlock<Block_kData>( |
376 | 0 | /*prefetch_buffer=*/nullptr, job_->job_options.read_options, block_handle, |
377 | 0 | decompressor, &pinned_blocks_[block_index].As<Block_kData>(), |
378 | 0 | /*get_context=*/nullptr, /*lookup_context=*/nullptr, |
379 | 0 | /*for_compaction=*/false, /*use_cache=*/true, |
380 | 0 | /*async_read=*/false, /*use_block_cache_for_lookup=*/true); |
381 | 0 | } |
382 | | |
// One IO's worth of prefetch work: a run of blocks read together, plus the
// total number of bytes that read covers.
struct CoalescedPrefetchGroup {
  // Job-relative block indices, sorted by file offset.
  std::vector<size_t> block_indices;
  // Combined size of the blocks; this is the amount of prefetch memory
  // requested before the group is dispatched.
  size_t total_bytes{0};
};
388 | | |
389 | | // State for a pending memory request waiting to be granted |
390 | | // Groups are pre-coalesced at queue time for efficient dispatch |
391 | | struct PendingPrefetchRequest { |
392 | | std::weak_ptr<ReadSet> read_set; |
393 | | std::shared_ptr<IOJob> job; |
394 | | |
395 | | // Pre-coalesced groups ready for dispatch (ordered by first block index) |
396 | | std::deque<CoalescedPrefetchGroup> coalesced_groups; |
397 | | |
398 | | // Individual block indices still pending (for RemoveFromPending lookup) |
399 | | std::unordered_set<size_t> block_indices_to_prefetch; |
400 | | |
401 | | std::atomic<size_t> pending_bytes_{0}; // Track remaining bytes |
402 | | mutable port::Mutex groups_mutex_; // Protects groups and set modifications |
403 | | }; |
404 | | |
405 | | // Remove a block from pending prefetch (called when block is read or released) |
406 | 0 | void ReadSet::RemoveFromPending(size_t block_index) { |
407 | 0 | if (!pending_prefetch_flags_ || block_index >= pending_prefetch_flags_size_) { |
408 | 0 | return; |
409 | 0 | } |
410 | | |
411 | | // Atomic exchange - returns true only if it was previously true |
412 | 0 | if (!pending_prefetch_flags_[block_index].exchange(false)) { |
413 | 0 | return; // Already removed or never pending |
414 | 0 | } |
415 | | |
416 | 0 | if (pending_request_) { |
417 | 0 | MutexLock lock(&pending_request_->groups_mutex_); |
418 | 0 | pending_request_->block_indices_to_prefetch.erase(block_index); |
419 | 0 | pending_request_->pending_bytes_ -= block_sizes_[block_index]; |
420 | 0 | } |
421 | 0 | } |
422 | | |
423 | | // IODispatcherImpl::Impl inherits from IODispatcherImplData |
424 | | struct IODispatcherImpl::Impl : public IODispatcherImplData, |
425 | | public std::enable_shared_from_this<Impl> { |
426 | | explicit Impl(const IODispatcherOptions& options); |
427 | | ~Impl() override; |
428 | | |
429 | | // Non-copyable and non-movable |
430 | | Impl(const Impl&) = delete; |
431 | | Impl& operator=(const Impl&) = delete; |
432 | | Impl(Impl&&) = delete; |
433 | | Impl& operator=(Impl&&) = delete; |
434 | | |
435 | | Status SubmitJob(const std::shared_ptr<IOJob>& job, |
436 | | std::shared_ptr<ReadSet>* read_set); |
437 | | |
438 | | // Memory management methods - non-blocking |
439 | | bool TryAcquireMemory(size_t bytes); |
440 | | void ReleaseMemory(size_t bytes) override; |
441 | | |
442 | | // Memory limiting state |
443 | | size_t max_prefetch_memory_bytes_ = 0; |
444 | | std::atomic<size_t> memory_used_{0}; // Atomic for lock-free accounting |
445 | | std::atomic<bool> has_pending_requests_{false}; // Fast-path check |
446 | | port::Mutex memory_mutex_; // Only for pending_prefetch_queue_ access |
447 | | std::deque<std::shared_ptr<PendingPrefetchRequest>> pending_prefetch_queue_; |
448 | | Statistics* statistics_ = nullptr; |
449 | | |
450 | | private: |
451 | | void PrepareIORequests( |
452 | | const std::shared_ptr<IOJob>& job, |
453 | | const std::vector<size_t>& block_indices_to_read, |
454 | | const std::vector<BlockHandle>& block_handles, |
455 | | std::vector<FSReadRequest>* read_reqs, |
456 | | std::vector<std::vector<size_t>>* coalesced_block_indices); |
457 | | |
458 | | // Surface actual async IO errors to caller, but allow fallback for |
459 | | // unsupported cases. Returns block indices that need sync fallback. |
460 | | std::vector<size_t> ExecuteAsyncIO( |
461 | | const std::shared_ptr<IOJob>& job, |
462 | | const std::shared_ptr<ReadSet>& read_set, |
463 | | std::vector<FSReadRequest>& read_reqs, |
464 | | const std::vector<std::vector<size_t>>& coalesced_block_indices, |
465 | | Status* out_status); |
466 | | |
467 | | Status ExecuteSyncIO( |
468 | | const std::shared_ptr<IOJob>& job, |
469 | | const std::shared_ptr<ReadSet>& read_set, |
470 | | std::vector<FSReadRequest>& read_reqs, |
471 | | const std::vector<std::vector<size_t>>& coalesced_block_indices); |
472 | | |
473 | | // Try to dispatch pending prefetch requests when memory becomes available |
474 | | void TryDispatchPendingPrefetches(); |
475 | | |
476 | | // Dispatch prefetch for a specific ReadSet (called when memory is available) |
477 | | void DispatchPrefetch(const std::shared_ptr<ReadSet>& read_set, |
478 | | const std::shared_ptr<IOJob>& job, |
479 | | const std::vector<size_t>& block_indices); |
480 | | |
481 | | // Pre-coalesce blocks into groups, respecting max_group_bytes size limit. |
482 | | // Returns groups ordered by first block index (earlier blocks first). |
483 | | std::vector<CoalescedPrefetchGroup> PreCoalesceBlocks( |
484 | | const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& rs, |
485 | | const std::vector<size_t>& block_indices, size_t max_group_bytes); |
486 | | }; |
487 | | |
488 | | IODispatcherImpl::Impl::Impl(const IODispatcherOptions& options) |
489 | 0 | : max_prefetch_memory_bytes_(options.max_prefetch_memory_bytes), |
490 | 0 | statistics_(options.statistics) {} |
491 | | |
492 | 0 | IODispatcherImpl::Impl::~Impl() {} |
493 | | |
494 | 0 | bool IODispatcherImpl::Impl::TryAcquireMemory(size_t bytes) { |
495 | 0 | if (max_prefetch_memory_bytes_ == 0) { |
496 | 0 | return true; // No limit configured |
497 | 0 | } |
498 | | |
499 | | // Lock-free memory acquisition using compare-exchange |
500 | 0 | size_t current = memory_used_.load(std::memory_order_relaxed); |
501 | 0 | while (true) { |
502 | 0 | if (current + bytes > max_prefetch_memory_bytes_) { |
503 | | // Not enough memory - caller should queue for later |
504 | 0 | RecordTick(statistics_, PREFETCH_MEMORY_REQUESTS_BLOCKED); |
505 | 0 | return false; |
506 | 0 | } |
507 | 0 | if (memory_used_.compare_exchange_weak(current, current + bytes, |
508 | 0 | std::memory_order_release, |
509 | 0 | std::memory_order_relaxed)) { |
510 | 0 | RecordTick(statistics_, PREFETCH_MEMORY_BYTES_GRANTED, bytes); |
511 | 0 | return true; |
512 | 0 | } |
513 | | // current is updated by compare_exchange_weak on failure, retry |
514 | 0 | } |
515 | 0 | } |
516 | | |
517 | 0 | void IODispatcherImpl::Impl::ReleaseMemory(size_t bytes) { |
518 | 0 | if (max_prefetch_memory_bytes_ == 0) { |
519 | 0 | return; // No limit configured |
520 | 0 | } |
521 | | |
522 | | // Lock-free memory release using atomic fetch_sub |
523 | 0 | size_t old_val = memory_used_.fetch_sub(bytes, std::memory_order_release); |
524 | 0 | assert(old_val >= bytes); |
525 | 0 | (void)old_val; // Suppress unused warning in release builds |
526 | 0 | RecordTick(statistics_, PREFETCH_MEMORY_BYTES_RELEASED, bytes); |
527 | | |
528 | | // Fast-path: skip dispatch attempt if no pending requests |
529 | | // This avoids mutex contention in the common single-threaded iterator case |
530 | 0 | if (!has_pending_requests_.load(std::memory_order_acquire)) { |
531 | 0 | return; |
532 | 0 | } |
533 | | |
534 | | // Try to dispatch pending prefetches now that memory is available |
535 | 0 | TryDispatchPendingPrefetches(); |
536 | 0 | } |
537 | | |
538 | 0 | void IODispatcherImpl::Impl::TryDispatchPendingPrefetches() { |
539 | | // Process pending prefetch requests - dispatch entire coalesced groups |
540 | 0 | while (true) { |
541 | 0 | std::shared_ptr<PendingPrefetchRequest> pending; |
542 | |
|
543 | 0 | { |
544 | 0 | MutexLock lock(&memory_mutex_); |
545 | 0 | if (pending_prefetch_queue_.empty()) { |
546 | 0 | has_pending_requests_.store(false, std::memory_order_release); |
547 | 0 | return; |
548 | 0 | } |
549 | | |
550 | | // Get the next pending request |
551 | 0 | pending = std::move(pending_prefetch_queue_.front()); |
552 | 0 | pending_prefetch_queue_.pop_front(); |
553 | 0 | } |
554 | | |
555 | | // Check if the ReadSet is still alive |
556 | 0 | auto read_set = pending->read_set.lock(); |
557 | 0 | if (!read_set) { |
558 | 0 | continue; // ReadSet was destroyed, skip this request |
559 | 0 | } |
560 | | |
561 | | // Try to acquire memory for coalesced groups (entire groups at a time) |
562 | 0 | std::vector<size_t> blocks_to_dispatch; |
563 | 0 | bool has_remaining_groups = false; |
564 | |
|
565 | 0 | { |
566 | 0 | MutexLock lock(&pending->groups_mutex_); |
567 | |
|
568 | 0 | while (!pending->coalesced_groups.empty()) { |
569 | 0 | auto& group = pending->coalesced_groups.front(); |
570 | | |
571 | | // Filter out blocks that were already read (not in pending set anymore) |
572 | 0 | std::vector<size_t> remaining_blocks; |
573 | 0 | size_t remaining_bytes = 0; |
574 | 0 | for (size_t idx : group.block_indices) { |
575 | 0 | if (pending->block_indices_to_prefetch.count(idx) > 0) { |
576 | 0 | remaining_blocks.push_back(idx); |
577 | 0 | remaining_bytes += read_set->block_sizes_[idx]; |
578 | 0 | } |
579 | 0 | } |
580 | | |
581 | | // Skip empty groups (all blocks were already read) |
582 | 0 | if (remaining_blocks.empty()) { |
583 | 0 | pending->coalesced_groups.pop_front(); |
584 | 0 | continue; |
585 | 0 | } |
586 | | |
587 | | // Try to acquire memory for remaining blocks only |
588 | 0 | if (TryAcquireMemory(remaining_bytes)) { |
589 | | // Add all remaining blocks from this group to dispatch |
590 | 0 | for (size_t idx : remaining_blocks) { |
591 | 0 | blocks_to_dispatch.push_back(idx); |
592 | 0 | pending->block_indices_to_prefetch.erase(idx); |
593 | 0 | } |
594 | 0 | pending->pending_bytes_ -= remaining_bytes; |
595 | 0 | pending->coalesced_groups.pop_front(); |
596 | 0 | } else { |
597 | | // Not enough memory for this group - update with remaining blocks |
598 | 0 | group.block_indices = std::move(remaining_blocks); |
599 | 0 | group.total_bytes = remaining_bytes; |
600 | 0 | has_remaining_groups = true; |
601 | 0 | break; |
602 | 0 | } |
603 | 0 | } |
604 | 0 | } |
605 | | |
606 | | // Save job before potential move of pending |
607 | 0 | auto job = pending->job; |
608 | | |
609 | | // Requeue if groups remain |
610 | 0 | if (has_remaining_groups) { |
611 | 0 | MutexLock lock(&memory_mutex_); |
612 | 0 | pending_prefetch_queue_.push_front(std::move(pending)); |
613 | 0 | } else { |
614 | | // All groups dispatched, clear pending state |
615 | 0 | read_set->pending_request_.reset(); |
616 | 0 | } |
617 | | |
618 | | // Clear pending flags for dispatched blocks |
619 | 0 | if (read_set->pending_prefetch_flags_) { |
620 | 0 | for (size_t idx : blocks_to_dispatch) { |
621 | 0 | if (idx < read_set->pending_prefetch_flags_size_) { |
622 | 0 | read_set->pending_prefetch_flags_[idx].store(false); |
623 | 0 | } |
624 | 0 | } |
625 | 0 | } |
626 | | |
627 | | // Dispatch acquired blocks |
628 | 0 | if (!blocks_to_dispatch.empty()) { |
629 | 0 | DispatchPrefetch(read_set, job, blocks_to_dispatch); |
630 | 0 | } |
631 | | |
632 | | // If we dispatched nothing, stop (no memory available for any group) |
633 | 0 | if (blocks_to_dispatch.empty()) { |
634 | 0 | return; |
635 | 0 | } |
636 | 0 | } |
637 | 0 | } |
638 | | |
639 | | void IODispatcherImpl::Impl::DispatchPrefetch( |
640 | | const std::shared_ptr<ReadSet>& read_set, const std::shared_ptr<IOJob>& job, |
641 | 0 | const std::vector<size_t>& block_indices) { |
642 | | // Sync point for testing partial prefetch - passes number of blocks being |
643 | | // dispatched |
644 | 0 | TEST_SYNC_POINT_CALLBACK("IODispatcherImpl::DispatchPrefetch:BlockCount", |
645 | 0 | const_cast<std::vector<size_t>*>(&block_indices)); |
646 | | |
647 | | // Prepare and execute IO for the given blocks |
648 | 0 | std::vector<FSReadRequest> read_reqs; |
649 | 0 | std::vector<std::vector<size_t>> coalesced_block_indices; |
650 | 0 | PrepareIORequests(job, block_indices, job->block_handles, &read_reqs, |
651 | 0 | &coalesced_block_indices); |
652 | |
|
653 | 0 | if (job->job_options.read_options.async_io) { |
654 | 0 | Status async_status; |
655 | 0 | std::vector<size_t> fallback_indices = ExecuteAsyncIO( |
656 | 0 | job, read_set, read_reqs, coalesced_block_indices, &async_status); |
657 | | |
658 | | // For blocks where async is not supported, do sync IO |
659 | 0 | if (!fallback_indices.empty()) { |
660 | 0 | std::vector<FSReadRequest> sync_read_reqs; |
661 | 0 | std::vector<std::vector<size_t>> sync_coalesced_indices; |
662 | 0 | PrepareIORequests(job, fallback_indices, job->block_handles, |
663 | 0 | &sync_read_reqs, &sync_coalesced_indices); |
664 | | // Prefetch errors are ignored - user will get the error when reading |
665 | 0 | Status s = |
666 | 0 | ExecuteSyncIO(job, read_set, sync_read_reqs, sync_coalesced_indices); |
667 | 0 | s.PermitUncheckedError(); |
668 | 0 | read_set->num_sync_reads_ += fallback_indices.size(); |
669 | 0 | } |
670 | | // Async errors are also ignored - user will get the error when reading |
671 | 0 | async_status.PermitUncheckedError(); |
672 | 0 | } else { |
673 | | // Prefetch errors are ignored - user will get the error when reading |
674 | 0 | Status s = ExecuteSyncIO(job, read_set, read_reqs, coalesced_block_indices); |
675 | 0 | s.PermitUncheckedError(); |
676 | 0 | read_set->num_sync_reads_ += block_indices.size(); |
677 | 0 | } |
678 | 0 | } |
679 | | |
680 | | Status IODispatcherImpl::Impl::SubmitJob(const std::shared_ptr<IOJob>& job, |
681 | 0 | std::shared_ptr<ReadSet>* read_set) { |
682 | 0 | if (!read_set) { |
683 | 0 | return Status::InvalidArgument("read_set output parameter is null"); |
684 | 0 | } |
685 | | |
686 | 0 | auto rs = std::make_shared<ReadSet>(); |
687 | | |
688 | | // Initialize ReadSet |
689 | 0 | rs->job_ = job; |
690 | 0 | rs->fs_ = job->table->get_rep()->ioptions.env->GetFileSystem(); |
691 | 0 | rs->pinned_blocks_.resize(job->block_handles.size()); |
692 | 0 | rs->block_sizes_.resize(job->block_handles.size(), 0); |
693 | | |
694 | | // Build sorted index for O(log n) ReadOffset lookups via binary search. |
695 | | // sorted_block_indices_[i] = original index of i-th smallest block by offset. |
696 | 0 | rs->sorted_block_indices_.resize(job->block_handles.size()); |
697 | 0 | for (size_t i = 0; i < job->block_handles.size(); ++i) { |
698 | 0 | rs->sorted_block_indices_[i] = i; |
699 | 0 | } |
700 | 0 | std::sort(rs->sorted_block_indices_.begin(), rs->sorted_block_indices_.end(), |
701 | 0 | [&job](size_t a, size_t b) { |
702 | 0 | return job->block_handles[a].offset() < |
703 | 0 | job->block_handles[b].offset(); |
704 | 0 | }); |
705 | | |
706 | | // Step 1: Check cache and pin cached blocks |
707 | 0 | std::vector<size_t> block_indices_to_read; |
708 | |
|
709 | 0 | for (size_t i = 0; i < job->block_handles.size(); ++i) { |
710 | 0 | const auto& data_block_handle = job->block_handles[i]; |
711 | | |
712 | | // Lookup and pin block in cache |
713 | 0 | Status s = job->table->LookupAndPinBlocksInCache<Block_kData>( |
714 | 0 | job->job_options.read_options, data_block_handle, |
715 | 0 | &(rs->pinned_blocks_)[i].As<Block_kData>()); |
716 | |
|
717 | 0 | if (!s.ok()) { |
718 | 0 | continue; |
719 | 0 | } |
720 | | |
721 | 0 | if (!(rs->pinned_blocks_)[i].GetValue()) { |
722 | | // Block not in cache - needs to be read from disk |
723 | 0 | block_indices_to_read.emplace_back(i); |
724 | 0 | } |
725 | 0 | } |
726 | | |
727 | | // Step 2: Prepare IO requests for blocks not in cache |
728 | 0 | if (block_indices_to_read.empty()) { |
729 | | // All blocks found in cache - count them as cache hits |
730 | 0 | rs->num_cache_hits_ = job->block_handles.size(); |
731 | 0 | *read_set = std::move(rs); |
732 | 0 | return Status::OK(); |
733 | 0 | } |
734 | | |
735 | | // Count cache hits (blocks that were found in cache during lookup above) |
736 | 0 | rs->num_cache_hits_ = |
737 | 0 | job->block_handles.size() - block_indices_to_read.size(); |
738 | | |
739 | | // Calculate block sizes for uncached blocks |
740 | 0 | for (const auto& idx : block_indices_to_read) { |
741 | 0 | size_t block_size = |
742 | 0 | BlockBasedTable::BlockSizeWithTrailer(job->block_handles[idx]); |
743 | 0 | rs->block_sizes_[idx] = block_size; |
744 | 0 | } |
745 | | |
746 | | // Store dispatcher reference for release callbacks |
747 | 0 | rs->dispatcher_data_ = shared_from_this(); |
748 | | |
749 | | // Pre-coalesce blocks into groups, respecting memory budget per group |
750 | | // This ensures we dispatch meaningful IO sizes, not tiny single-block IOs |
751 | | // Both memory-limited and non-memory-limited paths use the same coalescing |
752 | 0 | auto coalesced_groups = PreCoalesceBlocks(job, rs, block_indices_to_read, |
753 | 0 | max_prefetch_memory_bytes_); |
754 | |
|
755 | 0 | std::vector<size_t> blocks_to_dispatch; |
756 | 0 | std::deque<CoalescedPrefetchGroup> groups_to_queue; |
757 | | |
758 | | // Try to acquire memory for entire coalesced groups |
759 | 0 | for (auto& group : coalesced_groups) { |
760 | 0 | if (TryAcquireMemory(group.total_bytes)) { |
761 | | // Add all blocks from this group to dispatch |
762 | 0 | for (size_t idx : group.block_indices) { |
763 | 0 | blocks_to_dispatch.push_back(idx); |
764 | 0 | } |
765 | 0 | } else { |
766 | | // Queue this group for later |
767 | 0 | groups_to_queue.push_back(std::move(group)); |
768 | 0 | } |
769 | 0 | } |
770 | | |
771 | | // Dispatch acquired blocks immediately |
772 | 0 | if (!blocks_to_dispatch.empty()) { |
773 | 0 | DispatchPrefetch(rs, job, blocks_to_dispatch); |
774 | 0 | } |
775 | | |
776 | | // Queue remaining groups for later (only applies when memory limiting) |
777 | 0 | if (!groups_to_queue.empty()) { |
778 | 0 | auto pending = std::make_shared<PendingPrefetchRequest>(); |
779 | 0 | pending->read_set = rs; |
780 | 0 | pending->job = job; |
781 | |
|
782 | 0 | size_t pending_bytes = 0; |
783 | 0 | for (const auto& group : groups_to_queue) { |
784 | 0 | for (size_t idx : group.block_indices) { |
785 | 0 | pending->block_indices_to_prefetch.insert(idx); |
786 | 0 | } |
787 | 0 | pending_bytes += group.total_bytes; |
788 | 0 | } |
789 | 0 | pending->coalesced_groups = std::move(groups_to_queue); |
790 | 0 | pending->pending_bytes_ = pending_bytes; |
791 | | |
792 | | // Set up pending flags for queued blocks only |
793 | 0 | size_t num_blocks = job->block_handles.size(); |
794 | 0 | rs->pending_prefetch_flags_ = |
795 | 0 | std::make_unique<std::atomic<bool>[]>(num_blocks); |
796 | 0 | rs->pending_prefetch_flags_size_ = num_blocks; |
797 | 0 | for (size_t idx : pending->block_indices_to_prefetch) { |
798 | 0 | rs->pending_prefetch_flags_[idx].store(true); |
799 | 0 | } |
800 | 0 | rs->pending_request_ = pending; |
801 | |
|
802 | 0 | { |
803 | 0 | MutexLock lock(&memory_mutex_); |
804 | 0 | pending_prefetch_queue_.push_back(std::move(pending)); |
805 | 0 | has_pending_requests_.store(true, std::memory_order_release); |
806 | 0 | } |
807 | 0 | } |
808 | |
|
809 | 0 | *read_set = std::move(rs); |
810 | 0 | return Status::OK(); |
811 | 0 | } |
812 | | |
813 | | void IODispatcherImpl::Impl::PrepareIORequests( |
814 | | const std::shared_ptr<IOJob>& job, |
815 | | const std::vector<size_t>& block_indices_to_read, |
816 | | const std::vector<BlockHandle>& block_handles, |
817 | | std::vector<FSReadRequest>* read_reqs, |
818 | 0 | std::vector<std::vector<size_t>>* coalesced_block_indices) { |
819 | | // This is necessary because block handles may not be in sorted order |
820 | 0 | std::vector<size_t> sorted_block_indices = block_indices_to_read; |
821 | 0 | std::sort(sorted_block_indices.begin(), sorted_block_indices.end(), |
822 | 0 | [&block_handles](size_t a, size_t b) { |
823 | 0 | return block_handles[a].offset() < block_handles[b].offset(); |
824 | 0 | }); |
825 | |
|
826 | 0 | assert(coalesced_block_indices->empty()); |
827 | 0 | coalesced_block_indices->resize(1); |
828 | |
|
829 | 0 | for (const auto& block_idx : sorted_block_indices) { |
830 | 0 | if (!coalesced_block_indices->back().empty()) { |
831 | | // Check if we can coalesce with previous block |
832 | 0 | const auto& last_block_handle = |
833 | 0 | block_handles[coalesced_block_indices->back().back()]; |
834 | 0 | uint64_t last_block_end = |
835 | 0 | last_block_handle.offset() + |
836 | 0 | BlockBasedTable::BlockSizeWithTrailer(last_block_handle); |
837 | 0 | uint64_t current_start = block_handles[block_idx].offset(); |
838 | |
|
839 | 0 | if (current_start > |
840 | 0 | last_block_end + job->job_options.io_coalesce_threshold) { |
841 | | // Gap too large - start new IO request |
842 | 0 | coalesced_block_indices->emplace_back(); |
843 | 0 | } |
844 | 0 | } |
845 | 0 | coalesced_block_indices->back().emplace_back(block_idx); |
846 | 0 | } |
847 | | |
848 | | // Create FSReadRequest for each coalesced group |
849 | 0 | assert(read_reqs->empty()); |
850 | 0 | read_reqs->reserve(coalesced_block_indices->size()); |
851 | |
|
852 | 0 | for (const auto& block_indices : *coalesced_block_indices) { |
853 | 0 | assert(!block_indices.empty()); |
854 | | |
855 | | // Find the min and max offsets in this coalesced group |
856 | | // Since blocks are now sorted, first has min offset and last has max |
857 | 0 | const auto& first_block_handle = block_handles[block_indices[0]]; |
858 | 0 | const auto& last_block_handle = block_handles[block_indices.back()]; |
859 | |
|
860 | 0 | const auto start_offset = first_block_handle.offset(); |
861 | 0 | const auto end_offset = |
862 | 0 | last_block_handle.offset() + |
863 | 0 | BlockBasedTable::BlockSizeWithTrailer(last_block_handle); |
864 | |
|
865 | 0 | assert(end_offset > start_offset); |
866 | |
|
867 | 0 | read_reqs->emplace_back(); |
868 | 0 | read_reqs->back().offset = start_offset; |
869 | 0 | read_reqs->back().len = end_offset - start_offset; |
870 | 0 | read_reqs->back().scratch = nullptr; |
871 | 0 | } |
872 | 0 | } |
873 | | |
874 | | std::vector<CoalescedPrefetchGroup> IODispatcherImpl::Impl::PreCoalesceBlocks( |
875 | | const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& rs, |
876 | 0 | const std::vector<size_t>& block_indices, size_t max_group_bytes) { |
877 | 0 | std::vector<CoalescedPrefetchGroup> groups; |
878 | |
|
879 | 0 | if (block_indices.empty()) { |
880 | 0 | return groups; |
881 | 0 | } |
882 | | |
883 | 0 | const auto& block_handles = job->block_handles; |
884 | 0 | const uint64_t coalesce_threshold = job->job_options.io_coalesce_threshold; |
885 | | |
886 | | // Sort block indices by offset for coalescing |
887 | 0 | std::vector<size_t> sorted_indices = block_indices; |
888 | 0 | std::sort(sorted_indices.begin(), sorted_indices.end(), |
889 | 0 | [&block_handles](size_t a, size_t b) { |
890 | 0 | return block_handles[a].offset() < block_handles[b].offset(); |
891 | 0 | }); |
892 | | |
893 | | // Build coalesced groups respecting max_group_bytes |
894 | 0 | groups.emplace_back(); |
895 | |
|
896 | 0 | for (size_t idx : sorted_indices) { |
897 | 0 | size_t block_size = rs->block_sizes_[idx]; |
898 | | |
899 | | // Skip blocks that are individually larger than the memory budget |
900 | | // These will be read synchronously when needed (via ReadIndex fallback) |
901 | 0 | if (max_group_bytes > 0 && block_size > max_group_bytes) { |
902 | 0 | continue; |
903 | 0 | } |
904 | | |
905 | | // Check if we need to start a new group |
906 | 0 | bool start_new_group = false; |
907 | |
|
908 | 0 | if (!groups.back().block_indices.empty()) { |
909 | | // Check gap with previous block |
910 | 0 | size_t last_idx = groups.back().block_indices.back(); |
911 | 0 | const auto& last_handle = block_handles[last_idx]; |
912 | 0 | uint64_t last_end = last_handle.offset() + |
913 | 0 | BlockBasedTable::BlockSizeWithTrailer(last_handle); |
914 | 0 | uint64_t current_start = block_handles[idx].offset(); |
915 | |
|
916 | 0 | if (current_start > last_end + coalesce_threshold) { |
917 | 0 | start_new_group = true; // Gap too large |
918 | 0 | } else if (max_group_bytes > 0 && |
919 | 0 | groups.back().total_bytes + block_size > max_group_bytes) { |
920 | 0 | start_new_group = true; // Would exceed size limit |
921 | 0 | } |
922 | 0 | } |
923 | |
|
924 | 0 | if (start_new_group) { |
925 | 0 | groups.emplace_back(); |
926 | 0 | } |
927 | |
|
928 | 0 | groups.back().block_indices.push_back(idx); |
929 | 0 | groups.back().total_bytes += block_size; |
930 | 0 | } |
931 | |
|
932 | 0 | return groups; |
933 | 0 | } |
934 | | |
935 | | std::vector<size_t> IODispatcherImpl::Impl::ExecuteAsyncIO( |
936 | | const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& read_set, |
937 | | std::vector<FSReadRequest>& read_reqs, |
938 | | const std::vector<std::vector<size_t>>& coalesced_block_indices, |
939 | 0 | Status* out_status) { |
940 | 0 | std::vector<size_t> fallback_block_indices; |
941 | 0 | *out_status = Status::OK(); |
942 | | |
943 | | // Get file and IO options |
944 | 0 | auto* rep = job->table->get_rep(); |
945 | 0 | IOOptions io_opts; |
946 | 0 | Status s = |
947 | 0 | rep->file->PrepareIOOptions(job->job_options.read_options, io_opts); |
948 | 0 | if (!s.ok()) { |
949 | 0 | *out_status = s; |
950 | 0 | return fallback_block_indices; |
951 | 0 | } |
952 | | |
953 | 0 | const bool direct_io = rep->file->use_direct_io(); |
954 | | |
955 | | // Submit async read requests and store them in the ReadSet |
956 | 0 | for (size_t i = 0; i < read_reqs.size(); ++i) { |
957 | 0 | auto async_state = std::make_shared<AsyncIOState>(); |
958 | |
|
959 | 0 | async_state->offset = read_reqs[i].offset; |
960 | 0 | async_state->block_indices = coalesced_block_indices[i]; |
961 | 0 | async_state->read_req = std::move(read_reqs[i]); |
962 | |
|
963 | 0 | for (const auto idx : coalesced_block_indices[i]) { |
964 | 0 | async_state->blocks.emplace_back(job->block_handles[idx]); |
965 | 0 | } |
966 | |
|
967 | 0 | if (direct_io) { |
968 | 0 | async_state->read_req.scratch = nullptr; |
969 | 0 | } else { |
970 | 0 | async_state->buf.reset(new char[async_state->read_req.len]); |
971 | 0 | async_state->read_req.scratch = async_state->buf.get(); |
972 | 0 | } |
973 | | |
974 | | // Callback for async read completion |
975 | | // Store the result slice and status back into async_state so we can access |
976 | | // them after Poll() completes. |
977 | 0 | auto cb = [](const FSReadRequest& req, void* cb_arg) { |
978 | 0 | auto* state = static_cast<AsyncIOState*>(cb_arg); |
979 | 0 | state->read_req.result = req.result; |
980 | 0 | state->read_req.status = req.status; |
981 | 0 | }; |
982 | |
|
983 | 0 | s = rep->file->ReadAsync(async_state->read_req, io_opts, cb, |
984 | 0 | async_state.get(), &async_state->io_handle, |
985 | 0 | &async_state->del_fn, |
986 | 0 | direct_io ? &async_state->aligned_buf : nullptr); |
987 | |
|
988 | 0 | if (s.IsNotSupported()) { |
989 | | // Async IO may be compiled in but unavailable at runtime. Fall back to |
990 | | // the synchronous coalesced path for these blocks. |
991 | 0 | for (const auto idx : coalesced_block_indices[i]) { |
992 | 0 | fallback_block_indices.push_back(idx); |
993 | 0 | } |
994 | 0 | continue; |
995 | 0 | } |
996 | | |
997 | 0 | if (!s.ok()) { |
998 | | // Actual error - surface to caller |
999 | 0 | *out_status = s; |
1000 | 0 | return fallback_block_indices; |
1001 | 0 | } |
1002 | | |
1003 | 0 | if (async_state->io_handle == nullptr) { |
1004 | | // Async IO not supported - add to fallback list for sync IO |
1005 | 0 | for (const auto idx : coalesced_block_indices[i]) { |
1006 | 0 | fallback_block_indices.push_back(idx); |
1007 | 0 | } |
1008 | 0 | continue; |
1009 | 0 | } |
1010 | | |
1011 | | // Add async state to map for all blocks in this request |
1012 | 0 | for (const auto idx : async_state->block_indices) { |
1013 | 0 | read_set->async_io_map_[idx] = async_state; |
1014 | 0 | } |
1015 | 0 | } |
1016 | | |
1017 | 0 | return fallback_block_indices; |
1018 | 0 | } |
1019 | | |
1020 | | Status IODispatcherImpl::Impl::ExecuteSyncIO( |
1021 | | const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& read_set, |
1022 | | std::vector<FSReadRequest>& read_reqs, |
1023 | 0 | const std::vector<std::vector<size_t>>& coalesced_block_indices) { |
1024 | | // Get file and IO options |
1025 | 0 | auto* rep = job->table->get_rep(); |
1026 | 0 | IOOptions io_opts; |
1027 | 0 | if (Status s = |
1028 | 0 | rep->file->PrepareIOOptions(job->job_options.read_options, io_opts); |
1029 | 0 | !s.ok()) { |
1030 | 0 | return s; |
1031 | 0 | } |
1032 | | |
1033 | 0 | const bool direct_io = rep->file->use_direct_io(); |
1034 | | |
1035 | | // Setup scratch buffers for MultiRead |
1036 | 0 | std::unique_ptr<char[]> buf; |
1037 | |
|
1038 | 0 | if (direct_io) { |
1039 | 0 | for (auto& read_req : read_reqs) { |
1040 | 0 | read_req.scratch = nullptr; |
1041 | 0 | } |
1042 | 0 | } else { |
1043 | | // Allocate a single contiguous buffer for all requests |
1044 | 0 | size_t total_len = 0; |
1045 | 0 | for (const auto& req : read_reqs) { |
1046 | 0 | total_len += req.len; |
1047 | 0 | } |
1048 | 0 | buf.reset(new char[total_len]); |
1049 | 0 | size_t offset = 0; |
1050 | 0 | for (auto& read_req : read_reqs) { |
1051 | 0 | read_req.scratch = buf.get() + offset; |
1052 | 0 | offset += read_req.len; |
1053 | 0 | } |
1054 | 0 | } |
1055 | | |
1056 | | // Execute MultiRead |
1057 | 0 | AlignedBuf aligned_buf; |
1058 | 0 | if (Status s = |
1059 | 0 | rep->file->MultiRead(io_opts, read_reqs.data(), read_reqs.size(), |
1060 | 0 | direct_io ? &aligned_buf : nullptr); |
1061 | 0 | !s.ok()) { |
1062 | 0 | return s; |
1063 | 0 | } |
1064 | | |
1065 | 0 | for (const auto& rq : read_reqs) { |
1066 | 0 | if (!rq.status.ok()) { |
1067 | 0 | return rq.status; |
1068 | 0 | } |
1069 | 0 | } |
1070 | | |
1071 | | // Process all blocks from the MultiRead results |
1072 | 0 | for (size_t i = 0; i < coalesced_block_indices.size(); ++i) { |
1073 | 0 | const auto& read_req = read_reqs[i]; |
1074 | 0 | for (const auto& block_idx : coalesced_block_indices[i]) { |
1075 | 0 | const auto& block_handle = job->block_handles[block_idx]; |
1076 | |
|
1077 | 0 | Status create_status = CreateAndPinBlockFromBuffer( |
1078 | 0 | job, block_handle, read_req.offset, read_req.result, |
1079 | 0 | read_set->pinned_blocks_[block_idx]); |
1080 | 0 | if (!create_status.ok()) { |
1081 | 0 | return create_status; |
1082 | 0 | } |
1083 | 0 | } |
1084 | 0 | } |
1085 | | |
1086 | 0 | return Status::OK(); |
1087 | 0 | } |
1088 | | |
1089 | | IODispatcherImpl::IODispatcherImpl() |
1090 | 0 | : impl_(std::make_shared<Impl>(IODispatcherOptions())) {} |
1091 | | |
1092 | | IODispatcherImpl::IODispatcherImpl(const IODispatcherOptions& options) |
1093 | 0 | : impl_(std::make_shared<Impl>(options)) {} |
1094 | | |
1095 | 0 | IODispatcherImpl::~IODispatcherImpl() = default; |
1096 | | |
1097 | | Status IODispatcherImpl::SubmitJob(const std::shared_ptr<IOJob>& job, |
1098 | 0 | std::shared_ptr<ReadSet>* read_set) { |
1099 | 0 | return impl_->SubmitJob(job, read_set); |
1100 | 0 | } |
1101 | | |
1102 | 0 | IODispatcher* NewIODispatcher() { return new IODispatcherImpl(); } |
1103 | | |
1104 | 0 | IODispatcher* NewIODispatcher(const IODispatcherOptions& options) { |
1105 | 0 | return new IODispatcherImpl(options); |
1106 | 0 | } |
1107 | | |
1108 | | } // namespace ROCKSDB_NAMESPACE |