Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/env/env_posix.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors
9
10
#include "port/lang.h"
11
#if !defined(OS_WIN)
12
13
#include <dirent.h>
14
#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
15
#include <dlfcn.h>
16
#endif
17
#include <fcntl.h>
18
19
#include <cerrno>
20
21
#if defined(ROCKSDB_IOURING_PRESENT)
22
#include <liburing.h>
23
#endif
24
#include <pthread.h>
25
#include <sys/mman.h>
26
#include <sys/stat.h>
27
28
#include <csignal>
29
#include <cstdio>
30
#include <cstdlib>
31
#include <cstring>
32
#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
33
#include <sys/statfs.h>
34
#endif
35
#include <sys/statvfs.h>
36
#include <sys/time.h>
37
#include <sys/types.h>
38
#if defined(ROCKSDB_IOURING_PRESENT)
39
#include <sys/uio.h>
40
#endif
41
#include <unistd.h>
42
43
#include <algorithm>
44
#include <ctime>
45
// Get nano time includes
46
#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD)
47
#elif defined(__MACH__)
48
#include <Availability.h>
49
#include <mach/clock.h>
50
#include <mach/mach.h>
51
#else
52
#include <chrono>
53
#endif
54
#include <deque>
55
#include <set>
56
#include <vector>
57
58
#include "env/composite_env_wrapper.h"
59
#include "env/io_posix.h"
60
#include "monitoring/iostats_context_imp.h"
61
#include "monitoring/thread_status_updater.h"
62
#include "port/port.h"
63
#include "port/sys_time.h"
64
#include "rocksdb/env.h"
65
#include "rocksdb/options.h"
66
#include "rocksdb/slice.h"
67
#include "rocksdb/system_clock.h"
68
#include "test_util/sync_point.h"
69
#include "util/coding.h"
70
#include "util/compression_context_cache.h"
71
#include "util/random.h"
72
#include "util/string_util.h"
73
#include "util/thread_local.h"
74
#include "util/threadpool_imp.h"
75
76
#if !defined(TMPFS_MAGIC)
77
#define TMPFS_MAGIC 0x01021994
78
#endif
79
#if !defined(XFS_SUPER_MAGIC)
80
#define XFS_SUPER_MAGIC 0x58465342
81
#endif
82
#if !defined(EXT4_SUPER_MAGIC)
83
#define EXT4_SUPER_MAGIC 0xEF53
84
#endif
85
86
namespace ROCKSDB_NAMESPACE {
87
#if defined(OS_WIN)
88
static const std::string kSharedLibExt = ".dll";
89
[[maybe_unused]] static const char kPathSeparator = ';';
90
#else
91
[[maybe_unused]] static const char kPathSeparator = ':';
92
#if defined(OS_MACOSX)
93
static const std::string kSharedLibExt = ".dylib";
94
#else
95
static const std::string kSharedLibExt = ".so";
96
#endif
97
#endif
98
99
namespace {
100
101
2
ThreadStatusUpdater* CreateThreadStatusUpdater() {
102
2
  return new ThreadStatusUpdater();
103
2
}
104
105
#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
106
class PosixDynamicLibrary : public DynamicLibrary {
107
 public:
108
  PosixDynamicLibrary(const std::string& name, void* handle)
109
0
      : name_(name), handle_(handle) {}
110
0
  ~PosixDynamicLibrary() override { dlclose(handle_); }
111
112
0
  Status LoadSymbol(const std::string& sym_name, void** func) override {
113
0
    assert(nullptr != func);
114
0
    dlerror();  // Clear any old error
115
0
    *func = dlsym(handle_, sym_name.c_str());
116
0
    if (*func != nullptr) {
117
0
      return Status::OK();
118
0
    } else {
119
0
      char* err = dlerror();
120
0
      return Status::NotFound("Error finding symbol: " + sym_name, err);
121
0
    }
122
0
  }
123
124
0
  const char* Name() const override { return name_.c_str(); }
125
126
 private:
127
  std::string name_;
128
  void* handle_;
129
};
130
#endif  // !ROCKSDB_NO_DYNAMIC_EXTENSION
131
132
class PosixClock : public SystemClock {
133
 public:
134
0
  static const char* kClassName() { return "PosixClock"; }
135
0
  const char* Name() const override { return kDefaultName(); }
136
0
  const char* NickName() const override { return kClassName(); }
137
138
2.70M
  uint64_t NowMicros() override {
139
2.70M
    port::TimeVal tv;
140
2.70M
    port::GetTimeOfDay(&tv, nullptr);
141
2.70M
    return static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec;
142
2.70M
  }
143
144
22.1k
  uint64_t NowNanos() override {
145
22.1k
#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
146
22.1k
    defined(OS_AIX)
147
22.1k
    struct timespec ts;
148
22.1k
    clock_gettime(CLOCK_MONOTONIC, &ts);
149
22.1k
    return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
150
#elif defined(OS_SOLARIS)
151
    return gethrtime();
152
#elif defined(__MACH__)
153
    clock_serv_t cclock;
154
    mach_timespec_t ts;
155
    host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
156
    clock_get_time(cclock, &ts);
157
    mach_port_deallocate(mach_task_self(), cclock);
158
    return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
159
#else
160
    return std::chrono::duration_cast<std::chrono::nanoseconds>(
161
               std::chrono::steady_clock::now().time_since_epoch())
162
        .count();
163
#endif
164
22.1k
  }
165
166
640
  uint64_t CPUMicros() override {
167
640
#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
168
640
    defined(OS_AIX) || (defined(__MACH__) && defined(__MAC_10_12))
169
640
    struct timespec ts;
170
640
    clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
171
640
    return (static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec) / 1000;
172
0
#endif
173
0
    return 0;
174
640
  }
175
176
0
  uint64_t CPUNanos() override {
177
0
#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_GNU_KFREEBSD) || \
178
0
    defined(OS_AIX) || (defined(__MACH__) && defined(__MAC_10_12))
179
0
    struct timespec ts;
180
0
    clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts);
181
0
    return static_cast<uint64_t>(ts.tv_sec) * 1000000000 + ts.tv_nsec;
182
0
#endif
183
0
    return 0;
184
0
  }
185
186
0
  void SleepForMicroseconds(int micros) override { usleep(micros); }
187
188
280k
  Status GetCurrentTime(int64_t* unix_time) override {
189
280k
    time_t ret = time(nullptr);
190
280k
    if (ret == (time_t)-1) {
191
0
      return IOError("GetCurrentTime", "", errno);
192
0
    }
193
280k
    *unix_time = (int64_t)ret;
194
280k
    return Status::OK();
195
280k
  }
196
197
0
  std::string TimeToString(uint64_t secondsSince1970) override {
198
0
    const time_t seconds = (time_t)secondsSince1970;
199
0
    struct tm t;
200
0
    int maxsize = 64;
201
0
    std::string dummy;
202
0
    dummy.reserve(maxsize);
203
0
    dummy.resize(maxsize);
204
0
    char* p = dummy.data();
205
0
    port::LocalTimeR(&seconds, &t);
206
0
    snprintf(p, maxsize, "%04d/%02d/%02d-%02d:%02d:%02d ", t.tm_year + 1900,
207
0
             t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec);
208
0
    return dummy;
209
0
  }
210
};
211
212
class PosixEnv : public CompositeEnv {
213
 public:
214
0
  static const char* kClassName() { return "PosixEnv"; }
215
0
  const char* Name() const override { return kClassName(); }
216
0
  const char* NickName() const override { return kDefaultName(); }
217
218
  struct JoinThreadsOnExit {
219
2
    explicit JoinThreadsOnExit(PosixEnv& _deflt) : deflt(_deflt) {}
220
2
    ~JoinThreadsOnExit() {
221
2
      for (const auto tid : deflt.threads_to_join_) {
222
0
        pthread_join(tid, nullptr);
223
0
      }
224
10
      for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
225
8
        deflt.thread_pools_[pool_id].JoinAllThreads();
226
8
      }
227
      // Do not delete the thread_status_updater_ in order to avoid the
228
      // use-after-free when Env::Default() is destructed while some other
229
      // child threads are still trying to update thread status. All
230
      // PosixEnv instances use the same thread_status_updater_, so never
231
      // explicitly delete it.
232
2
    }
233
    PosixEnv& deflt;
234
  };
235
236
0
  void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
237
0
    if ((options == nullptr || options->set_fd_cloexec) && fd > 0) {
238
0
      fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC);
239
0
    }
