Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/env/io_posix.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#pragma once
10
#include <errno.h>
11
#if defined(ROCKSDB_IOURING_PRESENT)
12
#include <liburing.h>
13
#include <sys/uio.h>
14
#endif
15
#include <unistd.h>
16
17
#include <atomic>
18
#include <functional>
19
#include <map>
20
#include <string>
21
22
#include "port/port.h"
23
#include "rocksdb/env.h"
24
#include "rocksdb/file_system.h"
25
#include "rocksdb/io_status.h"
26
#include "test_util/sync_point.h"
27
#include "util/mutexlock.h"
28
#include "util/thread_local.h"
29
30
// For non-Linux platforms, the following macros are used only as
31
// placeholders.
32
#if !(defined OS_LINUX) && !(defined OS_FREEBSD) && !(defined CYGWIN) && \
33
    !(defined OS_AIX)
34
#define POSIX_FADV_NORMAL 0     /* [MC1] no further special treatment */
35
#define POSIX_FADV_RANDOM 1     /* [MC1] expect random page refs */
36
#define POSIX_FADV_SEQUENTIAL 2 /* [MC1] expect sequential page refs */
37
#define POSIX_FADV_WILLNEED 3   /* [MC1] will need these pages */
38
#define POSIX_FADV_DONTNEED 4   /* [MC1] don't need these pages */
39
40
#define POSIX_MADV_NORMAL 0     /* [MC1] no further special treatment */
41
#define POSIX_MADV_RANDOM 1     /* [MC1] expect random page refs */
42
#define POSIX_MADV_SEQUENTIAL 2 /* [MC1] expect sequential page refs */
43
#define POSIX_MADV_WILLNEED 3   /* [MC1] will need these pages */
44
#define POSIX_MADV_DONTNEED 4   /* [MC1] don't need these pages */
45
#endif
46
47
namespace ROCKSDB_NAMESPACE {
48
std::string IOErrorMsg(const std::string& context,
49
                       const std::string& file_name);
50
// file_name can be left empty if it is unknown.
51
IOStatus IOError(const std::string& context, const std::string& file_name,
52
                 int err_number);
53
54
class PosixHelper {
55
 public:
56
  static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size);
57
  static size_t GetLogicalBlockSizeOfFd(int fd);
58
  static Status GetLogicalBlockSizeOfDirectory(const std::string& directory,
59
                                               size_t* size);
60
};
61
62
/*
63
 * DirectIOHelper
64
 */
