Coverage Report

Created: 2025-10-26 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/env/composite_env.cc
Line
Count
Source
1
// Copyright (c) 2019-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
#include "env/composite_env_wrapper.h"
7
#include "rocksdb/utilities/options_type.h"
8
#include "util/string_util.h"
9
10
namespace ROCKSDB_NAMESPACE {
11
namespace {
12
// The CompositeEnvWrapper class provides an interface that is compatible
13
// with the old monolithic Env API, and an implementation that wraps around
14
// the new Env that provides threading and other OS related functionality, and
15
// the new FileSystem API that provides storage functionality. By
16
// providing the old Env interface, it allows the rest of RocksDB code to
17
// be agnostic of whether the underlying Env implementation is a monolithic
18
// Env or an Env + FileSystem. In the former case, the user will specify
19
// Options::env only, whereas in the latter case, the user will specify
20
// Options::env and Options::file_system.
21
22
class CompositeSequentialFileWrapper : public SequentialFile {
23
 public:
24
  explicit CompositeSequentialFileWrapper(
25
      std::unique_ptr<FSSequentialFile>& target)
26
0
      : target_(std::move(target)) {}
27
28
0
  Status Read(size_t n, Slice* result, char* scratch) override {
29
0
    IOOptions io_opts;
30
0
    IODebugContext dbg;
31
0
    return target_->Read(n, io_opts, result, scratch, &dbg);
32
0
  }
33
0
  Status Skip(uint64_t n) override { return target_->Skip(n); }
34
0
  bool use_direct_io() const override { return target_->use_direct_io(); }
35
0
  size_t GetRequiredBufferAlignment() const override {
36
0
    return target_->GetRequiredBufferAlignment();
37
0
  }
38
0
  Status InvalidateCache(size_t offset, size_t length) override {
39
0
    return target_->InvalidateCache(offset, length);
40
0
  }
41
  Status PositionedRead(uint64_t offset, size_t n, Slice* result,
42
0
                        char* scratch) override {
43
0
    IOOptions io_opts;
44
0
    IODebugContext dbg;
45
0
    return target_->PositionedRead(offset, n, io_opts, result, scratch, &dbg);
46
0
  }
47
48
 private:
49
  std::unique_ptr<FSSequentialFile> target_;
50
};
51
52
class CompositeRandomAccessFileWrapper : public RandomAccessFile {
53
 public:
54
  explicit CompositeRandomAccessFileWrapper(
55
      std::unique_ptr<FSRandomAccessFile>& target)
56
0
      : target_(std::move(target)) {}
57
58
  Status Read(uint64_t offset, size_t n, Slice* result,
59
0
              char* scratch) const override {
60
0
    IOOptions io_opts;
61
0
    IODebugContext dbg;
62
0
    return target_->Read(offset, n, io_opts, result, scratch, &dbg);
63
0
  }
64
0
  Status MultiRead(ReadRequest* reqs, size_t num_reqs) override {
65
0
    IOOptions io_opts;
66
0
    IODebugContext dbg;
67
0
    std::vector<FSReadRequest> fs_reqs;
68
0
    Status status;
69
70
0
    fs_reqs.resize(num_reqs);
71
0
    for (size_t i = 0; i < num_reqs; ++i) {
72
0
      fs_reqs[i].offset = reqs[i].offset;
73
0
      fs_reqs[i].len = reqs[i].len;
74
0
      fs_reqs[i].scratch = reqs[i].scratch;
75
0
      fs_reqs[i].status = IOStatus::OK();
76
0
    }
77
0
    status = target_->MultiRead(fs_reqs.data(), num_reqs, io_opts, &dbg);
78
0
    for (size_t i = 0; i < num_reqs; ++i) {
79
0
      reqs[i].result = fs_reqs[i].result;
80
0
      reqs[i].status = fs_reqs[i].status;
81
0
    }
82
0
    return status;
83
0
  }
84
0
  Status Prefetch(uint64_t offset, size_t n) override {
85
0
    IOOptions io_opts;
86
0
    IODebugContext dbg;
87
0
    return target_->Prefetch(offset, n, io_opts, &dbg);
88
0
  }
89
0
  size_t GetUniqueId(char* id, size_t max_size) const override {
90
0
    return target_->GetUniqueId(id, max_size);
91
0
  }
92
0
  void Hint(AccessPattern pattern) override {
93
0
    target_->Hint((FSRandomAccessFile::AccessPattern)pattern);
94
0
  }
95
0
  bool use_direct_io() const override { return target_->use_direct_io(); }
96
0
  size_t GetRequiredBufferAlignment() const override {
97
0
    return target_->GetRequiredBufferAlignment();
98
0
  }
99
0
  Status InvalidateCache(size_t offset, size_t length) override {
100
0
    return target_->InvalidateCache(offset, length);
101
0
  }
102
103
0
  Status GetFileSize(uint64_t* size) override {
104
0
    return target_->GetFileSize(size);
105
0
  }
106
107
 private:
108
  std::unique_ptr<FSRandomAccessFile> target_;
109
};
110
111
class CompositeWritableFileWrapper : public WritableFile {
112
 public:
113
  explicit CompositeWritableFileWrapper(std::unique_ptr<FSWritableFile>& t)
114
0
      : target_(std::move(t)) {}
115
116
0
  Status Append(const Slice& data) override {
117
0
    IOOptions io_opts;
118
0
    IODebugContext dbg;
119
0
    return target_->Append(data, io_opts, &dbg);
120
0
  }
121
  Status Append(const Slice& data,
122
0
                const DataVerificationInfo& verification_info) override {
123
0
    IOOptions io_opts;
124
0
    IODebugContext dbg;
125
0
    return target_->Append(data, io_opts, verification_info, &dbg);
126
0
  }
127
0
  Status PositionedAppend(const Slice& data, uint64_t offset) override {
128
0
    IOOptions io_opts;
129
0
    IODebugContext dbg;
130
0
    return target_->PositionedAppend(data, offset, io_opts, &dbg);
131
0
  }
132
  Status PositionedAppend(
133
      const Slice& data, uint64_t offset,
134
0
      const DataVerificationInfo& verification_info) override {
135
0
    IOOptions io_opts;
136
0
    IODebugContext dbg;
137
0
    return target_->PositionedAppend(data, offset, io_opts, verification_info,
138
0
                                     &dbg);
139
0
  }
140
0
  Status Truncate(uint64_t size) override {
141
0
    IOOptions io_opts;
142
0
    IODebugContext dbg;
143
0
    return target_->Truncate(size, io_opts, &dbg);
144
0
  }
145
0
  Status Close() override {
146
0
    IOOptions io_opts;
147
0
    IODebugContext dbg;
148
0
    return target_->Close(io_opts, &dbg);
149
0
  }
150
0
  Status Flush() override {
151
0
    IOOptions io_opts;
152
0
    IODebugContext dbg;
153
0
    return target_->Flush(io_opts, &dbg);
154
0
  }
155
0
  Status Sync() override {
156
0
    IOOptions io_opts;
157
0
    IODebugContext dbg;
158
0
    return target_->Sync(io_opts, &dbg);
159
0
  }
160
0
  Status Fsync() override {
161
0
    IOOptions io_opts;
162
0
    IODebugContext dbg;
163
0
    return target_->Fsync(io_opts, &dbg);
164
0
  }
165
0
  bool IsSyncThreadSafe() const override { return target_->IsSyncThreadSafe(); }
166
167
0
  bool use_direct_io() const override { return target_->use_direct_io(); }
168
169
0
  size_t GetRequiredBufferAlignment() const override {
170
0
    return target_->GetRequiredBufferAlignment();
171
0
  }
172
173
0
  void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override {
174
0
    target_->SetWriteLifeTimeHint(hint);
175
0
  }
176
177
0
  Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
178
0
    return target_->GetWriteLifeTimeHint();
179
0
  }
180
181
0
  uint64_t GetFileSize() override {
182
0
    IOOptions io_opts;
183
0
    IODebugContext dbg;
184
0
    return target_->GetFileSize(io_opts, &dbg);
185
0
  }
186
187
0
  void SetPreallocationBlockSize(size_t size) override {
188
0
    target_->SetPreallocationBlockSize(size);
189
0
  }
190
191
  void GetPreallocationStatus(size_t* block_size,
192
0
                              size_t* last_allocated_block) override {
193
0
    target_->GetPreallocationStatus(block_size, last_allocated_block);
194
0
  }
195
196
0
  size_t GetUniqueId(char* id, size_t max_size) const override {
197
0
    return target_->GetUniqueId(id, max_size);
198
0
  }
199
200
0
  Status InvalidateCache(size_t offset, size_t length) override {
201
0
    return target_->InvalidateCache(offset, length);
202
0
  }
203
204
0
  Status RangeSync(uint64_t offset, uint64_t nbytes) override {
205
0
    IOOptions io_opts;
206
0
    IODebugContext dbg;
207
0
    return target_->RangeSync(offset, nbytes, io_opts, &dbg);
208
0
  }
209
210
0
  void PrepareWrite(size_t offset, size_t len) override {
211
0
    IOOptions io_opts;
212
0
    IODebugContext dbg;
213
0
    target_->PrepareWrite(offset, len, io_opts, &dbg);
214
0
  }
215
216
0
  Status Allocate(uint64_t offset, uint64_t len) override {
217
0
    IOOptions io_opts;
218
0
    IODebugContext dbg;
219
0
    return target_->Allocate(offset, len, io_opts, &dbg);
220
0
  }
221
222
0
  std::unique_ptr<FSWritableFile>* target() { return &target_; }
223
224
 private:
225
  std::unique_ptr<FSWritableFile> target_;
226
};
227
228
class CompositeRandomRWFileWrapper : public RandomRWFile {
229
 public:
230
  explicit CompositeRandomRWFileWrapper(std::unique_ptr<FSRandomRWFile>& target)
231
0
      : target_(std::move(target)) {}
232
233
0
  bool use_direct_io() const override { return target_->use_direct_io(); }
234
0
  size_t GetRequiredBufferAlignment() const override {
235
0
    return target_->GetRequiredBufferAlignment();
236
0
  }
237
0
  Status Write(uint64_t offset, const Slice& data) override {
238
0
    IOOptions io_opts;
239
0
    IODebugContext dbg;
240
0
    return target_->Write(offset, data, io_opts, &dbg);
241
0
  }
242
  Status Read(uint64_t offset, size_t n, Slice* result,
243
0
              char* scratch) const override {
244
0
    IOOptions io_opts;
245
0
    IODebugContext dbg;
246
0
    return target_->Read(offset, n, io_opts, result, scratch, &dbg);
247
0
  }
248
0
  Status Flush() override {
249
0
    IOOptions io_opts;
250
0
    IODebugContext dbg;
251
0
    return target_->Flush(io_opts, &dbg);
252
0
  }
253
0
  Status Sync() override {
254
0
    IOOptions io_opts;
255
0
    IODebugContext dbg;
256
0
    return target_->Sync(io_opts, &dbg);
257
0
  }
258
0
  Status Fsync() override {
259
0
    IOOptions io_opts;
260
0
    IODebugContext dbg;
261
0
    return target_->Fsync(io_opts, &dbg);
262
0
  }
263
0
  Status Close() override {
264
0
    IOOptions io_opts;
265
0
    IODebugContext dbg;
266
0
    return target_->Close(io_opts, &dbg);
267
0
  }
268
269
 private:
270
  std::unique_ptr<FSRandomRWFile> target_;
271
};
272
273
class CompositeDirectoryWrapper : public Directory {
274
 public:
275
  explicit CompositeDirectoryWrapper(std::unique_ptr<FSDirectory>& target)
276
0
      : target_(std::move(target)) {}
277
278
0
  Status Fsync() override {
279
0
    IOOptions io_opts;
280
0
    IODebugContext dbg;
281
0
    return target_->FsyncWithDirOptions(io_opts, &dbg, DirFsyncOptions());
282
0
  }
283
284
0
  Status Close() override {
285
0
    IOOptions io_opts;
286
0
    IODebugContext dbg;
287
0
    return target_->Close(io_opts, &dbg);
288
0
  }
289
290
0
  size_t GetUniqueId(char* id, size_t max_size) const override {
291
0
    return target_->GetUniqueId(id, max_size);
292
0
  }
293
294
 private:
295
  std::unique_ptr<FSDirectory> target_;
296
};
297
}  // namespace
298
299
// Opens `f` for sequential reading through file_system_. On success, `*r` is
// reset to a SequentialFile adapter that owns the opened file; on failure
// `*r` is not touched. Returns the status reported by the file system.
Status CompositeEnv::NewSequentialFile(const std::string& f,
                                       std::unique_ptr<SequentialFile>* r,
                                       const EnvOptions& options) {
  std::unique_ptr<FSSequentialFile> fs_file;
  IODebugContext debug_ctx;
  Status status = file_system_->NewSequentialFile(f, FileOptions(options),
                                                  &fs_file, &debug_ctx);
  if (!status.ok()) {
    return status;
  }
  r->reset(new CompositeSequentialFileWrapper(fs_file));
  return status;
}
312
313
// Opens `f` for random-access reading through file_system_. On success, `*r`
// is reset to a RandomAccessFile adapter that owns the opened file; on
// failure `*r` is not touched. Returns the status reported by the file
// system.
Status CompositeEnv::NewRandomAccessFile(const std::string& f,
                                         std::unique_ptr<RandomAccessFile>* r,
                                         const EnvOptions& options) {
  std::unique_ptr<FSRandomAccessFile> fs_file;
  IODebugContext debug_ctx;
  Status status = file_system_->NewRandomAccessFile(f, FileOptions(options),
                                                    &fs_file, &debug_ctx);
  if (!status.ok()) {
    return status;
  }
  r->reset(new CompositeRandomAccessFileWrapper(fs_file));
  return status;
}
326
327
// Creates writable file `f` through file_system_. On success, `*r` is reset
// to a WritableFile adapter that owns the new file; on failure `*r` is not
// touched. Returns the status reported by the file system.
Status CompositeEnv::NewWritableFile(const std::string& f,
                                     std::unique_ptr<WritableFile>* r,
                                     const EnvOptions& options) {
  std::unique_ptr<FSWritableFile> fs_file;
  IODebugContext debug_ctx;
  Status status = file_system_->NewWritableFile(f, FileOptions(options),
                                                &fs_file, &debug_ctx);
  if (!status.ok()) {
    return status;
  }
  r->reset(new CompositeWritableFileWrapper(fs_file));
  return status;
}
339
340
// Forwards to file_system_->ReopenWritableFile() for `fname`. On success,
// `*result` is reset to a WritableFile adapter that owns the file; on
// failure `*result` is not touched. Returns the file system's status.
Status CompositeEnv::ReopenWritableFile(const std::string& fname,
                                        std::unique_ptr<WritableFile>* result,
                                        const EnvOptions& options) {
  std::unique_ptr<FSWritableFile> fs_file;
  IODebugContext debug_ctx;
  Status status = file_system_->ReopenWritableFile(
      fname, FileOptions(options), &fs_file, &debug_ctx);
  if (!status.ok()) {
    return status;
  }
  result->reset(new CompositeWritableFileWrapper(fs_file));
  return status;
}
353
354
// Forwards to file_system_->ReuseWritableFile() with `fname` and
// `old_fname`. On success, `*r` is reset to a WritableFile adapter that owns
// the file; on failure `*r` is not touched. Returns the file system's status.
Status CompositeEnv::ReuseWritableFile(const std::string& fname,
                                       const std::string& old_fname,
                                       std::unique_ptr<WritableFile>* r,
                                       const EnvOptions& options) {
  std::unique_ptr<FSWritableFile> fs_file;
  IODebugContext debug_ctx;
  Status status = file_system_->ReuseWritableFile(
      fname, old_fname, FileOptions(options), &fs_file, &debug_ctx);
  if (!status.ok()) {
    return status;
  }
  r->reset(new CompositeWritableFileWrapper(fs_file));
  return status;
}
368
369
// Opens `fname` for random read/write access through file_system_. On
// success, `*result` is reset to a RandomRWFile adapter that owns the file;
// on failure `*result` is not touched. Returns the file system's status.
Status CompositeEnv::NewRandomRWFile(const std::string& fname,
                                     std::unique_ptr<RandomRWFile>* result,
                                     const EnvOptions& options) {
  std::unique_ptr<FSRandomRWFile> fs_file;
  IODebugContext debug_ctx;
  Status status = file_system_->NewRandomRWFile(fname, FileOptions(options),
                                                &fs_file, &debug_ctx);
  if (!status.ok()) {
    return status;
  }
  result->reset(new CompositeRandomRWFileWrapper(fs_file));
  return status;
}
382
383
// Obtains a directory object for `name` from file_system_, using
// default-constructed IOOptions. On success, `*result` is reset to a
// Directory adapter that owns it; on failure `*result` is not touched.
// Returns the status reported by the file system.
Status CompositeEnv::NewDirectory(const std::string& name,
                                  std::unique_ptr<Directory>* result) {
  std::unique_ptr<FSDirectory> fs_dir;
  IOOptions opts;
  IODebugContext debug_ctx;
  Status status = file_system_->NewDirectory(name, opts, &fs_dir, &debug_ctx);
  if (!status.ok()) {
    return status;
  }
  result->reset(new CompositeDirectoryWrapper(fs_dir));
  return status;
}
395
396
namespace {
397
static std::unordered_map<std::string, OptionTypeInfo> env_wrapper_type_info = {
398
    {"target",
399
     OptionTypeInfo(0, OptionType::kUnknown, OptionVerificationType::kByName,
400
                    OptionTypeFlags::kDontSerialize)
401
         .SetParseFunc([](const ConfigOptions& opts,
402
                          const std::string& /*name*/, const std::string& value,
403
0
                          void* addr) {
404
0
           auto target = static_cast<EnvWrapper::Target*>(addr);
405
0
           return Env::CreateFromString(opts, value, &(target->env),
406
0
                                        &(target->guard));
407
0
         })
408
         .SetEqualsFunc([](const ConfigOptions& opts,
409
                           const std::string& /*name*/, const void* addr1,
410
0
                           const void* addr2, std::string* mismatch) {
411
0
           const auto target1 = static_cast<const EnvWrapper::Target*>(addr1);
412
0
           const auto target2 = static_cast<const EnvWrapper::Target*>(addr2);
413
0
           if (target1->env != nullptr) {
414
0
             return target1->env->AreEquivalent(opts, target2->env, mismatch);
415
0
           } else {
416
0
             return (target2->env == nullptr);
417
0
           }
418
0
         })
419
         .SetPrepareFunc([](const ConfigOptions& opts,
420
0
                            const std::string& /*name*/, void* addr) {
421
0
           auto target = static_cast<EnvWrapper::Target*>(addr);
422
0
           if (target->guard.get() != nullptr) {
423
0
             target->env = target->guard.get();
424
0
           } else if (target->env == nullptr) {
425
0
             target->env = Env::Default();
426
0
           }
427
0
           return target->env->PrepareOptions(opts);
428
0
         })
429
         .SetValidateFunc([](const DBOptions& db_opts,
430
                             const ColumnFamilyOptions& cf_opts,
431
0
                             const std::string& /*name*/, const void* addr) {
432
0
           const auto target = static_cast<const EnvWrapper::Target*>(addr);
433
0
           if (target->env == nullptr) {
434
0
             return Status::InvalidArgument("Target Env not specified");
435
0
           } else {
436
0
             return target->env->ValidateOptions(db_opts, cf_opts);
437
0
           }
438
0
         })},
439
};
440
static std::unordered_map<std::string, OptionTypeInfo>
441
    composite_fs_wrapper_type_info = {
442
        {"file_system",
443
         OptionTypeInfo::AsCustomSharedPtr<FileSystem>(
444
             0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
445
};
446
447
static std::unordered_map<std::string, OptionTypeInfo>
448
    composite_clock_wrapper_type_info = {
449
        {"clock",
450
         OptionTypeInfo::AsCustomSharedPtr<SystemClock>(
451
             0, OptionVerificationType::kByName, OptionTypeFlags::kNone)},
452
};
453
454
}  // namespace
455
456
0
std::unique_ptr<Env> NewCompositeEnv(const std::shared_ptr<FileSystem>& fs) {
457
0
  return std::unique_ptr<Env>(new CompositeEnvWrapper(Env::Default(), fs));
458
0
}
459
460
// Wraps a raw `env` pointer together with the given file system and clock.
// The target, file system and clock are each registered with their own
// option table, in that order.
CompositeEnvWrapper::CompositeEnvWrapper(
    Env* env, const std::shared_ptr<FileSystem>& fs,
    const std::shared_ptr<SystemClock>& sc)
    : CompositeEnv(fs, sc), target_(env) {
  RegisterOptions("", &target_, &env_wrapper_type_info);
  RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
  RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
}
468
469
// Same as the raw-pointer overload, but the wrapped Env is supplied as a
// shared_ptr. The target, file system and clock are each registered with
// their own option table, in that order.
CompositeEnvWrapper::CompositeEnvWrapper(
    const std::shared_ptr<Env>& env, const std::shared_ptr<FileSystem>& fs,
    const std::shared_ptr<SystemClock>& sc)
    : CompositeEnv(fs, sc), target_(env) {
  RegisterOptions("", &target_, &env_wrapper_type_info);
  RegisterOptions("", &file_system_, &composite_fs_wrapper_type_info);
  RegisterOptions("", &system_clock_, &composite_clock_wrapper_type_info);
}
477
478
0
// Prepares the wrapped target first, then fills in a missing file system or
// system clock from the target Env before delegating to
// Env::PrepareOptions().
Status CompositeEnvWrapper::PrepareOptions(const ConfigOptions& options) {
  target_.Prepare();

  // Anything not supplied explicitly is borrowed from the target Env.
  if (!file_system_) {
    file_system_ = target_.env->GetFileSystem();
  }
  if (!system_clock_) {
    system_clock_ = target_.env->GetSystemClock();
  }

  return Env::PrepareOptions(options);
}
488
489
std::string CompositeEnvWrapper::SerializeOptions(
490
0
    const ConfigOptions& config_options, const std::string& header) const {
491
0
  auto options = CompositeEnv::SerializeOptions(config_options, header);
492
0
  if (target_.env != nullptr && target_.env != Env::Default()) {
493
0
    options.append("target=");
494
0
    options.append(target_.env->ToString(config_options));
495
0
  }
496
0
  return options;
497
0
}
498
499
0
// Wraps a raw Env pointer and registers the target with
// env_wrapper_type_info.
EnvWrapper::EnvWrapper(Env* t)
    : target_(t) {
  RegisterOptions("", &target_, &env_wrapper_type_info);
}
502
503
0
// Wraps an Env moved in from `t` and registers the target with
// env_wrapper_type_info.
EnvWrapper::EnvWrapper(std::unique_ptr<Env>&& t)
    : target_(std::move(t)) {
  RegisterOptions("", &target_, &env_wrapper_type_info);
}
506
507
0
// Wraps an Env supplied as a shared_ptr and registers the target with
// env_wrapper_type_info.
EnvWrapper::EnvWrapper(const std::shared_ptr<Env>& t)
    : target_(t) {
  RegisterOptions("", &target_, &env_wrapper_type_info);
}
510
511
0
// Destructor is defaulted out of line, in this translation unit.
EnvWrapper::~EnvWrapper() = default;
512
513
0
// Prepares the wrapped target, then delegates to Env::PrepareOptions() and
// returns its status.
Status EnvWrapper::PrepareOptions(const ConfigOptions& options) {
  target_.Prepare();
  Status status = Env::PrepareOptions(options);
  return status;
}
517
518
std::string EnvWrapper::SerializeOptions(const ConfigOptions& config_options,
519
0
                                         const std::string& header) const {
520
0
  auto parent = Env::SerializeOptions(config_options, "");
521
0
  if (config_options.IsShallow() || target_.env == nullptr ||
522
0
      target_.env == Env::Default()) {
523
0
    return parent;
524
0
  } else {
525
0
    std::string result = header;
526
0
    if (!StartsWith(parent, OptionTypeInfo::kIdPropName())) {
527
0
      result.append(OptionTypeInfo::kIdPropName()).append("=");
528
0
    }
529
0
    result.append(parent);
530
0
    if (!EndsWith(result, config_options.delimiter)) {
531
0
      result.append(config_options.delimiter);
532
0
    }
533
0
    result.append("target=").append(target_.env->ToString(config_options));
534
0
    return result;
535
0
  }
536
0
}
537
538
}  // namespace ROCKSDB_NAMESPACE