240
0
  }
241
242
#ifndef ROCKSDB_NO_DYNAMIC_EXTENSION
243
  // Loads the named library into the result.
244
  // If the input name is empty, the current executable is loaded
245
  // On *nix systems, a "lib" prefix is added to the name if one is not supplied
246
  // Similarly, the appropriate shared library extension is added to the name
247
  // if not supplied. If search_path is not specified, the shared library will
248
  // be loaded using the default path (LD_LIBRARY_PATH). If search_path is
249
  // specified, the shared library will be searched for in the directories
250
  // provided by the search path
251
  Status LoadLibrary(const std::string& name, const std::string& path,
252
0
                     std::shared_ptr<DynamicLibrary>* result) override {
253
0
    assert(result != nullptr);
254
0
    if (name.empty()) {
255
0
      void* hndl = dlopen(NULL, RTLD_NOW);
256
0
      if (hndl != nullptr) {
257
0
        result->reset(new PosixDynamicLibrary(name, hndl));
258
0
        return Status::OK();
259
0
      }
260
0
    } else {
261
0
      std::string library_name = name;
262
0
      if (library_name.find(kSharedLibExt) == std::string::npos) {
263
0
        library_name = library_name + kSharedLibExt;
264
0
      }
265
0
#if !defined(OS_WIN)
266
0
      if (library_name.find('/') == std::string::npos &&
267
0
          library_name.compare(0, 3, "lib") != 0) {
268
0
        library_name = "lib" + library_name;
269
0
      }
270
0
#endif
271
0
      if (path.empty()) {
272
0
        void* hndl = dlopen(library_name.c_str(), RTLD_NOW);
273
0
        if (hndl != nullptr) {
274
0
          result->reset(new PosixDynamicLibrary(library_name, hndl));
275
0
          return Status::OK();
276
0
        }
277
0
      } else {
278
0
        std::string local_path;
279
0
        std::stringstream ss(path);
280
0
        while (getline(ss, local_path, kPathSeparator)) {
281
0
          if (!path.empty()) {
282
0
            std::string full_name = local_path + "/" + library_name;
283
0
            void* hndl = dlopen(full_name.c_str(), RTLD_NOW);
284
0
            if (hndl != nullptr) {
285
0
              result->reset(new PosixDynamicLibrary(full_name, hndl));
286
0
              return Status::OK();
287
0
            }
288
0
          }
289
0
        }
290
0
      }
291
0
    }
292
0
    return Status::IOError(
293
0
        IOErrorMsg("Failed to open shared library: xs", name), dlerror());
294
0
  }