65
0
// Returns true when `off` is a multiple of `sector_size`.
// `sector_size` is expected to be a power of two; debug builds assert that
// it has at most one bit set.
inline bool IsSectorAligned(const size_t off, size_t sector_size) {
  const size_t low_bits_mask = sector_size - 1;
  assert((sector_size & low_bits_mask) == 0);
  return (off & low_bits_mask) == 0;
}
69
70
#ifndef NDEBUG
71
// Pointer overload: returns true when the address stored in `ptr` is a
// multiple of `sector_size`. `sector_size` must be non-zero.
inline bool IsSectorAligned(const void* ptr, size_t sector_size) {
  const uintptr_t address = reinterpret_cast<uintptr_t>(ptr);
  return (address % sector_size) == 0;
}
74
#endif
75
76
#if defined(ROCKSDB_IOURING_PRESENT)
77
struct Posix_IOHandle {
78
  Posix_IOHandle(struct io_uring* _iu,
79
                 std::function<void(FSReadRequest&, void*)> _cb, void* _cb_arg,
80
                 uint64_t _offset, size_t _len, char* _scratch,
81
                 bool _use_direct_io, size_t _alignment)
82
      : iu(_iu),
83
        cb(_cb),
84
        cb_arg(_cb_arg),
85
        offset(_offset),
86
        len(_len),
87
        scratch(_scratch),
88
        use_direct_io(_use_direct_io),
89
        alignment(_alignment),
90
        is_finished(false),
91
        req_count(0) {}
92
93
  struct iovec iov;
94
  struct io_uring* iu;
95
  std::function<void(FSReadRequest&, void*)> cb;
96
  void* cb_arg;
97
  uint64_t offset;
98
  size_t len;
99
  char* scratch;
100
  bool use_direct_io;
101
  size_t alignment;
102
  bool is_finished;
103
  // req_count is used by AbortIO API to keep track of number of requests.
104
  uint32_t req_count;
105
};
106
107
inline void UpdateResult(struct io_uring_cqe* cqe, const std::string& file_name,
108
                         size_t len, size_t iov_len, bool async_read,
109
                         bool use_direct_io, size_t alignment,
110
                         size_t& finished_len, FSReadRequest* req,
111
                         size_t& bytes_read, bool& read_again) {
112
  read_again = false;
113
  if (cqe->res < 0) {
114
    req->result = Slice(req->scratch, 0);
115
    req->status = IOError("Req failed", file_name, cqe->res);
116
  } else {
117
    bytes_read = static_cast<size_t>(cqe->res);
118
    TEST_SYNC_POINT_CALLBACK("UpdateResults::io_uring_result", &bytes_read);
119
    if (bytes_read == iov_len) {
120
      req->result = Slice(req->scratch, req->len);
121
      req->status = IOStatus::OK();
122
    } else if (bytes_read == 0) {
123
      /// cqe->res == 0 can means EOF, or can mean partial results. See
124
      // comment
125
      // https://github.com/facebook/rocksdb/pull/6441#issuecomment-589843435
126
      // Fall back to pread in this case.
127
      if (use_direct_io && !IsSectorAligned(finished_len, alignment)) {
128
        // Bytes reads don't fill sectors. Should only happen at the end
129
        // of the file.
130
        req->result = Slice(req->scratch, finished_len);
131
        req->status = IOStatus::OK();
132
      } else {
133
        if (async_read) {
134
          // No  bytes read. It can means EOF. In case of partial results, it's
135
          // caller responsibility to call read/readasync again.
136
          req->result = Slice(req->scratch, 0);
137
          req->status = IOStatus::OK();
138
        } else {
139
          read_again = true;
140
        }
141
      }
142
    } else if (bytes_read < iov_len) {
143
      assert(bytes_read > 0);
144
      if (async_read) {
145
        req->result = Slice(req->scratch, bytes_read);
146
        req->status = IOStatus::OK();
147
      } else {
148
        assert(bytes_read + finished_len < len);
149
        finished_len += bytes_read;
150
      }
151
    } else {
152
      req->result = Slice(req->scratch, 0);
153
      req->status = IOError("Req returned more bytes than requested", file_name,
154
                            cqe->res);
155
    }
156
  }
157
#ifdef NDEBUG
158
  (void)len;
159
#endif
160
}
161
#endif
162
163
#ifdef OS_LINUX
164
// Files under a specific directory have the same logical block size.
165
// This class caches the logical block size for the specified directories to
166
// save the CPU cost of computing the size.
167
// Safe for concurrent access from multiple threads without any external
168
// synchronization.
169
class LogicalBlockSizeCache {
170
 public:
171
  LogicalBlockSizeCache(
172
      std::function<size_t(int)> get_logical_block_size_of_fd =
173
          PosixHelper::GetLogicalBlockSizeOfFd,
174
      std::function<Status(const std::string&, size_t*)>
175
          get_logical_block_size_of_directory =
176
              PosixHelper::GetLogicalBlockSizeOfDirectory)
177
      : get_logical_block_size_of_fd_(get_logical_block_size_of_fd),
178
        get_logical_block_size_of_directory_(
179
2
            get_logical_block_size_of_directory) {}
180
181
  // Takes the following actions:
182
  // 1. Increases reference count of the directories;
183
  // 2. If the directory's logical block size is not cached,
184
  //    compute the buffer size and cache the result.
185
  Status RefAndCacheLogicalBlockSize(
186
      const std::vector<std::string>& directories);
187
188
  // Takes the following actions:
189
  // 1. Decreases reference count of the directories;
190
  // 2. If the reference count of a directory reaches 0, remove the directory
191
  //    from the cache.
192
  void UnrefAndTryRemoveCachedLogicalBlockSize(
193
      const std::vector<std::string>& directories);
194
195
  // Returns the logical block size for the file.
196
  //
197
  // If the file is under a cached directory, return the cached size.
198
  // Otherwise, the size is computed.
199
  size_t GetLogicalBlockSize(const std::string& fname, int fd);
200
201
0
  int GetRefCount(const std::string& dir) {
202
0
    ReadLock lock(&cache_mutex_);
203
0
    auto it = cache_.find(dir);
204
0
    if (it == cache_.end()) {
205
0
      return 0;
206
0
    }
207
0
    return it->second.ref;
208
0
  }
209
210
0
  size_t Size() const { return cache_.size(); }
211
212
0
  bool Contains(const std::string& dir) {
213
0
    ReadLock lock(&cache_mutex_);
214
0
    return cache_.find(dir) != cache_.end();
215
0
  }
216
217
 private:
218
  struct CacheValue {
219
13.9k
    CacheValue() : size(0), ref(0) {}
220
221
    // Logical block size of the directory.
222
    size_t size;
223
    // Reference count of the directory.
224
    int ref;
225
  };
226
227
  std::function<size_t(int)> get_logical_block_size_of_fd_;
228
  std::function<Status(const std::string&, size_t*)>
229
      get_logical_block_size_of_directory_;
230
231
  std::map<std::string, CacheValue> cache_;
232
  port::RWMutex cache_mutex_;
233
};
234
#endif
235
236
class PosixSequentialFile : public FSSequentialFile {
237
 private:
238
  std::string filename_;
239
  FILE* file_;
240
  int fd_;
241
  bool use_direct_io_;
242
  size_t logical_sector_size_;
243
244
 public:
245
  PosixSequentialFile(const std::string& fname, FILE* file, int fd,
246
                      size_t logical_block_size, const EnvOptions& options);
247
  virtual ~PosixSequentialFile();
248
249
  IOStatus Read(size_t n, const IOOptions& opts, Slice* result, char* scratch,
250
                IODebugContext* dbg) override;
251
  IOStatus PositionedRead(uint64_t offset, size_t n, const IOOptions& opts,
252
                          Slice* result, char* scratch,
253
                          IODebugContext* dbg) override;
254
  IOStatus Skip(uint64_t n) override;
255
  IOStatus InvalidateCache(size_t offset, size_t length) override;
256
143k
  bool use_direct_io() const override { return use_direct_io_; }
257
77.9k
  size_t GetRequiredBufferAlignment() const override {
258
77.9k
    return logical_sector_size_;
259
77.9k
  }
260
};
261
262
#if defined(ROCKSDB_IOURING_PRESENT)
263
// io_uring instance queue depth
264
constexpr unsigned int kIoUringDepth = 256;
265
266
inline void DeleteIOUring(void* p) {
267
  struct io_uring* iu = static_cast<struct io_uring*>(p);
268
  delete iu;
269
}
270
271
inline struct io_uring* CreateIOUring() {
272
  struct io_uring* new_io_uring = new struct io_uring;
273
  int ret = io_uring_queue_init(kIoUringDepth, new_io_uring, 0);
274
  if (ret) {
275
    delete new_io_uring;
276
    new_io_uring = nullptr;
277
  }
278
  return new_io_uring;
279
}
280
#endif  // defined(ROCKSDB_IOURING_PRESENT)
281
282
class PosixRandomAccessFile : public FSRandomAccessFile {
283
 protected:
284
  std::string filename_;
285
  int fd_;
286
  bool use_direct_io_;
287
  size_t logical_sector_size_;
288
#if defined(ROCKSDB_IOURING_PRESENT)
289
  ThreadLocalPtr* thread_local_io_urings_;
290
#endif
291
292
 public:
293
  PosixRandomAccessFile(const std::string& fname, int fd,
294
                        size_t logical_block_size, const EnvOptions& options
295
#if defined(ROCKSDB_IOURING_PRESENT)
296
                        ,
297
                        ThreadLocalPtr* thread_local_io_urings
298
#endif
299
  );
300
  virtual ~PosixRandomAccessFile();
301
302
  IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts, Slice* result,
303
                char* scratch, IODebugContext* dbg) const override;
