/src/rocksdb/file/file_prefetch_buffer.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #pragma once |
11 | | |
12 | | #include <algorithm> |
13 | | #include <atomic> |
14 | | #include <deque> |
15 | | #include <sstream> |
16 | | #include <string> |
17 | | |
18 | | #include "file/readahead_file_info.h" |
19 | | #include "monitoring/statistics_impl.h" |
20 | | #include "port/port.h" |
21 | | #include "rocksdb/env.h" |
22 | | #include "rocksdb/file_system.h" |
23 | | #include "rocksdb/options.h" |
24 | | #include "util/aligned_buffer.h" |
25 | | #include "util/autovector.h" |
26 | | #include "util/stop_watch.h" |
27 | | |
28 | | namespace ROCKSDB_NAMESPACE { |
29 | | |
30 | | #define DEFAULT_DECREMENT 8 * 1024 |
31 | | |
32 | | struct IOOptions; |
33 | | class RandomAccessFileReader; |
34 | | |
35 | | struct ReadaheadParams { |
36 | 44.0k | ReadaheadParams() {} |
37 | | |
38 | | // The initial readahead size. |
39 | | size_t initial_readahead_size = 0; |
40 | | |
41 | | // The maximum readahead size. |
42 | | // If max_readahead_size > readahead_size, then readahead size will be doubled |
43 | | // on every IO until max_readahead_size is hit. Typically this is set as a |
44 | | // multiple of initial_readahead_size. initial_readahead_size should be |
45 | | // greater than equal to initial_readahead_size. |
46 | | size_t max_readahead_size = 0; |
47 | | |
48 | | // If true, Readahead is enabled implicitly by rocksdb |
49 | | // after doing sequential scans for num_file_reads_for_auto_readahead. |
50 | | bool implicit_auto_readahead = false; |
51 | | |
52 | | // TODO akanksha - Remove num_file_reads when BlockPrefetcher is refactored. |
53 | | uint64_t num_file_reads = 0; |
54 | | uint64_t num_file_reads_for_auto_readahead = 0; |
55 | | |
56 | | // Number of buffers to maintain that contains prefetched data. If num_buffers |
57 | | // > 1 then buffers will be filled asynchronously whenever they get emptied. |
58 | | size_t num_buffers = 1; |
59 | | }; |
60 | | |
61 | | struct BufferInfo { |
62 | 0 | void ClearBuffer() { |
63 | 0 | buffer_.Clear(); |
64 | 0 | initial_end_offset_ = 0; |
65 | 0 | async_req_len_ = 0; |
66 | 0 | } |
67 | | |
68 | | AlignedBuffer buffer_; |
69 | | |
70 | | uint64_t offset_ = 0; |
71 | | |
72 | | // Below parameters are used in case of async read flow. |
73 | | // Length requested for in ReadAsync. |
74 | | size_t async_req_len_ = 0; |
75 | | |
76 | | // async_read_in_progress can be used as mutex. Callback can update the buffer |
77 | | // and its size but async_read_in_progress is only set by main thread. |
78 | | bool async_read_in_progress_ = false; |
79 | | |
80 | | // io_handle is allocated and used by underlying file system in case of |
81 | | // asynchronous reads. |
82 | | void* io_handle_ = nullptr; |
83 | | |
84 | | IOHandleDeleter del_fn_ = nullptr; |
85 | | |
86 | | // initial_end_offset is used to keep track of the end offset of the buffer |
87 | | // that was originally called. It's helpful in case of autotuning of readahead |
88 | | // size when callback is made to BlockBasedTableIterator. |
89 | | // initial end offset of this buffer which will be the starting |
90 | | // offset of next prefetch. |
91 | | // |
92 | | // For example - if end offset of previous buffer was 100 and because of |
93 | | // readahead_size optimization, end_offset was trimmed to 60. Then for next |
94 | | // prefetch call, start_offset should be intialized to 100 i.e start_offset = |
95 | | // buf->initial_end_offset_. |
96 | | uint64_t initial_end_offset_ = 0; |
97 | | |
98 | 0 | bool IsDataBlockInBuffer(uint64_t offset, size_t length) { |
99 | 0 | assert(async_read_in_progress_ == false); |
100 | 0 | return (offset >= offset_ && |
101 | 0 | offset + length <= offset_ + buffer_.CurrentSize()); |
102 | 0 | } |
103 | | |
104 | 0 | bool IsOffsetInBuffer(uint64_t offset) { |
105 | 0 | assert(async_read_in_progress_ == false); |
106 | 0 | return (offset >= offset_ && offset < offset_ + buffer_.CurrentSize()); |
107 | 0 | } |
108 | | |
109 | 0 | bool DoesBufferContainData() { |
110 | 0 | assert(async_read_in_progress_ == false); |
111 | 0 | return buffer_.CurrentSize() > 0; |
112 | 0 | } |
113 | | |
114 | 0 | bool IsBufferOutdated(uint64_t offset) { |
115 | 0 | return (!async_read_in_progress_ && DoesBufferContainData() && |
116 | 0 | offset >= offset_ + buffer_.CurrentSize()); |
117 | 0 | } |
118 | | |
119 | 0 | bool IsBufferOutdatedWithAsyncProgress(uint64_t offset) { |
120 | 0 | return (async_read_in_progress_ && io_handle_ != nullptr && |
121 | 0 | offset >= offset_ + async_req_len_); |
122 | 0 | } |
123 | | |
124 | 0 | bool IsOffsetInBufferWithAsyncProgress(uint64_t offset) { |
125 | 0 | return (async_read_in_progress_ && offset >= offset_ && |
126 | 0 | offset < offset_ + async_req_len_); |
127 | 0 | } |
128 | | |
129 | 0 | size_t CurrentSize() { return buffer_.CurrentSize(); } |
130 | | }; |
131 | | |
132 | | enum class FilePrefetchBufferUsage { |
133 | | kTableOpenPrefetchTail, |
134 | | kUserScanPrefetch, |
135 | | kUnknown, |
136 | | }; |
137 | | |
138 | | // Implementation: |
139 | | // FilePrefetchBuffer maintains a dequeu of free buffers (free_bufs_) with no |
140 | | // data and bufs_ which contains the prefetched data. Whenever a buffer is |
141 | | // consumed or is outdated (w.r.t. to requested offset), that buffer is cleared |
142 | | // and returned to free_bufs_. |
143 | | // |
144 | | // If a buffer is available in free_bufs_, it's moved to bufs_ and is sent for |
145 | | // prefetching. |
146 | | // num_buffers_ defines how many buffers FilePrefetchBuffer can maintain at a |
147 | | // time that contains prefetched data with num_buffers_ == bufs_.size() + |
148 | | // free_bufs_.size(). |
149 | | // |
150 | | // If num_buffers_ == 1, it's a sequential read flow. Read API will be called on |
151 | | // that one buffer whenever the data is requested and is not in the buffer. |
152 | | // If num_buffers_ > 1, then the data is prefetched asynchronosuly in the |
153 | | // buffers whenever the data is consumed from the buffers and that buffer is |
154 | | // freed. |
155 | | // If num_buffers > 1, then requested data can be overlapping between 2 buffers. |
156 | | // To return the continuous buffer, overlap_buf_ is used. The requested data is |
157 | | // copied from 2 buffers to the overlap_buf_ and overlap_buf_ is returned to |
158 | | // the caller. |
159 | | |
160 | | // FilePrefetchBuffer is a smart buffer to store and read data from a file. |
161 | | class FilePrefetchBuffer { |
162 | | public: |
163 | | // Constructor. |
164 | | // |
165 | | // All arguments are optional. |
166 | | // ReadaheadParams : Parameters to control the readahead behavior. |
167 | | // enable : controls whether reading from the buffer is enabled. |
168 | | // If false, TryReadFromCache() always return false, and we |
169 | | // only take stats for the minimum offset if |
170 | | // track_min_offset = true. |
171 | | // See below NOTE about mmap reads. |
172 | | // track_min_offset : Track the minimum offset ever read and collect stats on |
173 | | // it. Used for adaptable readahead of the file |
174 | | // footer/metadata. |
175 | | // |
176 | | // A user can construct a FilePrefetchBuffer without any arguments, but use |
177 | | // `Prefetch` to load data into the buffer. |
178 | | // NOTE: FilePrefetchBuffer is incompatible with prefetching from |
179 | | // RandomAccessFileReaders using mmap reads, so it is common to use |
180 | | // `!use_mmap_reads` for the `enable` parameter. |
181 | | FilePrefetchBuffer( |
182 | | const ReadaheadParams& readahead_params = {}, bool enable = true, |
183 | | bool track_min_offset = false, FileSystem* fs = nullptr, |
184 | | SystemClock* clock = nullptr, Statistics* stats = nullptr, |
185 | | const std::function<void(bool, uint64_t&, uint64_t&)>& cb = nullptr, |
186 | | FilePrefetchBufferUsage usage = FilePrefetchBufferUsage::kUnknown) |
187 | | : readahead_size_(readahead_params.initial_readahead_size), |
188 | | initial_auto_readahead_size_(readahead_params.initial_readahead_size), |
189 | | max_readahead_size_(readahead_params.max_readahead_size), |
190 | | min_offset_read_(std::numeric_limits<size_t>::max()), |
191 | | enable_(enable), |
192 | | track_min_offset_(track_min_offset), |
193 | | implicit_auto_readahead_(readahead_params.implicit_auto_readahead), |
194 | | prev_offset_(0), |
195 | | prev_len_(0), |
196 | | num_file_reads_for_auto_readahead_( |
197 | | readahead_params.num_file_reads_for_auto_readahead), |
198 | | num_file_reads_(readahead_params.num_file_reads), |
199 | | explicit_prefetch_submitted_(false), |
200 | | fs_(fs), |
201 | | clock_(clock), |
202 | | stats_(stats), |
203 | | usage_(usage), |
204 | | readaheadsize_cb_(cb), |
205 | 26.3k | num_buffers_(readahead_params.num_buffers) { |
206 | 26.3k | assert((num_file_reads_ >= num_file_reads_for_auto_readahead_ + 1) || |
207 | 26.3k | (num_file_reads_ == 0)); |
208 | | |
209 | | // If num_buffers_ > 1, data is asynchronously filled in the |
210 | | // queue. As result, data can be overlapping in two buffers. It copies the |
211 | | // data to overlap_buf_ in order to to return continuous buffer. |
212 | 26.3k | if (num_buffers_ > 1) { |
213 | 0 | overlap_buf_ = new BufferInfo(); |
214 | 0 | } |
215 | | |
216 | 26.3k | free_bufs_.resize(num_buffers_); |
217 | 52.6k | for (uint32_t i = 0; i < num_buffers_; i++) { |
218 | 26.3k | free_bufs_[i] = new BufferInfo(); |
219 | 26.3k | } |
220 | 26.3k | } |
221 | | |
222 | 26.3k | ~FilePrefetchBuffer() { |
223 | | // Abort any pending async read request before destroying the class object. |
224 | 26.3k | if (fs_ != nullptr) { |
225 | 0 | std::vector<void*> handles; |
226 | 0 | for (auto& buf : bufs_) { |
227 | 0 | if (buf->async_read_in_progress_ && buf->io_handle_ != nullptr) { |
228 | 0 | handles.emplace_back(buf->io_handle_); |
229 | 0 | } |
230 | 0 | } |
231 | 0 | if (!handles.empty()) { |
232 | 0 | StopWatch sw(clock_, stats_, ASYNC_PREFETCH_ABORT_MICROS); |
233 | 0 | Status s = fs_->AbortIO(handles); |
234 | 0 | assert(s.ok()); |
235 | 0 | } |
236 | |
|
237 | 0 | for (auto& buf : bufs_) { |
238 | 0 | if (buf->io_handle_ != nullptr) { |
239 | 0 | DestroyAndClearIOHandle(buf); |
240 | 0 | buf->ClearBuffer(); |
241 | 0 | } |
242 | 0 | buf->async_read_in_progress_ = false; |
243 | 0 | } |
244 | 0 | } |
245 | | |
246 | | // Prefetch buffer bytes discarded. |
247 | 26.3k | uint64_t bytes_discarded = 0; |
248 | | // Iterated over buffers. |
249 | 26.3k | for (auto& buf : bufs_) { |
250 | 0 | if (buf->DoesBufferContainData()) { |
251 | | // If last read was from this block and some bytes are still unconsumed. |
252 | 0 | if (prev_offset_ >= buf->offset_ && |
253 | 0 | prev_offset_ + prev_len_ < buf->offset_ + buf->CurrentSize()) { |
254 | 0 | bytes_discarded += |
255 | 0 | buf->CurrentSize() - (prev_offset_ + prev_len_ - buf->offset_); |
256 | 0 | } |
257 | | // If last read was from previous blocks and this block is unconsumed. |
258 | 0 | else if (prev_offset_ < buf->offset_ && |
259 | 0 | prev_offset_ + prev_len_ <= buf->offset_) { |
260 | 0 | bytes_discarded += buf->CurrentSize(); |
261 | 0 | } |
262 | 0 | } |
263 | 0 | } |
264 | | |
265 | 26.3k | RecordInHistogram(stats_, PREFETCHED_BYTES_DISCARDED, bytes_discarded); |
266 | | |
267 | 26.3k | for (auto& buf : bufs_) { |
268 | 0 | delete buf; |
269 | 0 | buf = nullptr; |
270 | 0 | } |
271 | | |
272 | 26.3k | for (auto& buf : free_bufs_) { |
273 | 26.3k | delete buf; |
274 | 26.3k | buf = nullptr; |
275 | 26.3k | } |
276 | | |
277 | 26.3k | if (overlap_buf_ != nullptr) { |
278 | 0 | delete overlap_buf_; |
279 | 0 | overlap_buf_ = nullptr; |
280 | 0 | } |
281 | 26.3k | } |
282 | | |
283 | 0 | bool Enabled() const { return enable_; } |
284 | | |
285 | | // Called externally by user to only load data into the buffer from a file |
286 | | // with num_buffers_ should be set to default(1). |
287 | | // |
288 | | // opts : the IO options to use. |
289 | | // reader : the file reader. |
290 | | // offset : the file offset to start reading from. |
291 | | // n : the number of bytes to read. |
292 | | // |
293 | | Status Prefetch(const IOOptions& opts, RandomAccessFileReader* reader, |
294 | | uint64_t offset, size_t n); |
295 | | |
296 | | // Request for reading the data from a file asynchronously. |
297 | | // If data already exists in the buffer, result will be updated. |
298 | | // reader : the file reader. |
299 | | // offset : the file offset to start reading from. |
300 | | // n : the number of bytes to read. |
301 | | // result : if data already exists in the buffer, result will |
302 | | // be updated with the data. |
303 | | // |
304 | | // If data already exist in the buffer, it will return Status::OK, otherwise |
305 | | // it will send asynchronous request and return Status::TryAgain. |
306 | | Status PrefetchAsync(const IOOptions& opts, RandomAccessFileReader* reader, |
307 | | uint64_t offset, size_t n, Slice* result); |
308 | | |
309 | | // Tries returning the data for a file read from this buffer if that data is |
310 | | // in the buffer. |
311 | | // It handles tracking the minimum read offset if track_min_offset = true. |
312 | | // It also does the exponential readahead when readahead_size is set as part |
313 | | // of the constructor. |
314 | | // |
315 | | // opts : the IO options to use. |
316 | | // reader : the file reader. |
317 | | // offset : the file offset. |
318 | | // n : the number of bytes. |
319 | | // result : output buffer to put the data into. |
320 | | // s : output status. |
321 | | // for_compaction : true if cache read is done for compaction read. |
322 | | bool TryReadFromCache(const IOOptions& opts, RandomAccessFileReader* reader, |
323 | | uint64_t offset, size_t n, Slice* result, Status* s, |
324 | | bool for_compaction = false); |
325 | | |
326 | | // The minimum `offset` ever passed to TryReadFromCache(). This will nly be |
327 | | // tracked if track_min_offset = true. |
328 | 26.2k | size_t min_offset_read() const { return min_offset_read_; } |
329 | | |
330 | 0 | size_t GetPrefetchOffset() const { return bufs_.front()->offset_; } |
331 | | |
332 | | // Called in case of implicit auto prefetching. |
333 | | void UpdateReadPattern(const uint64_t& offset, const size_t& len, |
334 | 0 | bool decrease_readaheadsize) { |
335 | 0 | if (decrease_readaheadsize) { |
336 | 0 | DecreaseReadAheadIfEligible(offset, len); |
337 | 0 | } |
338 | 0 | prev_offset_ = offset; |
339 | 0 | prev_len_ = len; |
340 | 0 | explicit_prefetch_submitted_ = false; |
341 | 0 | } |
342 | | |
343 | 0 | void GetReadaheadState(ReadaheadFileInfo::ReadaheadInfo* readahead_info) { |
344 | 0 | readahead_info->readahead_size = readahead_size_; |
345 | 0 | readahead_info->num_file_reads = num_file_reads_; |
346 | 0 | } |
347 | | |
348 | | void DecreaseReadAheadIfEligible(uint64_t offset, size_t size, |
349 | 0 | size_t value = DEFAULT_DECREMENT) { |
350 | 0 | if (bufs_.empty()) { |
351 | 0 | return; |
352 | 0 | } |
353 | | |
354 | | // Decrease the readahead_size if |
355 | | // - its enabled internally by RocksDB (implicit_auto_readahead_) and, |
356 | | // - readahead_size is greater than 0 and, |
357 | | // - this block would have called prefetch API if not found in cache for |
358 | | // which conditions are: |
359 | | // - few/no bytes are in buffer and, |
360 | | // - block is sequential with the previous read and, |
361 | | // - num_file_reads_ + 1 (including this read) > |
362 | | // num_file_reads_for_auto_readahead_ |
363 | | |
364 | 0 | size_t curr_size = bufs_.front()->async_read_in_progress_ |
365 | 0 | ? bufs_.front()->async_req_len_ |
366 | 0 | : bufs_.front()->CurrentSize(); |
367 | 0 | if (implicit_auto_readahead_ && readahead_size_ > 0) { |
368 | 0 | if ((offset + size > bufs_.front()->offset_ + curr_size) && |
369 | 0 | IsBlockSequential(offset) && |
370 | 0 | (num_file_reads_ + 1 > num_file_reads_for_auto_readahead_)) { |
371 | 0 | readahead_size_ = |
372 | 0 | std::max(initial_auto_readahead_size_, |
373 | 0 | (readahead_size_ >= value ? readahead_size_ - value : 0)); |
374 | 0 | } |
375 | 0 | } |
376 | 0 | } |
377 | | |
378 | | // Callback function passed to underlying FS in case of asynchronous reads. |
379 | | void PrefetchAsyncCallback(FSReadRequest& req, void* cb_arg); |
380 | | |
381 | | void TEST_GetBufferOffsetandSize( |
382 | 0 | std::vector<std::pair<uint64_t, size_t>>& buffer_info) { |
383 | 0 | for (size_t i = 0; i < bufs_.size(); i++) { |
384 | 0 | buffer_info[i].first = bufs_[i]->offset_; |
385 | 0 | buffer_info[i].second = bufs_[i]->async_read_in_progress_ |
386 | 0 | ? bufs_[i]->async_req_len_ |
387 | 0 | : bufs_[i]->CurrentSize(); |
388 | 0 | } |
389 | 0 | } |
390 | | |
391 | | private: |
392 | | // Calculates roundoff offset and length to be prefetched based on alignment |
393 | | // and data present in buffer_. It also allocates new buffer or refit tail if |
394 | | // required. |
395 | | void PrepareBufferForRead(BufferInfo* buf, size_t alignment, uint64_t offset, |
396 | | size_t roundup_len, bool refit_tail, |
397 | | uint64_t& aligned_useful_len); |
398 | | |
399 | | void AbortOutdatedIO(uint64_t offset); |
400 | | |
401 | | void AbortAllIOs(); |
402 | | |
403 | | void ClearOutdatedData(uint64_t offset, size_t len); |
404 | | |
405 | | // It calls Poll API to check for any pending asynchronous request. |
406 | | void PollIfNeeded(uint64_t offset, size_t len); |
407 | | |
408 | | Status PrefetchInternal(const IOOptions& opts, RandomAccessFileReader* reader, |
409 | | uint64_t offset, size_t length, size_t readahead_size, |
410 | | bool& copy_to_third_buffer); |
411 | | |
412 | | Status Read(BufferInfo* buf, const IOOptions& opts, |
413 | | RandomAccessFileReader* reader, uint64_t read_len, |
414 | | uint64_t aligned_useful_len, uint64_t start_offset); |
415 | | |
416 | | Status ReadAsync(BufferInfo* buf, const IOOptions& opts, |
417 | | RandomAccessFileReader* reader, uint64_t read_len, |
418 | | uint64_t start_offset); |
419 | | |
420 | | // Copy the data from src to overlap_buf_. |
421 | | void CopyDataToBuffer(BufferInfo* src, uint64_t& offset, size_t& length); |
422 | | |
423 | 0 | bool IsBlockSequential(const size_t& offset) { |
424 | 0 | return (prev_len_ == 0 || (prev_offset_ + prev_len_ == offset)); |
425 | 0 | } |
426 | | |
427 | | // Called in case of implicit auto prefetching. |
428 | 0 | void ResetValues() { |
429 | 0 | num_file_reads_ = 1; |
430 | 0 | readahead_size_ = initial_auto_readahead_size_; |
431 | 0 | } |
432 | | |
433 | | // Called in case of implicit auto prefetching. |
434 | 0 | bool IsEligibleForPrefetch(uint64_t offset, size_t n) { |
435 | | // Prefetch only if this read is sequential otherwise reset readahead_size_ |
436 | | // to initial value. |
437 | 0 | if (!IsBlockSequential(offset)) { |
438 | 0 | UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); |
439 | 0 | ResetValues(); |
440 | 0 | return false; |
441 | 0 | } |
442 | 0 | num_file_reads_++; |
443 | | |
444 | | // Since async request was submitted in last call directly by calling |
445 | | // PrefetchAsync, it skips num_file_reads_ check as this call is to poll the |
446 | | // data submitted in previous call. |
447 | 0 | if (explicit_prefetch_submitted_) { |
448 | 0 | return true; |
449 | 0 | } |
450 | 0 | if (num_file_reads_ <= num_file_reads_for_auto_readahead_) { |
451 | 0 | UpdateReadPattern(offset, n, false /*decrease_readaheadsize*/); |
452 | 0 | return false; |
453 | 0 | } |
454 | 0 | return true; |
455 | 0 | } |
456 | | |
457 | 0 | bool IsEligibleForFurtherPrefetching() { |
458 | 0 | if (free_bufs_.empty()) { |
459 | 0 | return false; |
460 | 0 | } |
461 | | // Readahead size can be 0 because of trimming. |
462 | 0 | if (readahead_size_ == 0) { |
463 | 0 | return false; |
464 | 0 | } |
465 | 0 | return true; |
466 | 0 | } |
467 | | |
468 | 0 | void DestroyAndClearIOHandle(BufferInfo* buf) { |
469 | 0 | if (buf->io_handle_ != nullptr && buf->del_fn_ != nullptr) { |
470 | 0 | buf->del_fn_(buf->io_handle_); |
471 | 0 | buf->io_handle_ = nullptr; |
472 | 0 | buf->del_fn_ = nullptr; |
473 | 0 | } |
474 | 0 | buf->async_read_in_progress_ = false; |
475 | 0 | } |
476 | | |
477 | | Status HandleOverlappingData(const IOOptions& opts, |
478 | | RandomAccessFileReader* reader, uint64_t offset, |
479 | | size_t length, size_t readahead_size, |
480 | | bool& copy_to_third_buffer, uint64_t& tmp_offset, |
481 | | size_t& tmp_length); |
482 | | |
483 | | bool TryReadFromCacheUntracked(const IOOptions& opts, |
484 | | RandomAccessFileReader* reader, |
485 | | uint64_t offset, size_t n, Slice* result, |
486 | | Status* s, |
487 | | bool for_compaction = false); |
488 | | |
489 | | void ReadAheadSizeTuning(BufferInfo* buf, bool read_curr_block, |
490 | | bool refit_tail, uint64_t prev_buf_end_offset, |
491 | | size_t alignment, size_t length, |
492 | | size_t readahead_size, uint64_t& offset, |
493 | | uint64_t& end_offset, size_t& read_len, |
494 | | uint64_t& aligned_useful_len); |
495 | | |
496 | 0 | void UpdateStats(bool found_in_buffer, size_t length_found) { |
497 | 0 | if (found_in_buffer) { |
498 | 0 | RecordTick(stats_, PREFETCH_HITS); |
499 | 0 | } |
500 | 0 | if (length_found > 0) { |
501 | 0 | RecordTick(stats_, PREFETCH_BYTES_USEFUL, length_found); |
502 | 0 | } |
503 | 0 | } |
504 | | |
505 | | void UpdateReadAheadTrimmedStat(size_t initial_length, |
506 | 0 | size_t updated_length) { |
507 | 0 | if (initial_length != updated_length) { |
508 | 0 | RecordTick(stats_, READAHEAD_TRIMMED); |
509 | 0 | } |
510 | 0 | } |
511 | | |
512 | | Status PrefetchRemBuffers(const IOOptions& opts, |
513 | | RandomAccessFileReader* reader, |
514 | | uint64_t end_offset1, size_t alignment, |
515 | | size_t readahead_size); |
516 | | |
517 | | // *** BEGIN APIs related to allocating and freeing buffers *** |
518 | 0 | bool IsBufferQueueEmpty() { return bufs_.empty(); } |
519 | | |
520 | 0 | BufferInfo* GetFirstBuffer() { return bufs_.front(); } |
521 | | |
522 | 0 | BufferInfo* GetLastBuffer() { return bufs_.back(); } |
523 | | |
524 | 0 | size_t NumBuffersAllocated() { return bufs_.size(); } |
525 | | |
526 | 0 | void AllocateBuffer() { |
527 | 0 | assert(!free_bufs_.empty()); |
528 | 0 | BufferInfo* buf = free_bufs_.front(); |
529 | 0 | free_bufs_.pop_front(); |
530 | 0 | bufs_.emplace_back(buf); |
531 | 0 | } |
532 | | |
533 | 0 | void AllocateBufferIfEmpty() { |
534 | 0 | if (bufs_.empty()) { |
535 | 0 | AllocateBuffer(); |
536 | 0 | } |
537 | 0 | } |
538 | | |
539 | 0 | void FreeFrontBuffer() { |
540 | 0 | BufferInfo* buf = bufs_.front(); |
541 | 0 | buf->ClearBuffer(); |
542 | 0 | bufs_.pop_front(); |
543 | 0 | free_bufs_.emplace_back(buf); |
544 | 0 | } |
545 | | |
546 | 0 | void FreeLastBuffer() { |
547 | 0 | BufferInfo* buf = bufs_.back(); |
548 | 0 | buf->ClearBuffer(); |
549 | 0 | bufs_.pop_back(); |
550 | 0 | free_bufs_.emplace_back(buf); |
551 | 0 | } |
552 | | |
553 | 0 | void FreeAllBuffers() { |
554 | 0 | while (!bufs_.empty()) { |
555 | 0 | BufferInfo* buf = bufs_.front(); |
556 | 0 | buf->ClearBuffer(); |
557 | 0 | bufs_.pop_front(); |
558 | 0 | free_bufs_.emplace_back(buf); |
559 | 0 | } |
560 | 0 | } |
561 | | |
562 | 0 | void FreeEmptyBuffers() { |
563 | 0 | if (bufs_.empty()) { |
564 | 0 | return; |
565 | 0 | } |
566 | | |
567 | 0 | std::deque<BufferInfo*> tmp_buf; |
568 | 0 | while (!bufs_.empty()) { |
569 | 0 | BufferInfo* buf = bufs_.front(); |
570 | 0 | bufs_.pop_front(); |
571 | 0 | if (buf->async_read_in_progress_ || buf->DoesBufferContainData()) { |
572 | 0 | tmp_buf.emplace_back(buf); |
573 | 0 | } else { |
574 | 0 | free_bufs_.emplace_back(buf); |
575 | 0 | } |
576 | 0 | } |
577 | 0 | bufs_ = tmp_buf; |
578 | 0 | } |
579 | | |
580 | | // *** END APIs related to allocating and freeing buffers *** |
581 | | |
582 | | std::deque<BufferInfo*> bufs_; |
583 | | std::deque<BufferInfo*> free_bufs_; |
584 | | BufferInfo* overlap_buf_ = nullptr; |
585 | | |
586 | | size_t readahead_size_; |
587 | | size_t initial_auto_readahead_size_; |
588 | | // FilePrefetchBuffer object won't be created from Iterator flow if |
589 | | // max_readahead_size_ = 0. |
590 | | size_t max_readahead_size_; |
591 | | |
592 | | // The minimum `offset` ever passed to TryReadFromCache(). |
593 | | size_t min_offset_read_; |
594 | | // if false, TryReadFromCache() always return false, and we only take stats |
595 | | // for track_min_offset_ if track_min_offset_ = true |
596 | | bool enable_; |
597 | | // If true, track minimum `offset` ever passed to TryReadFromCache(), which |
598 | | // can be fetched from min_offset_read(). |
599 | | bool track_min_offset_; |
600 | | |
601 | | // implicit_auto_readahead is enabled by rocksdb internally after 2 |
602 | | // sequential IOs. |
603 | | bool implicit_auto_readahead_; |
604 | | uint64_t prev_offset_; |
605 | | size_t prev_len_; |
606 | | // num_file_reads_ and num_file_reads_for_auto_readahead_ is only used when |
607 | | // implicit_auto_readahead_ is set. |
608 | | uint64_t num_file_reads_for_auto_readahead_; |
609 | | uint64_t num_file_reads_; |
610 | | |
611 | | // If explicit_prefetch_submitted_ is set then it indicates RocksDB called |
612 | | // PrefetchAsync to submit request. It needs to call TryReadFromCache to |
613 | | // poll the submitted request without checking if data is sequential and |
614 | | // num_file_reads_. |
615 | | bool explicit_prefetch_submitted_; |
616 | | |
617 | | FileSystem* fs_; |
618 | | SystemClock* clock_; |
619 | | Statistics* stats_; |
620 | | |
621 | | FilePrefetchBufferUsage usage_; |
622 | | |
623 | | std::function<void(bool, uint64_t&, uint64_t&)> readaheadsize_cb_; |
624 | | |
625 | | // num_buffers_ is the number of buffers maintained by FilePrefetchBuffer to |
626 | | // prefetch the data at a time. |
627 | | size_t num_buffers_; |
628 | | }; |
629 | | } // namespace ROCKSDB_NAMESPACE |