295
#endif  // !ROCKSDB_NO_DYNAMIC_EXTENSION
296
297
  void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
298
                void* tag = nullptr,
299
                void (*unschedFunction)(void* arg) = nullptr) override;
300
301
  int UnSchedule(void* arg, Priority pri) override;
302
303
  void StartThread(void (*function)(void* arg), void* arg) override;
304
305
  void WaitForJoin() override;
306
307
  unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
308
309
  int ReserveThreads(int threads_to_be_reserved, Priority pri) override;
310
311
  int ReleaseThreads(int threads_to_be_released, Priority pri) override;
312
313
0
  Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
314
0
    assert(thread_status_updater_);
315
0
    return thread_status_updater_->GetThreadList(thread_list);
316
0
  }
317
318
2.55M
  uint64_t GetThreadID() const override {
319
2.55M
    uint64_t thread_id = 0;
320
2.55M
#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
321
2.55M
#if __GLIBC_PREREQ(2, 30)
322
2.55M
    thread_id = ::gettid();
323
#else   // __GLIBC_PREREQ(2, 30)
324
    pthread_t tid = pthread_self();
325
    memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
326
#endif  // __GLIBC_PREREQ(2, 30)
327
#else   // defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
328
    pthread_t tid = pthread_self();
329
    memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid)));
