Coverage Report

Created: 2026-05-16 07:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/util/io_dispatcher_imp.cc
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
7
//  This source code is licensed under both the GPLv2 (found in the
8
//  COPYING file in the root directory) and Apache 2.0 License
9
//  (found in the LICENSE.Apache file in the root directory).
10
//
11
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
12
// Use of this source code is governed by a BSD-style license that can be
13
// found in the LICENSE file. See the AUTHORS file for names of contributors.
14
15
#include "util/io_dispatcher_imp.h"
16
17
#include <deque>
18
#include <memory>
19
#include <unordered_map>
20
#include <unordered_set>
21
#include <vector>
22
23
#include "file/random_access_file_reader.h"
24
#include "monitoring/statistics_impl.h"
25
#include "port/port.h"
26
#include "rocksdb/file_system.h"
27
#include "rocksdb/io_dispatcher.h"
28
#include "rocksdb/options.h"
29
#include "rocksdb/status.h"
30
#include "table/block_based/block_based_table_reader.h"
31
#include "table/block_based/cachable_entry.h"
32
#include "table/block_based/reader_common.h"
33
#include "table/format.h"
34
#include "test_util/sync_point.h"
35
#include "util/mutexlock.h"
36
37
namespace ROCKSDB_NAMESPACE {
38
39
// Minimal callback interface through which a ReadSet returns prefetch memory
// to the dispatcher implementation. It is declared ahead of the ReadSet
// methods in this file because they call ReleaseMemory() when blocks are
// handed out, released, or destroyed.
struct IODispatcherImplData {
  // Gives `bytes` of memory accounting back to the implementation.
  virtual void ReleaseMemory(size_t bytes) = 0;