304
305
  IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs,
306
                     const IOOptions& options, IODebugContext* dbg) override;
307
308
  IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& opts,
309
                    IODebugContext* dbg) override;
310
311
#if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
312
  size_t GetUniqueId(char* id, size_t max_size) const override;
313
#endif
314
  void Hint(AccessPattern pattern) override;
315
  IOStatus InvalidateCache(size_t offset, size_t length) override;
316
534k
  bool use_direct_io() const override { return use_direct_io_; }
317
113k
  size_t GetRequiredBufferAlignment() const override {
318
113k
    return logical_sector_size_;
319
113k
  }
320
321
  virtual IOStatus ReadAsync(FSReadRequest& req, const IOOptions& opts,
322
                             std::function<void(FSReadRequest&, void*)> cb,
323
                             void* cb_arg, void** io_handle,
324
                             IOHandleDeleter* del_fn,
325
                             IODebugContext* dbg) override;
326
};
327
328
class PosixWritableFile : public FSWritableFile {
329
 protected:
330
  const std::string filename_;
331
  const bool use_direct_io_;
332
  int fd_;
333
  uint64_t filesize_;
334
  size_t logical_sector_size_;
335
#ifdef ROCKSDB_FALLOCATE_PRESENT
336
  bool allow_fallocate_;
337
  bool fallocate_with_keep_size_;
338
#endif
339
#ifdef ROCKSDB_RANGESYNC_PRESENT
340
  // Even if the syscall is present, the filesystem may still not properly
341
  // support it, so we need to do a dynamic check too.
342
  bool sync_file_range_supported_;
343
#endif  // ROCKSDB_RANGESYNC_PRESENT
344
345
 public:
346
  explicit PosixWritableFile(const std::string& fname, int fd,
347
                             size_t logical_block_size,
348
                             const EnvOptions& options);
349
  virtual ~PosixWritableFile();
350
351
  // Need to implement this so the file is truncated correctly
352
  // with direct I/O
353
  IOStatus Truncate(uint64_t size, const IOOptions& opts,
354
                    IODebugContext* dbg) override;
355
  IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
356
  IOStatus Append(const Slice& data, const IOOptions& opts,
357
                  IODebugContext* dbg) override;
358
  IOStatus Append(const Slice& data, const IOOptions& opts,
359
                  const DataVerificationInfo& /* verification_info */,
360
0
                  IODebugContext* dbg) override {
361
0
    return Append(data, opts, dbg);
362
0
  }
363
  IOStatus PositionedAppend(const Slice& data, uint64_t offset,
364
                            const IOOptions& opts,
365
                            IODebugContext* dbg) override;
366
  IOStatus PositionedAppend(const Slice& data, uint64_t offset,
367
                            const IOOptions& opts,
368
                            const DataVerificationInfo& /* verification_info */,
369
0
                            IODebugContext* dbg) override {
370
0
    return PositionedAppend(data, offset, opts, dbg);
371
0
  }
372
  IOStatus Flush(const IOOptions& opts, IODebugContext* dbg) override;
373
  IOStatus Sync(const IOOptions& opts, IODebugContext* dbg) override;
374
  IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
375
  bool IsSyncThreadSafe() const override;
376
11.6M
  bool use_direct_io() const override { return use_direct_io_; }
377
  void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override;
378
  uint64_t GetFileSize(const IOOptions& opts, IODebugContext* dbg) override;
379
  IOStatus InvalidateCache(size_t offset, size_t length) override;
380
79.4k
  size_t GetRequiredBufferAlignment() const override {
381
79.4k
    return logical_sector_size_;
382
79.4k
  }
383
#ifdef ROCKSDB_FALLOCATE_PRESENT
384
  IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& opts,
385
                    IODebugContext* dbg) override;