330
#endif  // defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
331
2.55M
    return thread_id;
332
2.55M
  }
333
334
142k
  Status GetHostName(char* name, uint64_t len) override {
335
142k
    const size_t max_len = static_cast<size_t>(len);
336
142k
    int ret = gethostname(name, max_len);
337
142k
    if (ret < 0) {
338
0
      if (errno == EFAULT || errno == EINVAL) {
339
0
        return Status::InvalidArgument(errnoStr(errno).c_str());
340
0
      } else if (errno == ENAMETOOLONG) {
341
0
        return IOError("GetHostName", std::string(name, strnlen(name, max_len)),
342
0
                       errno);
343
0
      } else {
344
0
        return IOError("GetHostName", "", errno);
345
0
      }
346
0
    }
347
142k
    return Status::OK();
348
142k
  }
349
350
4
  ThreadStatusUpdater* GetThreadStatusUpdater() const override {
351
4
    return Env::GetThreadStatusUpdater();
352
4
  }
353
354
5.22k
  std::string GenerateUniqueId() override { return Env::GenerateUniqueId(); }
355
356
  // Allow increasing the number of worker threads.
357
0
  void SetBackgroundThreads(int num, Priority pri) override {
358
0
    assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
359
0
    thread_pools_[pri].SetBackgroundThreads(num);
360
0
  }
361
362
12.8k
  int GetBackgroundThreads(Priority pri) override {
363
12.8k
    assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
364
12.8k
    return thread_pools_[pri].GetBackgroundThreads();
365
12.8k
  }
366
367
0
  Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
368
0
    allow_non_owner_access_ = allow_non_owner_access;
369
0
    return Status::OK();
370
0
  }
371
372
  // Allow increasing the number of worker threads.
373
32.6k
  void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
374
32.6k
    assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
375
32.6k
    thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
376
32.6k
  }
377
378
0
  void LowerThreadPoolIOPriority(Priority pool) override {
379
0
    assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
380
0
#ifdef OS_LINUX
381
0
    thread_pools_[pool].LowerIOPriority();
382
#else
383
    (void)pool;
384
#endif
385
0
  }
386
387
0
  void LowerThreadPoolCPUPriority(Priority pool) override {
388
0
    assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
389
0
    thread_pools_[pool].LowerCPUPriority(CpuPriority::kLow);
390
0
  }
391
392
0
  Status LowerThreadPoolCPUPriority(Priority pool, CpuPriority pri) override {
393
0
    assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
394
0
    thread_pools_[pool].LowerCPUPriority(pri);
395
0
    return Status::OK();
396
0
  }
397
398
 private:
399
  friend Env* Env::Default();
400
  // Constructs the default Env, a singleton
401
  PosixEnv();
402
403
  // The below 4 members are only used by the default PosixEnv instance.
404
  // Non-default instances simply maintain references to the backing
405
  // members in the default instance
406
  std::vector<ThreadPoolImpl> thread_pools_storage_;
407
  pthread_mutex_t mu_storage_;
408
  std::vector<pthread_t> threads_to_join_storage_;
409
  bool allow_non_owner_access_storage_;
410
411
  std::vector<ThreadPoolImpl>& thread_pools_;
412
  pthread_mutex_t& mu_;
413
  std::vector<pthread_t>& threads_to_join_;
414
  // If true, allow non owner read access for db files. Otherwise, non-owner
415
  //  has no access to db files.
416
  bool& allow_non_owner_access_;
417
};
418
419
PosixEnv::PosixEnv()
420
    : CompositeEnv(FileSystem::Default(), SystemClock::Default()),
421
      thread_pools_storage_(Priority::TOTAL),
422
      allow_non_owner_access_storage_(true),
423
      thread_pools_(thread_pools_storage_),
424
      mu_(mu_storage_),
