Coverage Report

Created: 2025-07-23 07:17

/src/rocksdb/logging/env_logger.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
//
10
// Logger implementation that uses custom Env object for logging.
11
12
#pragma once
13
14
#include <time.h>
15
16
#include <atomic>
17
#include <memory>
18
19
#include "file/writable_file_writer.h"
20
#include "monitoring/iostats_context_imp.h"
21
#include "port/sys_time.h"
22
#include "rocksdb/env.h"
23
#include "rocksdb/file_system.h"
24
#include "rocksdb/perf_level.h"
25
#include "rocksdb/slice.h"
26
#include "test_util/sync_point.h"
27
#include "util/mutexlock.h"
28
29
namespace ROCKSDB_NAMESPACE {
30
31
class EnvLogger : public Logger {
32
 public:
33
  EnvLogger(std::unique_ptr<FSWritableFile>&& writable_file,
34
            const std::string& fname, const EnvOptions& options, Env* env,
35
            InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
36
68.5k
      : Logger(log_level),
37
68.5k
        env_(env),
38
68.5k
        clock_(env_->GetSystemClock().get()),
39
68.5k
        file_(std::move(writable_file), fname, options, clock_),
40
68.5k
        last_flush_micros_(0),
41
68.5k
        flush_pending_(false) {}
42
43
68.5k
  ~EnvLogger() {
44
68.5k
    if (!closed_) {
45
10.2k
      closed_ = true;
46
10.2k
      CloseHelper().PermitUncheckedError();
47
10.2k
    }
48
68.5k
  }
49
50
 private:
51
  // A guard to prepare file operations, such as mutex and skip
52
  // I/O context.
53
  class FileOpGuard {
54
   public:
55
    explicit FileOpGuard(EnvLogger& logger)
56
27.3M
        : logger_(logger), prev_perf_level_(GetPerfLevel()) {
57
      // Preserve iostats not to pollute writes from user writes. We might
58
      // need a better solution than this.
59
27.3M
      SetPerfLevel(PerfLevel::kDisable);
60
27.3M
      IOSTATS_SET_DISABLE(true);
61
27.3M
      logger.mutex_.Lock();
62
27.3M
    }
63
27.3M
    ~FileOpGuard() {
64
27.3M
      logger_.mutex_.Unlock();
65
27.3M
      IOSTATS_SET_DISABLE(false);
66
27.3M
      SetPerfLevel(prev_perf_level_);
67
27.3M
    }
68
69
   private:
70
    EnvLogger& logger_;
71
    PerfLevel prev_perf_level_;
72
  };
73
74
466k
  void FlushLocked() {
75
466k
    mutex_.AssertHeld();
76
466k
    if (flush_pending_) {
77
419k
      flush_pending_ = false;
78
419k
      file_.Flush(IOOptions()).PermitUncheckedError();
79
419k
      file_.reset_seen_error();
80
419k
    }
81
466k
    last_flush_micros_ = clock_->NowMicros();
82
466k
  }
83
84
397k
  void Flush() override {
85
397k
    TEST_SYNC_POINT("EnvLogger::Flush:Begin1");
86
397k
    TEST_SYNC_POINT("EnvLogger::Flush:Begin2");
87
88
397k
    FileOpGuard guard(*this);
89
397k
    FlushLocked();
90
397k
  }
91
92
58.3k
  Status CloseImpl() override { return CloseHelper(); }
93
94
68.5k
  Status CloseHelper() {
95
68.5k
    FileOpGuard guard(*this);
96
68.5k
    const auto close_status = file_.Close(IOOptions());
97
98
68.5k
    if (close_status.ok()) {
99
68.5k
      return close_status;
100
68.5k
    }
101
0
    return Status::IOError("Close of log file failed with error:" +
102
0
                           (close_status.getState()
103
0
                                ? std::string(close_status.getState())
104
0
                                : std::string()));
105
68.5k
  }
106
107
  using Logger::Logv;
108
26.8M
  void Logv(const char* format, va_list ap) override {
109
26.8M
    IOSTATS_TIMER_GUARD(logger_nanos);
110
111
26.8M
    const uint64_t thread_id = env_->GetThreadID();
112
113
    // We try twice: the first time with a fixed-size stack allocated buffer,
114
    // and the second time with a much larger dynamically allocated buffer.
115
26.8M
    char buffer[500];
116
27.0M
    for (int iter = 0; iter < 2; iter++) {
117
27.0M
      char* base;
118
27.0M
      int bufsize;
119
27.0M
      if (iter == 0) {
120
26.8M
        bufsize = sizeof(buffer);
121
26.8M
        base = buffer;
122
26.8M
      } else {
123
153k
        bufsize = 65536;
124
153k
        base = new char[bufsize];
125
153k
      }
126
27.0M
      char* p = base;
127
27.0M
      char* limit = base + bufsize;
128
129
27.0M
      port::TimeVal now_tv;
130
27.0M
      port::GetTimeOfDay(&now_tv, nullptr);
131
27.0M
      const time_t seconds = now_tv.tv_sec;
132
27.0M
      struct tm t;
133
27.0M
      port::LocalTimeR(&seconds, &t);
134
27.0M
      p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llu ",
135
27.0M
                    t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
136
27.0M
                    t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec),
137
27.0M
                    static_cast<long long unsigned int>(thread_id));
138
139
      // Print the message
140
27.0M
      if (p < limit) {
141
27.0M
        va_list backup_ap;
142
27.0M
        va_copy(backup_ap, ap);
143
27.0M
        p += vsnprintf(p, limit - p, format, backup_ap);
144
27.0M
        va_end(backup_ap);
145
27.0M
      }
146
147
      // Truncate to available space if necessary
148
27.0M
      if (p >= limit) {
149
153k
        if (iter == 0) {
150
153k
          continue;  // Try again with larger buffer
151
153k
        } else {
152
0
          p = limit - 1;
153
0
        }
154
153k
      }
155
156
      // Add newline if necessary
157
26.8M
      if (p == base || p[-1] != '\n') {
158
25.7M
        *p++ = '\n';
159
25.7M
      }
160
161
26.8M
      assert(p <= limit);
162
26.8M
      {
163
26.8M
        FileOpGuard guard(*this);
164
        // We will ignore any error returned by Append().
165
26.8M
        file_.Append(IOOptions(), Slice(base, p - base)).PermitUncheckedError();
166
26.8M
        file_.reset_seen_error();
167
26.8M
        flush_pending_ = true;
168
26.8M
        const uint64_t now_micros = clock_->NowMicros();
169
26.8M
        if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
170
68.5k
          FlushLocked();
171
68.5k
        }
172
26.8M
      }
173
26.8M
      if (base != buffer) {
174
153k
        delete[] base;
175
153k
      }
176
26.8M
      break;
177
27.0M
    }
178
26.8M
  }
179
180
0
  size_t GetLogFileSize() const override {
181
0
    MutexLock l(&mutex_);
182
0
    return file_.GetFileSize();
183
0
  }
184
185
 private:
186
  Env* env_;
187
  SystemClock* clock_;
188
  WritableFileWriter file_;
189
  mutable port::Mutex mutex_;  // Mutex to protect the shared variables below.
190
  const static uint64_t flush_every_seconds_ = 5;
191
  std::atomic_uint_fast64_t last_flush_micros_;
192
  std::atomic<bool> flush_pending_;
193
};
194
195
}  // namespace ROCKSDB_NAMESPACE