  virtual ~IODispatcherImplData() = default;
};
46
47
// Helper function to create and pin a block from a buffer
48
// Used by both ReadSet::PollAndProcessAsyncIO and IODispatcherImpl::Impl
49
static Status CreateAndPinBlockFromBuffer(
50
    const std::shared_ptr<IOJob>& job, const BlockHandle& block,
51
    uint64_t buffer_start_offset, const Slice& buffer_data,
52
0
    CachableEntry<Block>& pinned_block_entry) {
53
0
  auto* rep = job->table->get_rep();
54
55
  // Get decompressor
56
0
  UnownedPtr<Decompressor> decompressor = rep->decompressor.get();
57
0
  CachableEntry<DecompressorDict> cached_dict;
58
59
0
  if (rep->uncompression_dict_reader) {
60
0
    Status s = rep->uncompression_dict_reader->GetOrReadUncompressionDictionary(
61
0
        nullptr, job->job_options.read_options, nullptr, nullptr, &cached_dict);
62
0
    if (!s.ok()) {
63
0
      return s;
64
0
    }
65
0
    if (cached_dict.GetValue()) {
66
0
      decompressor = cached_dict.GetValue()->decompressor_.get();
67
0
    }
68
0
  }
69
70
  // Create block from buffer data
71
0
  const auto block_size_with_trailer =
72
0
      BlockBasedTable::BlockSizeWithTrailer(block);
73
0
  const auto block_offset_in_buffer = block.offset() - buffer_start_offset;
74
75
0
  CacheAllocationPtr data = AllocateBlock(
76
0
      block_size_with_trailer, GetMemoryAllocator(rep->table_options));
77
0
  memcpy(data.get(), buffer_data.data() + block_offset_in_buffer,
78
0
         block_size_with_trailer);
79
0
  BlockContents tmp_contents(std::move(data), block.size());
80
81
#ifndef NDEBUG
82
  tmp_contents.has_trailer = rep->footer.GetBlockTrailerSize() > 0;
83
#endif
84
85
0
  return job->table->CreateAndPinBlockInCache<Block_kData>(
86
0
      job->job_options.read_options, block, decompressor, &tmp_contents,
87
0
      &pinned_block_entry.As<Block_kData>());
88
0
}
89
90
// State for async IO operations (implementation detail)
91
struct AsyncIOState {
92
0
  AsyncIOState() : offset(static_cast<uint64_t>(-1)) {}
93
0
  ~AsyncIOState() { read_req.status.PermitUncheckedError(); }
94
95
  AsyncIOState(const AsyncIOState&) = delete;
96
  AsyncIOState& operator=(const AsyncIOState&) = delete;
97
  AsyncIOState(AsyncIOState&&) = default;
98
  AsyncIOState& operator=(AsyncIOState&&) = default;
99
100
  std::unique_ptr<char[]> buf;
101
  AlignedBuf aligned_buf;
102
  void* io_handle = nullptr;
103
  IOHandleDeleter del_fn;
104
  uint64_t offset;
105
  std::vector<size_t> block_indices;
106
  std::vector<BlockHandle> blocks;
107
  FSReadRequest read_req;
108
};
109
110
// ReadSet destructor - clean up IO handles
111
// Must call AbortIO before deleting handles to avoid use-after-free when
112
// io_uring completions arrive for deleted handles.
113
0
ReadSet::~ReadSet() {
114
  // Release memory for any blocks still pinned
115
  // Note: block_sizes_[i] is only set for async IO reads where memory
116
  // limiting applies. For sync reads, block_sizes_ remains 0, so this
117
  // loop is effectively a no-op for sync reads.
118
0
  if (auto dispatcher_data = dispatcher_data_.lock()) {
119
0
    for (size_t i = 0; i < block_sizes_.size(); ++i) {
120
0
      if (block_sizes_[i] > 0 && pinned_blocks_[i].GetValue()) {
121
0
        dispatcher_data->ReleaseMemory(block_sizes_[i]);
122
0
      }
123
0
    }
124
0
  }
125
126
0
  if (async_io_map_.empty()) {
127
0
    return;
128
0
  }
129
130
  // Collect unique pending IO handles (multiple block indices may share the
131
  // same async_state due to coalescing)
132
0
  std::vector<void*> pending_handles;
133
0
  std::unordered_set<void*> seen_handles;
134
0
  for (auto& pair : async_io_map_) {
135
0
    auto& async_state = pair.second;
136
0
    if (async_state->io_handle != nullptr &&
137
0
        seen_handles.find(async_state->io_handle) == seen_handles.end()) {
138
0
      pending_handles.push_back(async_state->io_handle);
139
0
      seen_handles.insert(async_state->io_handle);
140
0
    }
141
0
  }
142
143
  // Abort all pending IO operations before deleting handles
144
0
  if (!pending_handles.empty() && fs_) {
145
    // AbortIO cancels pending requests and waits for completions
146
0
    IOStatus s = fs_->AbortIO(pending_handles);
147
0
    (void)s;  // Ignore errors in destructor
148
0
  }
149
150
  // Now safe to delete the handles
151
0
  for (auto& pair : async_io_map_) {
152
0
    auto& async_state = pair.second;
153
0
    if (async_state->io_handle != nullptr && async_state->del_fn != nullptr) {
154
0
      async_state->del_fn(async_state->io_handle);
155
0
      async_state->io_handle = nullptr;
156
0
    }
157
0
  }
158
0
}
159
160
// Main Read() method - transparently handles cache, async IO, and sync reads
161
0
Status ReadSet::ReadIndex(size_t block_index, CachableEntry<Block>* out) {
162
  // Bounds check
163
0
  if (block_index >= pinned_blocks_.size()) {
164
0
    return Status::InvalidArgument("Block index out of range");
165
0
  }
166
167
  // Case 1: Block is already available (from cache or sync read during
168
  // SubmitJob)
169
0
  if (pinned_blocks_[block_index].GetValue()) {
170
0
    *out = std::move(pinned_blocks_[block_index]);
171
    // Release memory accounting for prefetched blocks. After moving the value
172
    // out, ReleaseBlock() and the destructor check pinned_blocks_.GetValue()
173
    // which will be null, so they won't release memory again.
174
0
    if (block_index < block_sizes_.size() && block_sizes_[block_index] > 0) {
175
0
      if (auto dispatcher_data = dispatcher_data_.lock()) {
176
0
        dispatcher_data->ReleaseMemory(block_sizes_[block_index]);
177
0
      }
178
0
      block_sizes_[block_index] = 0;
179
0
    }
180
    // Note: Statistics for this block were already counted during SubmitJob
181
    // (either as cache hit or sync read)
182
0
    return Status::OK();
183
0
  }
184
185
  // Case 2: Block has async IO in progress - poll and process
186
0
  if (job_->job_options.read_options.async_io) {
187
0
    auto it = async_io_map_.find(block_index);
188
0
    if (it != async_io_map_.end()) {
189
      // Get the number of blocks in this coalesced async request BEFORE polling
190
      // (since PollAndProcessAsyncIO will remove entries from the map)
191
0
      size_t num_blocks_in_request = it->second->block_indices.size();
192
193
0
      if (Status s = PollAndProcessAsyncIO(it->second); !s.ok()) {
194
0
        return s;
195
0
      }
196
      // Count all blocks that were read in this async request
197
0
      num_async_reads_ += num_blocks_in_request;
198
199
      // After polling, the block should be in pinned_blocks_
200
0
      if (pinned_blocks_[block_index].GetValue()) {
201
0
        *out = std::move(pinned_blocks_[block_index]);
202
        // Release memory accounting (same as case 1 above)
203
0
        if (block_index < block_sizes_.size() &&
204
0
            block_sizes_[block_index] > 0) {
205
0
          if (auto dispatcher_data = dispatcher_data_.lock()) {
206
0
            dispatcher_data->ReleaseMemory(block_sizes_[block_index]);
207
0
          }
208
0
          block_sizes_[block_index] = 0;
209
0
        }
210
0
        return Status::OK();
211
0
      }
212
213
0
      return Status::IOError("Failed to process async IO result");
214
0
    }
215
0
  }
216
217
  // Case 3: Block needs synchronous read (pending or never-dispatched blocks).
218
  // No ReleaseMemory() needed here because blocks reaching this path never had
219
  // TryAcquireMemory() called -- they were either pending prefetch or skipped
220
  // during SubmitJob. block_sizes_[block_index] may be > 0 (set during
221
  // SubmitJob for all uncached blocks) but that does not imply memory was
222
  // acquired.
223
0
  RemoveFromPending(block_index);
224
225
0
  Status s = SyncRead(block_index);
226
0
  if (s.ok()) {
227
0
    *out = std::move(pinned_blocks_[block_index]);
228
0
    num_sync_reads_++;
229
0
  }
230
0
  return s;
231
0
}
232
233
0
// Reads the block whose [offset, offset + size) range contains `offset`.
// Returns InvalidArgument when the ReadSet has no blocks or no block covers
// the offset; otherwise forwards to ReadIndex().
Status ReadSet::ReadOffset(size_t offset, CachableEntry<Block>* out) {
  if (sorted_block_indices_.empty()) {
    return Status::InvalidArgument("ReadSet not initialized");
  }

  // sorted_block_indices_ holds original block indices ordered by block
  // offset, so a binary search over it locates the candidate block.
  const auto& handles = job_->block_handles;
  const auto starts_after = [&handles](size_t target, size_t idx) {
    return target < handles[idx].offset();
  };

  // First block that begins strictly after `offset`.
  const auto first_after =
      std::upper_bound(sorted_block_indices_.begin(),
                       sorted_block_indices_.end(), offset, starts_after);
  if (first_after == sorted_block_indices_.begin()) {
    // Every block begins after `offset`.
    return Status::InvalidArgument("Offset not found in any block");
  }

  // The block just before it has the largest start offset <= `offset`.
  const size_t candidate_idx = *(first_after - 1);
  const auto& candidate = handles[candidate_idx];

  const bool outside_candidate =
      offset < candidate.offset() ||
      offset >= (candidate.offset() + candidate.size());
  if (outside_candidate) {
    return Status::InvalidArgument("Offset not found in any block");
  }

  return ReadIndex(candidate_idx, out);
}
266
267
0
void ReadSet::ReleaseBlock(size_t block_index) {
268
0
  if (block_index >= pinned_blocks_.size()) {
269
0
    return;
270
0
  }
271
272
  // Remove from pending if applicable
273
0
  RemoveFromPending(block_index);
274
275
  // Release memory BEFORE unpinning
276
  // Note: block_sizes_[idx] is only set for async IO reads where memory
277
  // limiting applies. For sync reads, block_sizes_ remains 0, so this
278
  // check implicitly skips ReleaseMemory for sync reads.
279
0
  if (pinned_blocks_[block_index].GetValue() &&
280
0
      block_index < block_sizes_.size() && block_sizes_[block_index] > 0) {
281
0
    if (auto dispatcher_data = dispatcher_data_.lock()) {
282
0
      dispatcher_data->ReleaseMemory(block_sizes_[block_index]);
283
0
    }
284
0
    block_sizes_[block_index] = 0;  // Prevent double-release
285
0
  }
286
287
  // Unpin the block from cache
288
0
  pinned_blocks_[block_index].Reset();
289
  // Clean up any pending async IO for this block
290
0
  async_io_map_.erase(block_index);
291
0
}
292
293
0
bool ReadSet::IsBlockAvailable(size_t block_index) const {
294
0
  if (block_index >= pinned_blocks_.size()) {
295
0
    return false;
296
0
  }
297
  // Block is available if it hasn't been released (still has a value or
298
  // has pending async IO)
299
0
  return pinned_blocks_[block_index].GetValue() != nullptr ||
300
0
         async_io_map_.find(block_index) != async_io_map_.end();
301
0
}
302
303
// Poll and process async IO for a specific block
304
Status ReadSet::PollAndProcessAsyncIO(
305
0
    const std::shared_ptr<AsyncIOState>& async_state) {
306
0
  auto* rep = job_->table->get_rep();
307
308
  // Poll for IO completion using FileSystem Poll API
309
0
  std::vector<void*> io_handles = {async_state->io_handle};
310
0
  IOStatus io_s = rep->ioptions.env->GetFileSystem()->Poll(io_handles, 1);
311
0
  if (!io_s.ok()) {
312
0
    return io_s;
313
0
  }
314
315
  // Check for read errors
316
0
  if (!async_state->read_req.status.ok()) {
317
0
    return async_state->read_req.status;
318
0
  }
319
320
  // Use the result slice from the callback which has been correctly set
321
  // with any necessary alignment adjustments for direct IO
322
0
  const Slice& buffer_data = async_state->read_req.result;
323
324
  // Process all blocks in this async request
325
0
  for (size_t i = 0; i < async_state->block_indices.size(); ++i) {
326
0
    const size_t idx = async_state->block_indices[i];
327
0
    const auto& block_handle = async_state->blocks[i];
328
329
0
    Status s =
330
0
        CreateAndPinBlockFromBuffer(job_, block_handle, async_state->offset,
331
0
                                    buffer_data, pinned_blocks_[idx]);
332
0
    if (!s.ok()) {
333
0
      return s;
334
0
    }
335
0
  }
336
337
  // Clean up IO handle
338
0
  if (async_state->io_handle != nullptr && async_state->del_fn != nullptr) {
339
0
    async_state->del_fn(async_state->io_handle);
340
0
    async_state->io_handle = nullptr;
341
0
  }
342
343
  // Remove from map - all blocks in this request have been processed
344
  // Store indices in a temporary vector to avoid iterator invalidation
345
0
  std::vector<size_t> indices_to_remove = async_state->block_indices;
346
0
  for (const auto idx : indices_to_remove) {
347
0
    async_io_map_.erase(idx);
348
0
  }
349
350
0
  return Status::OK();
351
0
}
352
353
// Perform synchronous read for a specific block
354
// This performs a direct synchronous read from disk when the block is not in
355
// cache
356
0
Status ReadSet::SyncRead(size_t block_index) {
357
0
  const auto& block_handle = job_->block_handles[block_index];
358
0
  auto* rep = job_->table->get_rep();
359
360
  // Get dictionary-aware decompressor if available
361
0
  UnownedPtr<Decompressor> decompressor = rep->decompressor.get();
362
0
  CachableEntry<DecompressorDict> cached_dict;
363
0
  if (rep->uncompression_dict_reader) {
364
0
    Status s = rep->uncompression_dict_reader->GetOrReadUncompressionDictionary(
365
0
        nullptr, job_->job_options.read_options, nullptr, nullptr,
366
0
        &cached_dict);
367
0
    if (!s.ok()) {
368
0
      return s;
369
0
    }
370
0
    if (cached_dict.GetValue()) {
371
0
      decompressor = cached_dict.GetValue()->decompressor_.get();
372
0
    }
373
0
  }
374
375
0
  return job_->table->RetrieveBlock<Block_kData>(
376
0
      /*prefetch_buffer=*/nullptr, job_->job_options.read_options, block_handle,
377
0
      decompressor, &pinned_blocks_[block_index].As<Block_kData>(),
378
0
      /*get_context=*/nullptr, /*lookup_context=*/nullptr,
379
0
      /*for_compaction=*/false, /*use_cache=*/true,
380
0
      /*async_read=*/false, /*use_block_cache_for_lookup=*/true);
381
0
}
382
383
// One unit of prefetch work: a set of blocks that were coalesced ahead of
// time so they can be granted memory and dispatched together.
struct CoalescedPrefetchGroup {
  // Indices of the member blocks, sorted by offset.
  std::vector<size_t> block_indices;
  // Number of bytes this group's IO accounts for.
  size_t total_bytes{0};
};
388
389
// State for a pending memory request waiting to be granted
390
// Groups are pre-coalesced at queue time for efficient dispatch
391
struct PendingPrefetchRequest {
392
  std::weak_ptr<ReadSet> read_set;
393
  std::shared_ptr<IOJob> job;
394
395
  // Pre-coalesced groups ready for dispatch (ordered by first block index)
396
  std::deque<CoalescedPrefetchGroup> coalesced_groups;
397
398
  // Individual block indices still pending (for RemoveFromPending lookup)
399
  std::unordered_set<size_t> block_indices_to_prefetch;
400
401
  std::atomic<size_t> pending_bytes_{0};  // Track remaining bytes
402
  mutable port::Mutex groups_mutex_;  // Protects groups and set modifications
403
};
404
405
// Remove a block from pending prefetch (called when block is read or released)
406
0
void ReadSet::RemoveFromPending(size_t block_index) {
407
0
  if (!pending_prefetch_flags_ || block_index >= pending_prefetch_flags_size_) {
408
0
    return;
409
0
  }
410
411
  // Atomic exchange - returns true only if it was previously true
412
0
  if (!pending_prefetch_flags_[block_index].exchange(false)) {
413
0
    return;  // Already removed or never pending
414
0
  }
415
416
0
  if (pending_request_) {
417
0
    MutexLock lock(&pending_request_->groups_mutex_);
418
0
    pending_request_->block_indices_to_prefetch.erase(block_index);
419
0
    pending_request_->pending_bytes_ -= block_sizes_[block_index];
420
0
  }
421
0
}
422
423
// IODispatcherImpl::Impl inherits from IODispatcherImplData
424
struct IODispatcherImpl::Impl : public IODispatcherImplData,
425
                                public std::enable_shared_from_this<Impl> {
426
  explicit Impl(const IODispatcherOptions& options);
427
  ~Impl() override;
428
429
  // Non-copyable and non-movable
430
  Impl(const Impl&) = delete;
431
  Impl& operator=(const Impl&) = delete;
432
  Impl(Impl&&) = delete;
433
  Impl& operator=(Impl&&) = delete;
434
435
  Status SubmitJob(const std::shared_ptr<IOJob>& job,
436
                   std::shared_ptr<ReadSet>* read_set);
437
438
  // Memory management methods - non-blocking
439
  bool TryAcquireMemory(size_t bytes);
440
  void ReleaseMemory(size_t bytes) override;
441
442
  // Memory limiting state
443
  size_t max_prefetch_memory_bytes_ = 0;
444
  std::atomic<size_t> memory_used_{0};  // Atomic for lock-free accounting
445
  std::atomic<bool> has_pending_requests_{false};  // Fast-path check
446
  port::Mutex memory_mutex_;  // Only for pending_prefetch_queue_ access
447
  std::deque<std::shared_ptr<PendingPrefetchRequest>> pending_prefetch_queue_;
448
  Statistics* statistics_ = nullptr;
449
450
 private:
451
  void PrepareIORequests(
452
      const std::shared_ptr<IOJob>& job,
453
      const std::vector<size_t>& block_indices_to_read,
454
      const std::vector<BlockHandle>& block_handles,
455
      std::vector<FSReadRequest>* read_reqs,
456
      std::vector<std::vector<size_t>>* coalesced_block_indices);
457
458
  // Surface actual async IO errors to caller, but allow fallback for
459
  // unsupported cases. Returns block indices that need sync fallback.
460
  std::vector<size_t> ExecuteAsyncIO(
461
      const std::shared_ptr<IOJob>& job,
462
      const std::shared_ptr<ReadSet>& read_set,
463
      std::vector<FSReadRequest>& read_reqs,
464
      const std::vector<std::vector<size_t>>& coalesced_block_indices,
465
      Status* out_status);
466
467
  Status ExecuteSyncIO(
468
      const std::shared_ptr<IOJob>& job,
469
      const std::shared_ptr<ReadSet>& read_set,
470
      std::vector<FSReadRequest>& read_reqs,
471
      const std::vector<std::vector<size_t>>& coalesced_block_indices);
472
473
  // Try to dispatch pending prefetch requests when memory becomes available
474
  void TryDispatchPendingPrefetches();
475
476
  // Dispatch prefetch for a specific ReadSet (called when memory is available)
477
  void DispatchPrefetch(const std::shared_ptr<ReadSet>& read_set,
478
                        const std::shared_ptr<IOJob>& job,
479
                        const std::vector<size_t>& block_indices);
480
481
  // Pre-coalesce blocks into groups, respecting max_group_bytes size limit.
482
  // Returns groups ordered by first block index (earlier blocks first).
483
  std::vector<CoalescedPrefetchGroup> PreCoalesceBlocks(
484
      const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& rs,
485
      const std::vector<size_t>& block_indices, size_t max_group_bytes);
486
};
487
488
// Captures the memory budget and statistics sink from the options; all other
// members keep their in-class defaults.
IODispatcherImpl::Impl::Impl(const IODispatcherOptions& options)
    : max_prefetch_memory_bytes_(options.max_prefetch_memory_bytes),
      statistics_(options.statistics) {
}
491
492
0
// Nothing beyond member destruction is required.
IODispatcherImpl::Impl::~Impl() = default;
493
494
0
bool IODispatcherImpl::Impl::TryAcquireMemory(size_t bytes) {
495
0
  if (max_prefetch_memory_bytes_ == 0) {
496
0
    return true;  // No limit configured
497
0
  }
498
499
  // Lock-free memory acquisition using compare-exchange
500
0
  size_t current = memory_used_.load(std::memory_order_relaxed);
501
0
  while (true) {
502
0
    if (current + bytes > max_prefetch_memory_bytes_) {
503
      // Not enough memory - caller should queue for later
504
0
      RecordTick(statistics_, PREFETCH_MEMORY_REQUESTS_BLOCKED);
505
0
      return false;
506
0
    }
507
0
    if (memory_used_.compare_exchange_weak(current, current + bytes,
508
0
                                           std::memory_order_release,
509
0
                                           std::memory_order_relaxed)) {
510
0
      RecordTick(statistics_, PREFETCH_MEMORY_BYTES_GRANTED, bytes);
511
0
      return true;
512
0
    }
513
    // current is updated by compare_exchange_weak on failure, retry
514
0
  }
515
0
}
516
517
0
void IODispatcherImpl::Impl::ReleaseMemory(size_t bytes) {
518
0
  if (max_prefetch_memory_bytes_ == 0) {
519
0
    return;  // No limit configured
520
0
  }
521
522
  // Lock-free memory release using atomic fetch_sub
523
0
  size_t old_val = memory_used_.fetch_sub(bytes, std::memory_order_release);
524
0
  assert(old_val >= bytes);
525
0
  (void)old_val;  // Suppress unused warning in release builds
526
0
  RecordTick(statistics_, PREFETCH_MEMORY_BYTES_RELEASED, bytes);
527
528
  // Fast-path: skip dispatch attempt if no pending requests
529
  // This avoids mutex contention in the common single-threaded iterator case
530
0
  if (!has_pending_requests_.load(std::memory_order_acquire)) {
531
0
    return;
532
0
  }
533
534
  // Try to dispatch pending prefetches now that memory is available
535
0
  TryDispatchPendingPrefetches();
536
0
}
537
538
0
void IODispatcherImpl::Impl::TryDispatchPendingPrefetches() {
539
  // Process pending prefetch requests - dispatch entire coalesced groups
540
0
  while (true) {
541
0
    std::shared_ptr<PendingPrefetchRequest> pending;
542
543
0
    {
544
0
      MutexLock lock(&memory_mutex_);
545
0
      if (pending_prefetch_queue_.empty()) {
546
0
        has_pending_requests_.store(false, std::memory_order_release);
547
0
        return;
548
0
      }
549
550
      // Get the next pending request
551
0
      pending = std::move(pending_prefetch_queue_.front());
552
0
      pending_prefetch_queue_.pop_front();
553
0
    }
554
555
    // Check if the ReadSet is still alive
556
0
    auto read_set = pending->read_set.lock();
557
0
    if (!read_set) {
558
0
      continue;  // ReadSet was destroyed, skip this request
559
0
    }
560
561
    // Try to acquire memory for coalesced groups (entire groups at a time)
562
0
    std::vector<size_t> blocks_to_dispatch;
563
0
    bool has_remaining_groups = false;
564
565
0
    {
566
0
      MutexLock lock(&pending->groups_mutex_);
567
568
0
      while (!pending->coalesced_groups.empty()) {
569
0
        auto& group = pending->coalesced_groups.front();
570
571
        // Filter out blocks that were already read (not in pending set anymore)
572
0
        std::vector<size_t> remaining_blocks;
573
0
        size_t remaining_bytes = 0;
574
0
        for (size_t idx : group.block_indices) {
575
0
          if (pending->block_indices_to_prefetch.count(idx) > 0) {
576
0
            remaining_blocks.push_back(idx);
577
0
            remaining_bytes += read_set->block_sizes_[idx];
578
0
          }
579
0
        }
580
581
        // Skip empty groups (all blocks were already read)
582
0
        if (remaining_blocks.empty()) {
583
0
          pending->coalesced_groups.pop_front();
584
0
          continue;
585
0
        }
586
587
        // Try to acquire memory for remaining blocks only
588
0
        if (TryAcquireMemory(remaining_bytes)) {
589
          // Add all remaining blocks from this group to dispatch
590
0
          for (size_t idx : remaining_blocks) {
591
0
            blocks_to_dispatch.push_back(idx);
592
0
            pending->block_indices_to_prefetch.erase(idx);
593
0
          }
594
0
          pending->pending_bytes_ -= remaining_bytes;
595
0
          pending->coalesced_groups.pop_front();
596
0
        } else {
597
          // Not enough memory for this group - update with remaining blocks
598
0
          group.block_indices = std::move(remaining_blocks);
599
0
          group.total_bytes = remaining_bytes;
600
0
          has_remaining_groups = true;
601
0
          break;
602
0
        }
603
0
      }
604
0
    }
605
606
    // Save job before potential move of pending
607
0
    auto job = pending->job;
608
609
    // Requeue if groups remain
610
0
    if (has_remaining_groups) {
611
0
      MutexLock lock(&memory_mutex_);
612
0
      pending_prefetch_queue_.push_front(std::move(pending));
613
0
    } else {
614
      // All groups dispatched, clear pending state
615
0
      read_set->pending_request_.reset();
616
0
    }
617
618
    // Clear pending flags for dispatched blocks
619
0
    if (read_set->pending_prefetch_flags_) {
620
0
      for (size_t idx : blocks_to_dispatch) {
621
0
        if (idx < read_set->pending_prefetch_flags_size_) {
622
0
          read_set->pending_prefetch_flags_[idx].store(false);
623
0
        }
624
0
      }
625
0
    }
626
627
    // Dispatch acquired blocks
628
0
    if (!blocks_to_dispatch.empty()) {
629
0
      DispatchPrefetch(read_set, job, blocks_to_dispatch);
630
0
    }
631
632
    // If we dispatched nothing, stop (no memory available for any group)
633
0
    if (blocks_to_dispatch.empty()) {
634
0
      return;
635
0
    }
636
0
  }
637
0
}
638
639
void IODispatcherImpl::Impl::DispatchPrefetch(
640
    const std::shared_ptr<ReadSet>& read_set, const std::shared_ptr<IOJob>& job,
641
0
    const std::vector<size_t>& block_indices) {
642
  // Sync point for testing partial prefetch - passes number of blocks being
643
  // dispatched
644
0
  TEST_SYNC_POINT_CALLBACK("IODispatcherImpl::DispatchPrefetch:BlockCount",
645
0
                           const_cast<std::vector<size_t>*>(&block_indices));
646
647
  // Prepare and execute IO for the given blocks
648
0
  std::vector<FSReadRequest> read_reqs;
649
0
  std::vector<std::vector<size_t>> coalesced_block_indices;
650
0
  PrepareIORequests(job, block_indices, job->block_handles, &read_reqs,
651
0
                    &coalesced_block_indices);
652
653
0
  if (job->job_options.read_options.async_io) {
654
0
    Status async_status;
655
0
    std::vector<size_t> fallback_indices = ExecuteAsyncIO(
656
0
        job, read_set, read_reqs, coalesced_block_indices, &async_status);
657
658
    // For blocks where async is not supported, do sync IO
659
0
    if (!fallback_indices.empty()) {
660
0
      std::vector<FSReadRequest> sync_read_reqs;
661
0
      std::vector<std::vector<size_t>> sync_coalesced_indices;
662
0
      PrepareIORequests(job, fallback_indices, job->block_handles,
663
0
                        &sync_read_reqs, &sync_coalesced_indices);
664
      // Prefetch errors are ignored - user will get the error when reading
665
0
      Status s =
666
0
          ExecuteSyncIO(job, read_set, sync_read_reqs, sync_coalesced_indices);
667
0
      s.PermitUncheckedError();
668
0
      read_set->num_sync_reads_ += fallback_indices.size();
669
0
    }
670
    // Async errors are also ignored - user will get the error when reading
671
0
    async_status.PermitUncheckedError();
672
0
  } else {
673
    // Prefetch errors are ignored - user will get the error when reading
674
0
    Status s = ExecuteSyncIO(job, read_set, read_reqs, coalesced_block_indices);
675
0
    s.PermitUncheckedError();
676
0
    read_set->num_sync_reads_ += block_indices.size();
677
0
  }
678
0
}
679
680
// Builds a ReadSet for `job` and starts prefetching its uncached blocks.
//
// Blocks already in the block cache are pinned immediately. The remaining
// blocks are coalesced into groups; groups for which memory can be acquired
// are dispatched right away and the rest are queued until memory is released.
//
// Returns InvalidArgument when `job` or `read_set` is null, OK otherwise.
// Prefetch IO errors are not reported here; they surface when the affected
// block is read from the ReadSet.
Status IODispatcherImpl::Impl::SubmitJob(const std::shared_ptr<IOJob>& job,
                                         std::shared_ptr<ReadSet>* read_set) {
  if (!read_set) {
    return Status::InvalidArgument("read_set output parameter is null");
  }
  if (!job) {
    return Status::InvalidArgument("job is null");
  }

  auto rs = std::make_shared<ReadSet>();
  const size_t num_blocks = job->block_handles.size();

  // Initialize ReadSet
  rs->job_ = job;
  rs->fs_ = job->table->get_rep()->ioptions.env->GetFileSystem();
  rs->pinned_blocks_.resize(num_blocks);
  rs->block_sizes_.resize(num_blocks, 0);

  // Build sorted index for O(log n) ReadOffset lookups via binary search.
  // sorted_block_indices_[i] = original index of i-th smallest block by offset.
  rs->sorted_block_indices_.resize(num_blocks);
  for (size_t i = 0; i < num_blocks; ++i) {
    rs->sorted_block_indices_[i] = i;
  }
  std::sort(rs->sorted_block_indices_.begin(), rs->sorted_block_indices_.end(),
            [&job](size_t a, size_t b) {
              return job->block_handles[a].offset() <
                     job->block_handles[b].offset();
            });

  // Step 1: Check cache and pin cached blocks
  std::vector<size_t> block_indices_to_read;
  size_t cache_hits = 0;

  for (size_t i = 0; i < num_blocks; ++i) {
    const auto& data_block_handle = job->block_handles[i];

    // Lookup and pin block in cache
    Status s = job->table->LookupAndPinBlocksInCache<Block_kData>(
        job->job_options.read_options, data_block_handle,
        &(rs->pinned_blocks_)[i].As<Block_kData>());

    if (!s.ok()) {
      // A failed lookup is neither a hit nor scheduled for prefetch; the block
      // is read on demand when the caller asks for it.
      continue;
    }

    if ((rs->pinned_blocks_)[i].GetValue()) {
      ++cache_hits;
    } else {
      // Block not in cache - needs to be read from disk
      block_indices_to_read.emplace_back(i);
    }
  }

  // Count only blocks that were actually pinned from the cache. Deriving the
  // count from the number of blocks left to read would also count blocks
  // whose lookup failed.
  rs->num_cache_hits_ = cache_hits;

  // Step 2: Prepare IO requests for blocks not in cache
  if (block_indices_to_read.empty()) {
    // Nothing to prefetch
    *read_set = std::move(rs);
    return Status::OK();
  }

  // Calculate block sizes for uncached blocks
  for (const auto& idx : block_indices_to_read) {
    rs->block_sizes_[idx] =
        BlockBasedTable::BlockSizeWithTrailer(job->block_handles[idx]);
  }

  // Store dispatcher reference for release callbacks
  rs->dispatcher_data_ = shared_from_this();

  // Pre-coalesce blocks into groups, respecting memory budget per group
  // This ensures we dispatch meaningful IO sizes, not tiny single-block IOs
  // Both memory-limited and non-memory-limited paths use the same coalescing
  auto coalesced_groups = PreCoalesceBlocks(job, rs, block_indices_to_read,
                                            max_prefetch_memory_bytes_);

  std::vector<size_t> blocks_to_dispatch;
  std::deque<CoalescedPrefetchGroup> groups_to_queue;

  // Try to acquire memory for entire coalesced groups
  for (auto& group : coalesced_groups) {
    if (TryAcquireMemory(group.total_bytes)) {
      // Add all blocks from this group to dispatch
      blocks_to_dispatch.insert(blocks_to_dispatch.end(),
                                group.block_indices.begin(),
                                group.block_indices.end());
    } else {
      // Queue this group for later
      groups_to_queue.push_back(std::move(group));
    }
  }

  // Dispatch acquired blocks immediately
  if (!blocks_to_dispatch.empty()) {
    DispatchPrefetch(rs, job, blocks_to_dispatch);
  }

  // Queue remaining groups for later (only applies when memory limiting)
  if (!groups_to_queue.empty()) {
    auto pending = std::make_shared<PendingPrefetchRequest>();
    pending->read_set = rs;
    pending->job = job;

    size_t pending_bytes = 0;
    for (const auto& group : groups_to_queue) {
      for (size_t idx : group.block_indices) {
        pending->block_indices_to_prefetch.insert(idx);
      }
      pending_bytes += group.total_bytes;
    }
    pending->coalesced_groups = std::move(groups_to_queue);
    pending->pending_bytes_ = pending_bytes;

    // Set up pending flags for queued blocks only
    rs->pending_prefetch_flags_ =
        std::make_unique<std::atomic<bool>[]>(num_blocks);
    rs->pending_prefetch_flags_size_ = num_blocks;
    for (size_t idx : pending->block_indices_to_prefetch) {
      rs->pending_prefetch_flags_[idx].store(true);
    }
    rs->pending_request_ = pending;

    {
      MutexLock lock(&memory_mutex_);
      pending_prefetch_queue_.push_back(std::move(pending));
      has_pending_requests_.store(true, std::memory_order_release);
    }
  }

  *read_set = std::move(rs);
  return Status::OK();
}
812
813
void IODispatcherImpl::Impl::PrepareIORequests(
814
    const std::shared_ptr<IOJob>& job,
815
    const std::vector<size_t>& block_indices_to_read,
816
    const std::vector<BlockHandle>& block_handles,
817
    std::vector<FSReadRequest>* read_reqs,
818
0
    std::vector<std::vector<size_t>>* coalesced_block_indices) {
819
  // This is necessary because block handles may not be in sorted order
820
0
  std::vector<size_t> sorted_block_indices = block_indices_to_read;
821
0
  std::sort(sorted_block_indices.begin(), sorted_block_indices.end(),
822
0
            [&block_handles](size_t a, size_t b) {
823
0
              return block_handles[a].offset() < block_handles[b].offset();
824
0
            });
825
826
0
  assert(coalesced_block_indices->empty());
827
0
  coalesced_block_indices->resize(1);
828
829
0
  for (const auto& block_idx : sorted_block_indices) {
830
0
    if (!coalesced_block_indices->back().empty()) {
831
      // Check if we can coalesce with previous block
832
0
      const auto& last_block_handle =
833
0
          block_handles[coalesced_block_indices->back().back()];
834
0
      uint64_t last_block_end =
835
0
          last_block_handle.offset() +
836
0
          BlockBasedTable::BlockSizeWithTrailer(last_block_handle);
837
0
      uint64_t current_start = block_handles[block_idx].offset();
838
839
0
      if (current_start >
840
0
          last_block_end + job->job_options.io_coalesce_threshold) {
841
        // Gap too large - start new IO request
842
0
        coalesced_block_indices->emplace_back();
843
0
      }
844
0
    }
845
0
    coalesced_block_indices->back().emplace_back(block_idx);
846
0
  }
847
848
  // Create FSReadRequest for each coalesced group
849
0
  assert(read_reqs->empty());
850
0
  read_reqs->reserve(coalesced_block_indices->size());
851
852
0
  for (const auto& block_indices : *coalesced_block_indices) {
853
0
    assert(!block_indices.empty());
854
855
    // Find the min and max offsets in this coalesced group
856
    // Since blocks are now sorted, first has min offset and last has max
857
0
    const auto& first_block_handle = block_handles[block_indices[0]];
858
0
    const auto& last_block_handle = block_handles[block_indices.back()];
859
860
0
    const auto start_offset = first_block_handle.offset();
861
0
    const auto end_offset =
862
0
        last_block_handle.offset() +
863
0
        BlockBasedTable::BlockSizeWithTrailer(last_block_handle);
864
865
0
    assert(end_offset > start_offset);
866
867
0
    read_reqs->emplace_back();
868
0
    read_reqs->back().offset = start_offset;
869
0
    read_reqs->back().len = end_offset - start_offset;
870
0
    read_reqs->back().scratch = nullptr;
871
0
  }
872
0
}
873
874
std::vector<CoalescedPrefetchGroup> IODispatcherImpl::Impl::PreCoalesceBlocks(
875
    const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& rs,
876
0
    const std::vector<size_t>& block_indices, size_t max_group_bytes) {
877
0
  std::vector<CoalescedPrefetchGroup> groups;
878
879
0
  if (block_indices.empty()) {
880
0
    return groups;
881
0
  }
882
883
0
  const auto& block_handles = job->block_handles;
884
0
  const uint64_t coalesce_threshold = job->job_options.io_coalesce_threshold;
885
886
  // Sort block indices by offset for coalescing
887
0
  std::vector<size_t> sorted_indices = block_indices;
888
0
  std::sort(sorted_indices.begin(), sorted_indices.end(),
889
0
            [&block_handles](size_t a, size_t b) {
890
0
              return block_handles[a].offset() < block_handles[b].offset();
891
0
            });
892
893
  // Build coalesced groups respecting max_group_bytes
894
0
  groups.emplace_back();
895
896
0
  for (size_t idx : sorted_indices) {
897
0
    size_t block_size = rs->block_sizes_[idx];
898
899
    // Skip blocks that are individually larger than the memory budget
900
    // These will be read synchronously when needed (via ReadIndex fallback)
901
0
    if (max_group_bytes > 0 && block_size > max_group_bytes) {
902
0
      continue;
903
0
    }
904
905
    // Check if we need to start a new group
906
0
    bool start_new_group = false;
907
908
0
    if (!groups.back().block_indices.empty()) {
909
      // Check gap with previous block
910
0
      size_t last_idx = groups.back().block_indices.back();
911
0
      const auto& last_handle = block_handles[last_idx];
912
0
      uint64_t last_end = last_handle.offset() +
913
0
                          BlockBasedTable::BlockSizeWithTrailer(last_handle);
914
0
      uint64_t current_start = block_handles[idx].offset();
915
916
0
      if (current_start > last_end + coalesce_threshold) {
917
0
        start_new_group = true;  // Gap too large
918
0
      } else if (max_group_bytes > 0 &&
919
0
                 groups.back().total_bytes + block_size > max_group_bytes) {
920
0
        start_new_group = true;  // Would exceed size limit
921
0
      }
922
0
    }
923
924
0
    if (start_new_group) {
925
0
      groups.emplace_back();
926
0
    }
927
928
0
    groups.back().block_indices.push_back(idx);
929
0
    groups.back().total_bytes += block_size;
930
0
  }
931
932
0
  return groups;
933
0
}
934
935
std::vector<size_t> IODispatcherImpl::Impl::ExecuteAsyncIO(
936
    const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& read_set,
937
    std::vector<FSReadRequest>& read_reqs,
938
    const std::vector<std::vector<size_t>>& coalesced_block_indices,
939
0
    Status* out_status) {
940
0
  std::vector<size_t> fallback_block_indices;
941
0
  *out_status = Status::OK();
942
943
  // Get file and IO options
944
0
  auto* rep = job->table->get_rep();
945
0
  IOOptions io_opts;
946
0
  Status s =
947
0
      rep->file->PrepareIOOptions(job->job_options.read_options, io_opts);
948
0
  if (!s.ok()) {
949
0
    *out_status = s;
950
0
    return fallback_block_indices;
951
0
  }
952
953
0
  const bool direct_io = rep->file->use_direct_io();
954
955
  // Submit async read requests and store them in the ReadSet
956
0
  for (size_t i = 0; i < read_reqs.size(); ++i) {
957
0
    auto async_state = std::make_shared<AsyncIOState>();
958
959
0
    async_state->offset = read_reqs[i].offset;
960
0
    async_state->block_indices = coalesced_block_indices[i];
961
0
    async_state->read_req = std::move(read_reqs[i]);
962
963
0
    for (const auto idx : coalesced_block_indices[i]) {
964
0
      async_state->blocks.emplace_back(job->block_handles[idx]);
965
0
    }
966
967
0
    if (direct_io) {
968
0
      async_state->read_req.scratch = nullptr;
969
0
    } else {
970
0
      async_state->buf.reset(new char[async_state->read_req.len]);
971
0
      async_state->read_req.scratch = async_state->buf.get();
972
0
    }
973
974
    // Callback for async read completion
975
    // Store the result slice and status back into async_state so we can access
976
    // them after Poll() completes.
977
0
    auto cb = [](const FSReadRequest& req, void* cb_arg) {
978
0
      auto* state = static_cast<AsyncIOState*>(cb_arg);
979
0
      state->read_req.result = req.result;
980
0
      state->read_req.status = req.status;
981
0
    };
982
983
0
    s = rep->file->ReadAsync(async_state->read_req, io_opts, cb,
984
0
                             async_state.get(), &async_state->io_handle,
985
0
                             &async_state->del_fn,
986
0
                             direct_io ? &async_state->aligned_buf : nullptr);
987
988
0
    if (s.IsNotSupported()) {
989
      // Async IO may be compiled in but unavailable at runtime. Fall back to
990
      // the synchronous coalesced path for these blocks.
991
0
      for (const auto idx : coalesced_block_indices[i]) {
992
0
        fallback_block_indices.push_back(idx);
993
0
      }
994
0
      continue;
995
0
    }
996
997
0
    if (!s.ok()) {
998
      // Actual error - surface to caller
999
0
      *out_status = s;
1000
0
      return fallback_block_indices;
1001
0
    }
1002
1003
0
    if (async_state->io_handle == nullptr) {
1004
      // Async IO not supported - add to fallback list for sync IO
1005
0
      for (const auto idx : coalesced_block_indices[i]) {
1006
0
        fallback_block_indices.push_back(idx);
1007
0
      }
1008
0
      continue;
1009
0
    }
1010
1011
    // Add async state to map for all blocks in this request
1012
0
    for (const auto idx : async_state->block_indices) {
1013
0
      read_set->async_io_map_[idx] = async_state;
1014
0
    }
1015
0
  }
1016
1017
0
  return fallback_block_indices;
1018
0
}
1019
1020
// Reads every request in read_reqs with a single MultiRead call and pins the
// resulting blocks into read_set.
//
// read_reqs[i] must cover the blocks listed in coalesced_block_indices[i].
// After the read succeeds, each listed block is materialised from the
// request's result via CreateAndPinBlockFromBuffer and stored in
// read_set->pinned_blocks_[block_idx].
//
// Returns the first failure encountered, checked in this order: preparing the
// IO options, the MultiRead call itself, any individual request status, then
// block creation. Returns OK when every block has been pinned.
Status IODispatcherImpl::Impl::ExecuteSyncIO(
    const std::shared_ptr<IOJob>& job, const std::shared_ptr<ReadSet>& read_set,
    std::vector<FSReadRequest>& read_reqs,
    const std::vector<std::vector<size_t>>& coalesced_block_indices) {
  auto* rep = job->table->get_rep();

  IOOptions io_opts;
  Status prepare_status =
      rep->file->PrepareIOOptions(job->job_options.read_options, io_opts);
  if (!prepare_status.ok()) {
    return prepare_status;
  }

  const bool direct_io = rep->file->use_direct_io();

  // Backing storage for the scratch pointers; must stay alive until all
  // blocks have been created from the read results below.
  std::unique_ptr<char[]> scratch_storage;

  if (!direct_io) {
    // One contiguous allocation, carved into per-request slices.
    size_t bytes_needed = 0;
    for (const auto& req : read_reqs) {
      bytes_needed += req.len;
    }
    scratch_storage.reset(new char[bytes_needed]);

    char* cursor = scratch_storage.get();
    for (auto& req : read_reqs) {
      req.scratch = cursor;
      cursor += req.len;
    }
  } else {
    // Direct IO: no caller scratch, the reader fills aligned_buf instead.
    for (auto& req : read_reqs) {
      req.scratch = nullptr;
    }
  }

  AlignedBuf aligned_buf;
  Status read_status =
      rep->file->MultiRead(io_opts, read_reqs.data(), read_reqs.size(),
                           direct_io ? &aligned_buf : nullptr);
  if (!read_status.ok()) {
    return read_status;
  }

  // Check every per-request status before any block is created.
  for (size_t r = 0; r < read_reqs.size(); ++r) {
    if (!read_reqs[r].status.ok()) {
      return read_reqs[r].status;
    }
  }

  // Turn each coalesced read into its individual pinned blocks.
  size_t group_no = 0;
  for (const auto& group : coalesced_block_indices) {
    const auto& completed_req = read_reqs[group_no];
    ++group_no;

    for (const auto& block_idx : group) {
      Status create_status = CreateAndPinBlockFromBuffer(
          job, job->block_handles[block_idx], completed_req.offset,
          completed_req.result, read_set->pinned_blocks_[block_idx]);
      if (!create_status.ok()) {
        return create_status;
      }
    }
  }

  return Status::OK();
}
1088
1089
// Default construction is equivalent to constructing with a
// default-constructed IODispatcherOptions.
IODispatcherImpl::IODispatcherImpl()
    : IODispatcherImpl(IODispatcherOptions()) {}
1091
1092
IODispatcherImpl::IODispatcherImpl(const IODispatcherOptions& options)
1093
0
    : impl_(std::make_shared<Impl>(options)) {}
1094
1095
0
// Defaulted destructor, defined out-of-line in this translation unit.
// NOTE(review): presumably placed here so that Impl is a complete type where
// impl_ is destroyed - confirm against the header.
IODispatcherImpl::~IODispatcherImpl() = default;
1096
1097
// Public entry point: forwards the job and the read_set out-parameter
// unchanged to Impl::SubmitJob and returns its status.
Status IODispatcherImpl::SubmitJob(const std::shared_ptr<IOJob>& job,
                                   std::shared_ptr<ReadSet>* read_set) {
  Status submit_status = impl_->SubmitJob(job, read_set);
  return submit_status;
}
1101
1102
0
// Heap-allocates an IODispatcherImpl built with default options and returns
// it as an IODispatcher*.
// NOTE(review): the raw pointer is presumably owned by the caller - confirm
// against the public header.
IODispatcher* NewIODispatcher() {
  IODispatcher* dispatcher = new IODispatcherImpl();
  return dispatcher;
}
1103
1104
0
// Heap-allocates an IODispatcherImpl configured with `options` and returns
// it as an IODispatcher*.
// NOTE(review): the raw pointer is presumably owned by the caller - confirm
// against the public header.
IODispatcher* NewIODispatcher(const IODispatcherOptions& options) {
  IODispatcher* dispatcher = new IODispatcherImpl(options);
  return dispatcher;
}
1107
1108
}  // namespace ROCKSDB_NAMESPACE