425
      threads_to_join_(threads_to_join_storage_),
426
2
      allow_non_owner_access_(allow_non_owner_access_storage_) {
427
2
  ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
428
10
  for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
429
8
    thread_pools_[pool_id].SetThreadPriority(
430
8
        static_cast<Env::Priority>(pool_id));
431
    // This allows later initializing the thread-local-env of each thread.
432
8
    thread_pools_[pool_id].SetHostEnv(this);
433
8
  }
434
2
  thread_status_updater_ = CreateThreadStatusUpdater();
435
2
}
436
437
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
438
394
                        void* tag, void (*unschedFunction)(void* arg)) {
439
394
  assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
440
394
  thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
441
394
}
442
443
66.5k
int PosixEnv::UnSchedule(void* arg, Priority pri) {
444
66.5k
  return thread_pools_[pri].UnSchedule(arg);
445
66.5k
}
446
447
0
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
448
0
  assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
449
0
  return thread_pools_[pri].GetQueueLen();
450
0
}
451
452
0
int PosixEnv::ReserveThreads(int threads_to_reserved, Priority pri) {
453
0
  assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
454
0
  return thread_pools_[pri].ReserveThreads(threads_to_reserved);
455
0
}
456
457
0
int PosixEnv::ReleaseThreads(int threads_to_released, Priority pri) {
458
0
  assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
459
0
  return thread_pools_[pri].ReleaseThreads(threads_to_released);
460
0
}
461
462
struct StartThreadState {
463
  void (*user_function)(void*);
464
  void* arg;
465
};
466
467
0
static void* StartThreadWrapper(void* arg) {
468
0
  StartThreadState* state = static_cast<StartThreadState*>(arg);
469
0
  state->user_function(state->arg);
470
0
  delete state;
471
0
  return nullptr;
472
0
}
473
474
0
void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
475
0
  pthread_t t;
476
0
  StartThreadState* state = new StartThreadState;
477
0
  state->user_function = function;
478
0
  state->arg = arg;
479
0
  ThreadPoolImpl::PthreadCall(
480
0
      "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state));
481
0
  ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
482
0
  threads_to_join_.push_back(t);
483
0
  ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
484
0
}
485
486
0
void PosixEnv::WaitForJoin() {
487
0
  for (const auto tid : threads_to_join_) {
488
0
    pthread_join(tid, nullptr);
489
0
  }
490
0
  threads_to_join_.clear();
491
0
}
492
493
}  // namespace
494
495
//
496
// Default Posix Env
497
//
498
468k
Env* Env::Default() {
499
  // The following function call initializes the singletons of ThreadLocalPtr
500
  // right before the static default_env.  This guarantees default_env will
501
  // always be destructed before the ThreadLocalPtr singletons get
502
  // destructed as C++ guarantees that the destructions of static variables
503
  // is in the reverse order of their constructions.
504
  //
505
  // Since static members are destructed in the reverse order
506
  // of their construction, having this call here guarantees that
507
  // the destructor of static PosixEnv will go first, then the
508
  // singletons of ThreadLocalPtr.
509
468k
  ThreadLocalPtr::InitSingletons();
510
468k
  CompressionContextCache::InitSingleton();
511
468k
  INIT_SYNC_POINT_SINGLETONS();
512
  // Avoid problems with accessing most members of Env::Default() during
513
  // static destruction.
514
468k
  STATIC_AVOID_DESTRUCTION(PosixEnv, default_env);
515
  // This destructor must be called on exit
516
468k
  static PosixEnv::JoinThreadsOnExit thread_joiner(default_env);
517
468k
  return &default_env;
518
468k
}
519
520
//
521
// Default Posix SystemClock
522
//
523
158k
const std::shared_ptr<SystemClock>& SystemClock::Default() {
524
158k
  STATIC_AVOID_DESTRUCTION(std::shared_ptr<SystemClock>, instance)
525
158k
  (std::make_shared<PosixClock>());
526
158k
  return instance;
527
158k
}
528
}  // namespace ROCKSDB_NAMESPACE
529
530
#endif