Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/file/sequence_file_reader.cc
Line |  Count | Source
-----+--------+------------------------------------------------------------------
   1 |        | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
   2 |        | //  This source code is licensed under both the GPLv2 (found in the
   3 |        | //  COPYING file in the root directory) and Apache 2.0 License
   4 |        | //  (found in the LICENSE.Apache file in the root directory).
   5 |        | //
   6 |        | // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
   7 |        | // Use of this source code is governed by a BSD-style license that can be
   8 |        | // found in the LICENSE file. See the AUTHORS file for names of contributors.
   9 |        |
  10 |        | #include "file/sequence_file_reader.h"
  11 |        |
  12 |        | #include <algorithm>
  13 |        | #include <mutex>
  14 |        |
  15 |        | #include "file/read_write_util.h"
  16 |        | #include "monitoring/histogram.h"
  17 |        | #include "monitoring/iostats_context_imp.h"
  18 |        | #include "port/port.h"
  19 |        | #include "rocksdb/file_system.h"
  20 |        | #include "test_util/sync_point.h"
  21 |        | #include "util/aligned_buffer.h"
  22 |        | #include "util/random.h"
  23 |        | #include "util/rate_limiter_impl.h"
  24 |        |
  25 |        | namespace ROCKSDB_NAMESPACE {
  26 |        | IOStatus SequentialFileReader::Create(
  27 |        |     const std::shared_ptr<FileSystem>& fs, const std::string& fname,
  28 |        |     const FileOptions& file_opts, std::unique_ptr<SequentialFileReader>* reader,
  29 |      0 |     IODebugContext* dbg, RateLimiter* rate_limiter) {
  30 |      0 |   std::unique_ptr<FSSequentialFile> file;
  31 |      0 |   IOStatus io_s = fs->NewSequentialFile(fname, file_opts, &file, dbg);
  32 |      0 |   if (io_s.ok()) {
  33 |      0 |     reader->reset(new SequentialFileReader(std::move(file), fname, nullptr, {},
  34 |      0 |                                            rate_limiter));
  35 |      0 |   }
  36 |      0 |   return io_s;
  37 |      0 | }
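Note: Create() has zero hits in this run. As a hedged illustration only (not part of the report), here is a minimal sketch of how a caller might exercise it. It assumes the in-tree internal headers are available (SequentialFileReader is not public API); DumpFile and the 4 KiB scratch size are made-up names/values.

    #include <memory>
    #include <string>

    #include "file/sequence_file_reader.h"
    #include "rocksdb/env.h"
    #include "rocksdb/file_system.h"

    namespace rdb = ROCKSDB_NAMESPACE;

    // Open `fname` through SequentialFileReader::Create() and drain it.
    rdb::IOStatus DumpFile(const std::string& fname) {
      std::unique_ptr<rdb::SequentialFileReader> reader;
      rdb::IOStatus io_s = rdb::SequentialFileReader::Create(
          rdb::FileSystem::Default(), fname, rdb::FileOptions(), &reader,
          nullptr /* dbg */, nullptr /* rate_limiter */);
      if (!io_s.ok()) {
        return io_s;
      }
      char scratch[4096];
      rdb::Slice chunk;
      do {
        // A short read (chunk.size() < requested) signals EOF.
        io_s = reader->Read(sizeof(scratch), &chunk, scratch,
                            rdb::Env::IO_TOTAL /* no rate limiting */);
      } while (io_s.ok() && chunk.size() == sizeof(scratch));
      return io_s;
    }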
  38 |        |
  39 |        | IOStatus SequentialFileReader::Read(size_t n, Slice* result, char* scratch,
  40 |  41.4k |                                     Env::IOPriority rate_limiter_priority) {
  41 |  41.4k |   IOStatus io_s;
  42 |  41.4k |   IOOptions io_opts;
  43 |  41.4k |   io_opts.rate_limiter_priority = rate_limiter_priority;
  44 |  41.4k |   io_opts.verify_and_reconstruct_read = verify_and_reconstruct_read_;
  45 |  41.4k |   if (use_direct_io()) {
  46 |        |     //
  47 |        |     //    |-offset_advance-|---bytes returned--|
  48 |        |     //    |----------------------buf size-------------------------|
  49 |        |     //    |                |                   |                  |
  50 |        |     // aligned           offset          offset + n  Roundup(offset + n,
  51 |        |     // offset                                             alignment)
  52 |        |     //
  53 |      0 |     size_t offset = offset_.fetch_add(n);
  54 |      0 |     size_t alignment = file_->GetRequiredBufferAlignment();
  55 |      0 |     size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
  56 |      0 |     size_t offset_advance = offset - aligned_offset;
  57 |      0 |     size_t size = Roundup(offset + n, alignment) - aligned_offset;
  58 |      0 |     size_t r = 0;
  59 |      0 |     AlignedBuffer buf;
  60 |      0 |     buf.Alignment(alignment);
  61 |      0 |     buf.AllocateNewBuffer(size);
  62 |        |
  63 |      0 |     while (buf.CurrentSize() < size) {
  64 |      0 |       size_t allowed;
  65 |      0 |       if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
  66 |      0 |         allowed = rate_limiter_->RequestToken(
  67 |      0 |             buf.Capacity() - buf.CurrentSize(), buf.Alignment(),
  68 |      0 |             rate_limiter_priority, nullptr /* stats */,
  69 |      0 |             RateLimiter::OpType::kRead);
  70 |      0 |       } else {
  71 |      0 |         assert(buf.CurrentSize() == 0);
  72 |      0 |         allowed = size;
  73 |      0 |       }
  74 |        |
  75 |      0 |       Slice tmp;
  76 |      0 |       uint64_t orig_offset = 0;
  77 |      0 |       FileOperationInfo::StartTimePoint start_ts;
  78 |      0 |       if (ShouldNotifyListeners()) {
  79 |      0 |         orig_offset = aligned_offset + buf.CurrentSize();
  80 |      0 |         start_ts = FileOperationInfo::StartNow();
  81 |      0 |       }
  82 |      0 |       io_s = file_->PositionedRead(aligned_offset + buf.CurrentSize(), allowed,
  83 |      0 |                                    io_opts, &tmp, buf.Destination(),
  84 |      0 |                                    nullptr /* dbg */);
  85 |      0 |       if (ShouldNotifyListeners()) {
  86 |      0 |         auto finish_ts = FileOperationInfo::FinishNow();
  87 |      0 |         NotifyOnFileReadFinish(orig_offset, tmp.size(), start_ts, finish_ts,
  88 |      0 |                                io_s);
  89 |      0 |       }
  90 |      0 |       buf.Size(buf.CurrentSize() + tmp.size());
  91 |      0 |       if (!io_s.ok() || tmp.size() < allowed) {
  92 |      0 |         break;
  93 |      0 |       }
  94 |      0 |     }
  95 |        |
  96 |      0 |     if (io_s.ok() && offset_advance < buf.CurrentSize()) {
  97 |      0 |       r = buf.Read(scratch, offset_advance,
  98 |      0 |                    std::min(buf.CurrentSize() - offset_advance, n));
  99 |      0 |     }
 100 |      0 |     *result = Slice(scratch, r);
 101 |  41.4k |   } else {
 102 |        |     // To be paranoid, modify scratch a little bit, so that in case the
 103 |        |     // underlying FileSystem doesn't fill the buffer but returns success and
 104 |        |     // `scratch` still contains a previous block, the returned value will not
 105 |        |     // pass the checksum.
 106 |        |     // It's hard to find a useful byte for the direct I/O case, so we skip it.
 107 |  41.4k |     if (n > 0 && scratch != nullptr) {
 108 |  41.4k |       scratch[0]++;
 109 |  41.4k |     }
 110 |        |
 111 |  41.4k |     size_t read = 0;
 112 |  54.1k |     while (read < n) {
 113 |  41.4k |       size_t allowed;
 114 |  41.4k |       if (rate_limiter_priority != Env::IO_TOTAL && rate_limiter_ != nullptr) {
 115 |      0 |         allowed = rate_limiter_->RequestToken(
 116 |      0 |             n - read, 0 /* alignment */, rate_limiter_priority,
 117 |      0 |             nullptr /* stats */, RateLimiter::OpType::kRead);
 118 |  41.4k |       } else {
 119 |  41.4k |         allowed = n;
 120 |  41.4k |       }
 121 |  41.4k |       FileOperationInfo::StartTimePoint start_ts;
 122 |  41.4k |       if (ShouldNotifyListeners()) {
 123 |      0 |         start_ts = FileOperationInfo::StartNow();
 124 |      0 |       }
 125 |  41.4k |       Slice tmp;
 126 |  41.4k |       io_s = file_->Read(allowed, io_opts, &tmp, scratch + read,
 127 |  41.4k |                          nullptr /* dbg */);
 128 |  41.4k |       if (ShouldNotifyListeners()) {
 129 |      0 |         auto finish_ts = FileOperationInfo::FinishNow();
 130 |      0 |         size_t offset = offset_.fetch_add(tmp.size());
 131 |      0 |         NotifyOnFileReadFinish(offset, tmp.size(), start_ts, finish_ts, io_s);
 132 |      0 |       }
 133 |  41.4k |       read += tmp.size();
 134 |  41.4k |       if (!io_s.ok() || tmp.size() < allowed) {
 135 |  28.7k |         break;
 136 |  28.7k |       }
 137 |  41.4k |     }
 138 |  41.4k |     *result = Slice(scratch, read);
 139 |  41.4k |   }
 140 |  41.4k |   IOSTATS_ADD(bytes_read, result->size());
 141 |  41.4k |   return io_s;
 142 |  41.4k | }
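Note: the direct-I/O branch (lines 53-100) shows zero hits in this run; it widens every request to aligned boundaries, as drawn in the diagram at lines 46-52. A worked example of that arithmetic, with TruncateToPageBoundary() and Roundup() re-implemented as simplified stand-ins (an assumption for illustration; the real helpers live in RocksDB's utility headers and expect a power-of-two alignment):

    #include <cstddef>
    #include <cstdio>

    // Simplified stand-ins for RocksDB's alignment helpers.
    static size_t TruncateToPageBoundary(size_t alignment, size_t offset) {
      return offset - (offset % alignment);  // round down
    }
    static size_t Roundup(size_t x, size_t alignment) {
      return ((x + alignment - 1) / alignment) * alignment;  // round up
    }

    int main() {
      const size_t alignment = 512;  // e.g. file_->GetRequiredBufferAlignment()
      const size_t offset = 1000;    // current logical position (offset_)
      const size_t n = 300;          // bytes the caller asked for
      const size_t aligned_offset = TruncateToPageBoundary(alignment, offset);
      const size_t offset_advance = offset - aligned_offset;
      const size_t size = Roundup(offset + n, alignment) - aligned_offset;
      // Prints: aligned_offset=512 offset_advance=488 size=1024, i.e. the
      // code reads 1024 aligned bytes at file offset 512, then copies the
      // caller's 300 bytes starting 488 bytes into the aligned buffer.
      std::printf("aligned_offset=%zu offset_advance=%zu size=%zu\n",
                  aligned_offset, offset_advance, size);
      return 0;
    }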
 143 |        |
 144 |      0 | IOStatus SequentialFileReader::Skip(uint64_t n) {
 145 |      0 |   if (use_direct_io()) {
 146 |      0 |     offset_ += static_cast<size_t>(n);
 147 |      0 |     return IOStatus::OK();
 148 |      0 |   }
 149 |      0 |   return file_->Skip(n);
 150 |      0 | }
151
152
namespace {
153
// This class wraps a SequentialFile, exposing same API, with the differenece
154
// of being able to prefetch up to readahead_size bytes and then serve them
155
// from memory, avoiding the entire round-trip if, for example, the data for the
156
// file is actually remote.
157
class ReadaheadSequentialFile : public FSSequentialFile {
158
 public:
159
  ReadaheadSequentialFile(std::unique_ptr<FSSequentialFile>&& file,
160
                          size_t readahead_size)
161
      : file_(std::move(file)),
162
        alignment_(file_->GetRequiredBufferAlignment()),
163
        readahead_size_(Roundup(readahead_size, alignment_)),
164
        buffer_(),
165
        buffer_offset_(0),
166
11.8k
        read_offset_(0) {
167
11.8k
    buffer_.Alignment(alignment_);
168
11.8k
    buffer_.AllocateNewBuffer(readahead_size_);
169
11.8k
  }
170
171
  ReadaheadSequentialFile(const ReadaheadSequentialFile&) = delete;
172
173
  ReadaheadSequentialFile& operator=(const ReadaheadSequentialFile&) = delete;
174
175
  IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
176
12.5k
                IODebugContext* dbg) override {
177
12.5k
    std::unique_lock<std::mutex> lk(lock_);
178
179
12.5k
    size_t cached_len = 0;
180
    // Check if there is a cache hit, meaning that [offset, offset + n) is
181
    // either completely or partially in the buffer. If it's completely cached,
182
    // including end of file case when offset + n is greater than EOF, then
183
    // return.
184
12.5k
    if (TryReadFromCache(n, &cached_len, scratch) &&
185
12.5k
        (cached_len == n || buffer_.CurrentSize() < readahead_size_)) {
186
      // We read exactly what we needed, or we hit end of file - return.
187
736
      *result = Slice(scratch, cached_len);
188
736
      return IOStatus::OK();
189
736
    }
190
11.8k
    n -= cached_len;
191
192
11.8k
    IOStatus s;
193
    // Read-ahead only make sense if we have some slack left after reading
194
11.8k
    if (n + alignment_ >= readahead_size_) {
195
0
      s = file_->Read(n, opts, result, scratch + cached_len, dbg);
196
0
      if (s.ok()) {
197
0
        read_offset_ += result->size();
198
0
        *result = Slice(scratch, cached_len + result->size());
199
0
      }
200
0
      buffer_.Clear();
201
0
      return s;
202
0
    }
203
204
11.8k
    s = ReadIntoBuffer(readahead_size_, opts, dbg);
205
11.8k
    if (s.ok()) {
206
      // The data we need is now in cache, so we can safely read it
207
11.8k
      size_t remaining_len;
208
11.8k
      TryReadFromCache(n, &remaining_len, scratch + cached_len);
209
11.8k
      *result = Slice(scratch, cached_len + remaining_len);
210
11.8k
    }
211
11.8k
    return s;
212
11.8k
  }
213
214
0
  IOStatus Skip(uint64_t n) override {
215
0
    std::unique_lock<std::mutex> lk(lock_);
216
0
    IOStatus s = IOStatus::OK();
217
    // First check if we need to skip already cached data
218
0
    if (buffer_.CurrentSize() > 0) {
219
      // Do we need to skip beyond cached data?
220
0
      if (read_offset_ + n >= buffer_offset_ + buffer_.CurrentSize()) {
221
        // Yes. Skip whaterver is in memory and adjust offset accordingly
222
0
        n -= buffer_offset_ + buffer_.CurrentSize() - read_offset_;
223
0
        read_offset_ = buffer_offset_ + buffer_.CurrentSize();
224
0
      } else {
225
        // No. The entire section to be skipped is entirely i cache.
226
0
        read_offset_ += n;
227
0
        n = 0;
228
0
      }
229
0
    }
230
0
    if (n > 0) {
231
      // We still need to skip more, so call the file API for skipping
232
0
      s = file_->Skip(n);
233
0
      if (s.ok()) {
234
0
        read_offset_ += n;
235
0
      }
236
0
      buffer_.Clear();
237
0
    }
238
0
    return s;
239
0
  }
240
241
  IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
242
                          Slice* result, char* scratch,
243
0
                          IODebugContext* dbg) override {
244
0
    return file_->PositionedRead(offset, n, opts, result, scratch, dbg);
245
0
  }
246
247
0
  IOStatus InvalidateCache(size_t offset, size_t length) override {
248
0
    std::unique_lock<std::mutex> lk(lock_);
249
0
    buffer_.Clear();
250
0
    return file_->InvalidateCache(offset, length);
251
0
  }
252
253
12.5k
  bool use_direct_io() const override { return file_->use_direct_io(); }
254
255
 private:
256
  // Tries to read from buffer_ n bytes. If anything was read from the cache, it
257
  // sets cached_len to the number of bytes actually read, copies these number
258
  // of bytes to scratch and returns true.
259
  // If nothing was read sets cached_len to 0 and returns false.
260
24.3k
  bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
261
24.3k
    if (read_offset_ < buffer_offset_ ||
262
24.3k
        read_offset_ >= buffer_offset_ + buffer_.CurrentSize()) {
263
11.8k
      *cached_len = 0;
264
11.8k
      return false;
265
11.8k
    }
266
12.5k
    uint64_t offset_in_buffer = read_offset_ - buffer_offset_;
267
12.5k
    *cached_len = std::min(
268
12.5k
        buffer_.CurrentSize() - static_cast<size_t>(offset_in_buffer), n);
269
12.5k
    memcpy(scratch, buffer_.BufferStart() + offset_in_buffer, *cached_len);
270
12.5k
    read_offset_ += *cached_len;
271
12.5k
    return true;
272
24.3k
  }
273
274
  // Reads into buffer_ the next n bytes from file_.
275
  // Can actually read less if EOF was reached.
276
  // Returns the status of the read operastion on the file.
277
  IOStatus ReadIntoBuffer(size_t n, const IOOptions& opts,
278
11.8k
                          IODebugContext* dbg) {
279
11.8k
    if (n > buffer_.Capacity()) {
280
0
      n = buffer_.Capacity();
281
0
    }
282
11.8k
    assert(IsFileSectorAligned(n, alignment_));
283
11.8k
    Slice result;
284
11.8k
    IOStatus s = file_->Read(n, opts, &result, buffer_.BufferStart(), dbg);
285
11.8k
    if (s.ok()) {
286
11.8k
      buffer_offset_ = read_offset_;
287
11.8k
      buffer_.Size(result.size());
288
11.8k
      assert(result.size() == 0 || buffer_.BufferStart() == result.data());
289
11.8k
    }
290
11.8k
    return s;
291
11.8k
  }
292
293
  const std::unique_ptr<FSSequentialFile> file_;
294
  const size_t alignment_;
295
  const size_t readahead_size_;
296
297
  std::mutex lock_;
298
  // The buffer storing the prefetched data
299
  AlignedBuffer buffer_;
300
  // The offset in file_, corresponding to data stored in buffer_
301
  uint64_t buffer_offset_;
302
  // The offset up to which data was read from file_. In fact, it can be larger
303
  // than the actual file size, since the file_->Skip(n) call doesn't return the
304
  // actual number of bytes that were skipped, which can be less than n.
305
  // This is not a problemm since read_offset_ is monotonically increasing and
306
  // its only use is to figure out if next piece of data should be read from
307
  // buffer_ or file_ directly.
308
  uint64_t read_offset_;
309
};
310
}  // namespace
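Note: the hit/miss test in TryReadFromCache() (lines 260-272) treats buffer_ as a window [buffer_offset_, buffer_offset_ + CurrentSize()) over the file. A self-contained toy model of that test; the names mirror the members above, but this is illustrative code, not the RocksDB class:

    #include <algorithm>
    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <cstring>
    #include <string>

    // Toy model: buf holds bytes [buffer_offset, buffer_offset + buf.size())
    // of the file; a read at read_offset is a (possibly partial) hit iff it
    // lands inside that window.
    struct Window {
      std::string buf;         // stands in for AlignedBuffer's contents
      uint64_t buffer_offset;  // file offset of buf[0]
      uint64_t read_offset;    // next logical read position

      bool TryReadFromCache(size_t n, size_t* cached_len, char* scratch) {
        if (read_offset < buffer_offset ||
            read_offset >= buffer_offset + buf.size()) {
          *cached_len = 0;  // complete miss
          return false;
        }
        uint64_t off = read_offset - buffer_offset;
        *cached_len = std::min(buf.size() - static_cast<size_t>(off), n);
        std::memcpy(scratch, buf.data() + off, *cached_len);
        read_offset += *cached_len;  // consume from the window
        return true;
      }
    };

    int main() {
      Window w{std::string(512, 'x'), 512, 900};  // window covers [512, 1024)
      char scratch[256];
      size_t got = 0;
      assert(w.TryReadFromCache(200, &got, scratch) && got == 124);  // partial
      assert(!w.TryReadFromCache(200, &got, scratch) && got == 0);   // miss
      return 0;
    }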
 311 |        |
 312 |        | std::unique_ptr<FSSequentialFile>
 313 |        | SequentialFileReader::NewReadaheadSequentialFile(
 314 |  28.7k |     std::unique_ptr<FSSequentialFile>&& file, size_t readahead_size) {
 315 |  28.7k |   if (file->GetRequiredBufferAlignment() >= readahead_size) {
 316 |        |     // Short-circuit and return the original file if readahead_size is
 317 |        |     // too small to make sense for prefetching.
 318 |  16.9k |     return std::move(file);
 319 |  16.9k |   }
 320 |  11.8k |   std::unique_ptr<FSSequentialFile> result(
 321 |  11.8k |       new ReadaheadSequentialFile(std::move(file), readahead_size));
 322 |  11.8k |   return result;
 323 |  28.7k | }
 324 |        | }  // namespace ROCKSDB_NAMESPACE
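Note: NewReadaheadSequentialFile() is how callers opt into the prefetching wrapper; in this run the short-circuit at line 315 fires for most calls (16.9k of 28.7k). A hedged usage sketch (the file name and the 2 MiB readahead size are arbitrary example values, and the internal headers are assumed to be available):

    #include <memory>

    #include "file/sequence_file_reader.h"
    #include "rocksdb/file_system.h"

    namespace rdb = ROCKSDB_NAMESPACE;

    // Wrap a raw sequential file with readahead before scanning it.
    rdb::IOStatus OpenWithReadahead(
        std::unique_ptr<rdb::FSSequentialFile>* out) {
      std::unique_ptr<rdb::FSSequentialFile> file;
      rdb::IOStatus io_s = rdb::FileSystem::Default()->NewSequentialFile(
          "example.log" /* arbitrary */, rdb::FileOptions(), &file,
          nullptr /* dbg */);
      if (io_s.ok()) {
        // Returns the file unchanged when readahead_size is too small to help.
        *out = rdb::SequentialFileReader::NewReadaheadSequentialFile(
            std::move(file), 2 << 20 /* readahead_size: 2 MiB */);
      }
      return io_s;
    }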