386
#endif
387
  IOStatus RangeSync(uint64_t offset, uint64_t nbytes, const IOOptions& opts,
388
                     IODebugContext* dbg) override;
389
#ifdef OS_LINUX
390
  size_t GetUniqueId(char* id, size_t max_size) const override;
391
#endif
392
};
393
394
// mmap() based random-access
395
class PosixMmapReadableFile : public FSRandomAccessFile {
396
 private:
397
  int fd_;
398
  std::string filename_;
399
  void* mmapped_region_;
400
  size_t length_;
401
402
 public:
403
  PosixMmapReadableFile(const int fd, const std::string& fname, void* base,
404
                        size_t length, const EnvOptions& options);
405
  virtual ~PosixMmapReadableFile();
406
  IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts, Slice* result,
407
                char* scratch, IODebugContext* dbg) const override;
408
  void Hint(AccessPattern pattern) override;
409
  IOStatus InvalidateCache(size_t offset, size_t length) override;
410
};
411
412
class PosixMmapFile : public FSWritableFile {
413
 private:
414
  std::string filename_;
415
  int fd_;
416
  size_t page_size_;
417
  size_t map_size_;       // How much extra memory to map at a time
418
  char* base_;            // The mapped region
419
  char* limit_;           // Limit of the mapped region
420
  char* dst_;             // Where to write next  (in range [base_,limit_])
421
  char* last_sync_;       // Where have we synced up to
422
  uint64_t file_offset_;  // Offset of base_ in file
423
#ifdef ROCKSDB_FALLOCATE_PRESENT
424
  bool allow_fallocate_;  // If false, fallocate calls are bypassed
425
  bool fallocate_with_keep_size_;
426
#endif
427
428
  // Roundup x to a multiple of y
429
0
  static size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; }
430
431
0
  size_t TruncateToPageBoundary(size_t s) {
432
0
    s -= (s & (page_size_ - 1));
433
0
    assert((s % page_size_) == 0);
434
0
    return s;
435
0
  }
436
437
  IOStatus MapNewRegion();
438
  IOStatus UnmapCurrentRegion();
439
  IOStatus Msync();
440
441
 public:
442
  PosixMmapFile(const std::string& fname, int fd, size_t page_size,
443
                const EnvOptions& options);
444
  ~PosixMmapFile();
445
446
  // Means Close() will properly take care of truncate
447
  // and it does not need any additional information
448
  IOStatus Truncate(uint64_t /*size*/, const IOOptions& /*opts*/,
449
0
                    IODebugContext* /*dbg*/) override {
450
0
    return IOStatus::OK();
451
0
  }
452
  IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
453
  IOStatus Append(const Slice& data, const IOOptions& opts,
454
                  IODebugContext* dbg) override;
455
  IOStatus Append(const Slice& data, const IOOptions& opts,
456
                  const DataVerificationInfo& /* verification_info */,
457
0
                  IODebugContext* dbg) override {
458
0
    return Append(data, opts, dbg);
459
0
  }
460
  IOStatus Flush(const IOOptions& opts, IODebugContext* dbg) override;
461
  IOStatus Sync(const IOOptions& opts, IODebugContext* dbg) override;
462
  IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
463
  uint64_t GetFileSize(const IOOptions& opts, IODebugContext* dbg) override;
464
  IOStatus InvalidateCache(size_t offset, size_t length) override;
465
#ifdef ROCKSDB_FALLOCATE_PRESENT
466
  IOStatus Allocate(uint64_t offset, uint64_t len, const IOOptions& opts,
467
                    IODebugContext* dbg) override;
468
#endif
469
};
470
471
class PosixRandomRWFile : public FSRandomRWFile {
472
 public:
473
  explicit PosixRandomRWFile(const std::string& fname, int fd,
474
                             const EnvOptions& options);
475
  virtual ~PosixRandomRWFile();
476
477
  IOStatus Write(uint64_t offset, const Slice& data, const IOOptions& opts,
478
                 IODebugContext* dbg) override;
479
480
  IOStatus Read(uint64_t offset, size_t n, const IOOptions& opts, Slice* result,
481
                char* scratch, IODebugContext* dbg) const override;
482
483
  IOStatus Flush(const IOOptions& opts, IODebugContext* dbg) override;
484
  IOStatus Sync(const IOOptions& opts, IODebugContext* dbg) override;
485
  IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
486
  IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
487
488
 private:
489
  const std::string filename_;
490
  int fd_;
491
};
492
493
struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer {
494
  PosixMemoryMappedFileBuffer(void* _base, size_t _length)
495
0
      : MemoryMappedFileBuffer(_base, _length) {}
496
  virtual ~PosixMemoryMappedFileBuffer();
497
};
498
499
class PosixDirectory : public FSDirectory {
500
 public:
501
  explicit PosixDirectory(int fd, const std::string& directory_name);
502
  ~PosixDirectory();
503
  IOStatus Fsync(const IOOptions& opts, IODebugContext* dbg) override;
504
505
  IOStatus Close(const IOOptions& opts, IODebugContext* dbg) override;
506
507
  IOStatus FsyncWithDirOptions(
508
      const IOOptions&, IODebugContext*,
509
      const DirFsyncOptions& dir_fsync_options) override;
510
511
 private:
512
  int fd_;
513
  bool is_btrfs_;
514
  const std::string directory_name_;
515
};
516
517
}  // namespace ROCKSDB_NAMESPACE