Coverage Report

Created: 2025-07-23 08:14

/src/osquery/plugins/database/rocksdb.cpp
Line
Count
Source (jump to first uncovered line)
1
/**
2
 * Copyright (c) 2014-present, The osquery authors
3
 *
4
 * This source code is licensed as defined by the LICENSE file found in the
5
 * root directory of this source tree.
6
 *
7
 * SPDX-License-Identifier: (Apache-2.0 OR GPL-2.0-only)
8
 */
9
10
#include <sys/stat.h>
11
12
#include <rocksdb/db.h>
13
#include <rocksdb/env.h>
14
#include <rocksdb/options.h>
15
16
#include <osquery/core/flags.h>
17
#include <osquery/filesystem/fileops.h>
18
#include <osquery/filesystem/filesystem.h>
19
#include <osquery/logger/logger.h>
20
#include <osquery/registry/registry_factory.h>
21
#include <osquery/utils/conversions/tryto.h>
22
#include <plugins/database/rocksdb.h>
23
24
namespace fs = boost::filesystem;
25
26
namespace osquery {
27
28
/// Hidden flags created for internal stress testing.
29
HIDDEN_FLAG(int32, rocksdb_write_buffer, 16, "Max write buffer number");
30
HIDDEN_FLAG(int32, rocksdb_merge_number, 4, "Min write buffer number to merge");
31
HIDDEN_FLAG(int32, rocksdb_background_flushes, 4, "Max background flushes");
32
HIDDEN_FLAG(int32, rocksdb_buffer_blocks, 256, "Write buffer blocks (4k)");
33
34
DECLARE_string(database_path);
35
36
/**
37
 * @brief Track external systems marking the RocksDB database as corrupted.
38
 *
39
 * This can be set using the RocksDBDatabasePlugin's static methods.
40
 * The two primary external systems are the RocksDB logger plugin and tests.
41
 */
42
std::atomic<bool> kRocksDBCorruptionIndicator{false};
43
44
/// Backing-storage provider for osquery internal/core.
45
REGISTER_INTERNAL(RocksDBDatabasePlugin, "database", "rocksdb");
46
47
0
void GlogRocksDBLogger::Logv(const char* format, va_list ap) {
48
  // Convert RocksDB log to string and check if header or level-ed log.
49
0
  std::string log_line;
50
0
  {
51
0
    char buffer[501] = {0};
52
0
    vsnprintf(buffer, 500, format, ap);
53
0
    va_end(ap);
54
0
    if (buffer[0] != '[' || (buffer[1] != 'E' && buffer[1] != 'W')) {
55
0
      return;
56
0
    }
57
58
0
    log_line = buffer;
59
0
  }
60
61
  // There is a spurious warning on first open.
62
0
  if (log_line.find("Error when reading") == std::string::npos) {
63
    // RocksDB calls are non-reentrant. Since this callback is made in the
64
    // context of a RocksDB API call, turn log forwarding off to prevent the
65
    // logger from trying to make a call back into RocksDB and causing a
66
    // deadlock.
67
0
    LOG(INFO) << "RocksDB: " << log_line;
68
0
  }
69
70
  // If the callback includes 'Corruption' then set the corruption indicator.
71
0
  if (log_line.find("Corruption:") != std::string::npos) {
72
0
    RocksDBDatabasePlugin::setCorrupted();
73
0
  }
74
0
}
75
76
0
/**
 * @brief Configure (once) and open the RocksDB database at --database_path.
 *
 * Options and column-family descriptors are built only on the first call;
 * every call closes any previous handles, opens the database (retrying once
 * after repairDB() if RocksDB reports corruption), tightens permissions and
 * compacts every domain except events.
 *
 * @return failure if the path is unreadable, the open fails, or permissions
 *         cannot be set; success otherwise.
 */
Status RocksDBDatabasePlugin::setUp() {
  // Refusal is only reported; setup still proceeds.
  if (!allowOpen()) {
    LOG(WARNING) << RLOG(1629) << "Not allowed to set up database plugin";
  }

  // Snapshot the configured path; later flag changes do not alter this state.
  path_ = fs::path(FLAGS_database_path).make_preferred().string();

  const bool path_unreadable =
      pathExists(path_).ok() && !isReadable(path_).ok();
  if (path_unreadable) {
    return Status(1, "Cannot read RocksDB path: " + path_);
  }

  if (!checkingDB()) {
    VLOG(1) << "Opening RocksDB handle: " << path_;
  }

  // Options and descriptors are computed once; repeated setUp() calls reuse
  // them.
  if (!initialized_) {
    initialized_ = true;

    auto& opts = options_;

    // Metadata and log-file handling.
    opts.create_if_missing = true;
    opts.create_missing_column_families = true;
    opts.info_log_level = rocksdb::ERROR_LEVEL;
    opts.log_file_time_to_roll = 0;
    opts.keep_log_file_num = 10;
    opts.max_log_file_size = 1024 * 1024 * 1;
    opts.max_open_files = 128;
    opts.stats_dump_period_sec = 0;
    opts.max_manifest_file_size = 1024 * 500;

    // Performance tuning; use rocksdb::kZSTD here to enable ZSTD compression.
    opts.compression = rocksdb::kNoCompression;
    opts.compaction_style = rocksdb::kCompactionStyleLevel;
    opts.arena_block_size = (4 * 1024);
    opts.write_buffer_size = (4 * 1024) * FLAGS_rocksdb_buffer_blocks;
    opts.max_write_buffer_number =
        static_cast<int>(FLAGS_rocksdb_write_buffer);
    opts.min_write_buffer_number_to_merge =
        static_cast<int>(FLAGS_rocksdb_merge_number);
    opts.max_background_flushes =
        static_cast<int>(FLAGS_rocksdb_background_flushes);

    // Route RocksDB's info log through glog.
    if (logger_ == nullptr) {
      logger_ = std::make_shared<GlogRocksDBLogger>();
    }
    opts.info_log = logger_;

    // Descriptor order: the default family first, then every osquery domain.
    std::set<std::string> known_families{rocksdb::kDefaultColumnFamilyName};
    column_families_.emplace_back(rocksdb::kDefaultColumnFamilyName, opts);
    for (const auto& domain : kDomains) {
      known_families.insert(domain);
      column_families_.emplace_back(domain, opts);
    }

    // A database written by a newer osquery (rollback case) may contain
    // column families this build does not know. A read-write DB::Open must
    // list every existing family or it returns InvalidArgument
    // (https://github.com/facebook/rocksdb/wiki/Column-Families#reference),
    // so append any unknown on-disk families to the descriptor list.
    std::vector<std::string> on_disk_families;
    auto list_status =
        rocksdb::DB::ListColumnFamilies(opts, path_, &on_disk_families);
    // Listing fails when the database does not exist yet; that case is left
    // to DB::Open() below to create or to report.
    if (list_status.ok()) {
      for (const auto& family : on_disk_families) {
        if (known_families.count(family) != 0) {
          continue;
        }
        VLOG(1) << "Adding unknown column family from DB: " << family;
        column_families_.emplace_back(family, opts);
      }
    }
  }

  // Tests may call setUp repeatedly; release earlier handles before reopening.
  close();

  auto open_status =
      rocksdb::DB::Open(options_, path_, column_families_, &handles_, &db_);
  if (open_status.IsCorruption()) {
    // Move the corrupt database aside and retry once.
    repairDB();
    open_status =
        rocksdb::DB::Open(options_, path_, column_families_, &handles_, &db_);
  }

  if (!open_status.ok() || db_ == nullptr) {
    LOG(INFO) << "Rocksdb open failed ("
              << static_cast<uint32_t>(open_status.code()) << ":"
              << static_cast<uint32_t>(open_status.subcode()) << ") "
              << open_status.ToString();
    // Failing to open in read-write mode is a runtime error.
    return Status(1, open_status.ToString());
  }

  // RocksDB may create or append to the directory with unsafe permissions.
  if (!platformSetSafeDbPerms(path_)) {
    return Status(1, "Cannot set permissions on RocksDB path: " + path_);
  }

  // Compact every domain except events; failures are only logged.
  for (const auto& domain : kDomains) {
    if (domain == kEvents) {
      continue;
    }
    auto compact_status = compactFiles(domain);
    if (!compact_status.ok()) {
      LOG(INFO) << "Cannot compact column family " << domain << ": "
                << compact_status.getMessage();
    }
  }

  return Status(0);
}
205
206
0
/**
 * @brief Manually compact the column family that backs @p domain.
 *
 * Each level's files are compacted in place. If any file is already being
 * compacted, the call returns success without doing anything. A full
 * CompactRange over the family follows.
 *
 * @param domain osquery domain name.
 * @return failure if the DB is not open, the domain has no handle, or a
 *         RocksDB compaction call fails.
 */
Status RocksDBDatabasePlugin::compactFiles(const std::string& domain) {
  if (db_ == nullptr) {
    return Status::failure("Database not opened");
  }

  auto handle = getHandleForColumnFamily(domain);
  if (handle == nullptr) {
    return Status::failure(1, "Handle does not exist");
  }

  rocksdb::ColumnFamilyMetaData cf_meta;
  db_->GetColumnFamilyMetaData(handle, &cf_meta);

  for (const auto& level : cf_meta.levels) {
    std::vector<std::string> input_file_names;
    input_file_names.reserve(level.files.size());
    for (const auto& file : level.files) {
      // Another compaction already owns these files; leave the family alone.
      if (file.being_compacted) {
        return Status::success();
      }
      input_file_names.push_back(file.name);
    }

    if (input_file_names.empty()) {
      continue;
    }

    auto s = db_->CompactFiles(
        rocksdb::CompactionOptions(), handle, input_file_names, level.level);
    if (!s.ok()) {
      return Status::failure(s.ToString());
    }
  }

  // Previously this status was discarded, hiding CompactRange failures.
  auto range_status = db_->CompactRange(
      rocksdb::CompactRangeOptions(), handle, nullptr, nullptr);
  if (!range_status.ok()) {
    return Status::failure(range_status.ToString());
  }

  return Status::success();
}
237
238
0
void RocksDBDatabasePlugin::tearDown() {
239
0
  close();
240
0
}
241
242
2
void RocksDBDatabasePlugin::close() {
243
2
  WriteLock lock(close_mutex_);
244
2
  for (auto handle : handles_) {
245
0
    delete handle;
246
0
  }
247
2
  handles_.clear();
248
249
2
  if (db_ != nullptr) {
250
0
    delete db_;
251
0
    db_ = nullptr;
252
0
  }
253
254
2
  if (isCorrupted()) {
255
0
    repairDB();
256
0
    setCorrupted(false);
257
0
  }
258
2
}
259
260
2
bool RocksDBDatabasePlugin::isCorrupted() {
261
2
  return kRocksDBCorruptionIndicator;
262
2
}
263
264
0
void RocksDBDatabasePlugin::setCorrupted(bool corrupted) {
265
0
  kRocksDBCorruptionIndicator = corrupted;
266
0
}
267
268
0
void RocksDBDatabasePlugin::repairDB() {
269
  // Try to backup the existing database.
270
0
  auto bpath = path_ + ".backup";
271
0
  if (pathExists(bpath).ok()) {
272
0
    if (!removePath(bpath).ok()) {
273
0
      LOG(ERROR) << "Cannot remove previous RocksDB database backup: " << bpath;
274
0
      return;
275
0
    } else {
276
0
      LOG(WARNING) << "Removed previous RocksDB database backup: " << bpath;
277
0
    }
278
0
  }
279
280
0
  if (movePath(path_, bpath).ok()) {
281
0
    LOG(WARNING) << "Backing up RocksDB database: " << bpath;
282
0
  } else {
283
0
    LOG(ERROR) << "Cannot backup the RocksDB database: " << bpath;
284
0
    return;
285
0
  }
286
287
  // ROCKSDB_LITE does not have a RepairDB method.
288
0
  LOG(WARNING) << "Destroying RocksDB database due to corruption";
289
0
}
290
291
0
/// Non-owning access to the open DB; nullptr after close().
rocksdb::DB* RocksDBDatabasePlugin::getDB() const {
  return this->db_;
}
294
295
/**
 * @brief Map an osquery domain name to a column family handle.
 *
 * @param cf domain name, looked up in kDomains.
 * @return the handle, or nullptr if @p cf is not a known domain or no handles
 *         are currently held (database not opened, or after close()).
 */
rocksdb::ColumnFamilyHandle* RocksDBDatabasePlugin::getHandleForColumnFamily(
    const std::string& cf) const {
  const auto found = std::find(kDomains.begin(), kDomains.end(), cf);
  if (found == kDomains.end()) {
    return nullptr;
  }

  const auto i = static_cast<size_t>(found - kDomains.begin());
  // handles_ is empty while the database is closed; indexing it unchecked
  // was an out-of-bounds read.
  if (i >= handles_.size()) {
    return nullptr;
  }

  // NOTE(review): setUp() lists the default column family first and the
  // kDomains families after it, and DB::Open fills handles_ in that order.
  // So handles_[i] is not kDomains[i]'s own family: it is the default family
  // for i == 0, and kDomains[i - 1]'s family otherwise. The mapping is
  // deliberately left unchanged because existing on-disk data depends on it.
  // Confirm whether this offset is intentional.
  return handles_[i];
}
304
305
/**
 * @brief Read the string stored under @p key in @p domain.
 *
 * @param value receives the stored bytes on success.
 * @return the RocksDB status code and message (e.g. NotFound is non-zero).
 */
Status RocksDBDatabasePlugin::get(const std::string& domain,
                                  const std::string& key,
                                  std::string& value) const {
  auto* db = getDB();
  if (db == nullptr) {
    return Status(1, "Database not opened");
  }

  auto* family = getHandleForColumnFamily(domain);
  if (family == nullptr) {
    return Status(1, "Could not get column family for " + domain);
  }

  const auto rc = db->Get(rocksdb::ReadOptions(), family, key, &value);
  return Status(rc.code(), rc.ToString());
}
318
319
/**
 * @brief Read an integer stored (as text) under @p key in @p domain.
 *
 * @param value assigned only when both the read and the conversion succeed.
 * @return the read status, or a failure if the text is not an int.
 */
Status RocksDBDatabasePlugin::get(const std::string& domain,
                                  const std::string& key,
                                  int& value) const {
  std::string raw;
  auto status = this->get(domain, key, raw);
  if (!status.ok()) {
    return status;
  }

  auto parsed = tryTo<int>(raw);
  if (parsed.isError()) {
    return Status::failure("Could not deserialize str to int");
  }

  value = parsed.take();
  return status;
}
334
/// Store @p value under @p key in @p domain as a one-entry batch.
Status RocksDBDatabasePlugin::put(const std::string& domain,
                                  const std::string& key,
                                  const std::string& value) {
  auto entry = std::make_pair(key, value);
  return putBatch(domain, {entry});
}
339
340
0
inline bool skipWal(const std::string& domain) {
341
0
  return (kEvents == domain);
342
0
}
343
344
/**
 * @brief Atomically write all key/value pairs in @p data to @p domain.
 *
 * The events domain is written with the WAL disabled; other domains use the
 * WAL without forcing a sync.
 *
 * @return failure if the DB is not open or the domain is unknown; otherwise
 *         the RocksDB write status. For I/O errors, the message is shortened
 *         to "IOError: " plus the text after the last ':'.
 */
Status RocksDBDatabasePlugin::putBatch(const std::string& domain,
                                       const DatabaseStringValueList& data) {
  // Same guard as get()/scan(): without it, a closed DB would be dereferenced.
  if (getDB() == nullptr) {
    return Status(1, "Database not opened");
  }

  auto cfh = getHandleForColumnFamily(domain);
  if (cfh == nullptr) {
    return Status(1, "Could not get column family for " + domain);
  }

  // Events should be fast, and do not need to force syncs.
  auto options = rocksdb::WriteOptions();
  if (skipWal(domain)) {
    options.disableWAL = true;
  } else {
    options.sync = false;
  }

  rocksdb::WriteBatch batch;
  for (const auto& p : data) {
    batch.Put(cfh, p.first, p.second);
  }

  auto s = getDB()->Write(options, &batch);
  if (s.IsIOError()) {
    // Keep only the text after the last ':' (e.g. the offending file or log
    // name).
    const std::string error_string = s.ToString();
    const size_t error_pos = error_string.find_last_of(':');
    if (error_pos != std::string::npos) {
      // Skip ": ", but clamp: a trailing ':' would otherwise make substr()
      // start past the end and throw std::out_of_range.
      const size_t detail_pos = (error_pos + 2 <= error_string.size())
                                    ? error_pos + 2
                                    : error_string.size();
      return Status(s.code(), "IOError: " + error_string.substr(detail_pos));
    }
  }

  return Status(s.code(), s.ToString());
}
380
381
/// Store @p value as its decimal text under @p key in @p domain.
Status RocksDBDatabasePlugin::put(const std::string& domain,
                                  const std::string& key,
                                  int value) {
  const auto text = std::to_string(value);
  return putBatch(domain, {std::make_pair(key, text)});
}
386
387
/**
 * @brief Delete @p key from @p domain.
 *
 * @return failure if the DB is not open or the domain is unknown; otherwise
 *         the RocksDB delete status.
 */
Status RocksDBDatabasePlugin::remove(const std::string& domain,
                                     const std::string& key) {
  // Same guard as get()/scan(): without it, a closed DB would be dereferenced.
  if (getDB() == nullptr) {
    return Status(1, "Database not opened");
  }

  auto cfh = getHandleForColumnFamily(domain);
  if (cfh == nullptr) {
    return Status(1, "Could not get column family for " + domain);
  }
  auto options = rocksdb::WriteOptions();

  // We could sync here, but large deletes will cause multi-syncs.
  // For example: event record expirations found in an expired index.
  if (skipWal(domain)) {
    options.disableWAL = true;
  } else {
    options.sync = false;
  }
  auto s = getDB()->Delete(options, cfh, key);
  return Status(s.code(), s.ToString());
}
405
406
/**
 * @brief Delete every key in the inclusive range [@p low, @p high].
 *
 * @return failure for an inverted range, a closed DB or an unknown domain;
 *         otherwise the first failing RocksDB status, or success.
 */
Status RocksDBDatabasePlugin::removeRange(const std::string& domain,
                                          const std::string& low,
                                          const std::string& high) {
  // Newer RocksDB versions reject a range whose end sorts before its start.
  if (low > high) {
    return Status::failure("Invalid range: low > high");
  }

  // Same guard as get()/scan(): without it, a closed DB would be dereferenced.
  if (getDB() == nullptr) {
    return Status(1, "Database not opened");
  }

  auto cfh = getHandleForColumnFamily(domain);
  if (cfh == nullptr) {
    return Status(1, "Could not get column family for " + domain);
  }
  auto options = rocksdb::WriteOptions();

  // We could sync here, but large deletes will cause multi-syncs.
  // For example: event record expirations found in an expired index.
  if (skipWal(domain)) {
    options.disableWAL = true;
  } else {
    options.sync = false;
  }

  // DeleteRange covers [low, high); `high` is then deleted on its own so the
  // range is inclusive. Previously the Delete result always overwrote the
  // DeleteRange result (the `low <= high` guard was always true here), which
  // hid DeleteRange failures. Now the first failure is returned.
  auto s = getDB()->DeleteRange(options, cfh, low, high);
  if (s.ok()) {
    s = getDB()->Delete(options, cfh, high);
  }
  return Status(s.code(), s.ToString());
}
434
435
/**
 * @brief Append to @p results the keys of @p domain that start with @p prefix.
 *
 * Keys are appended in database (bytewise) order. Scanning stops after
 * @p max matches when @p max is non-zero.
 *
 * @return failure if the DB is not open, the domain is unknown, no iterator
 *         could be created, or the iterator hit a read error. On a read
 *         error, @p results may already hold a partial listing.
 */
Status RocksDBDatabasePlugin::scan(const std::string& domain,
                                   std::vector<std::string>& results,
                                   const std::string& prefix,
                                   uint64_t max) const {
  if (getDB() == nullptr) {
    return Status(1, "Database not opened");
  }

  auto cfh = getHandleForColumnFamily(domain);
  if (cfh == nullptr) {
    return Status(1, "Could not get column family for " + domain);
  }
  auto options = rocksdb::ReadOptions();
  options.verify_checksums = false;
  options.fill_cache = false;

  // Owned by unique_ptr so the iterator is freed even if push_back throws.
  std::unique_ptr<rocksdb::Iterator> it(getDB()->NewIterator(options, cfh));
  if (it == nullptr) {
    return Status(1, "Could not get iterator for " + domain);
  }

  // setUp() sets no custom comparator, so keys are in bytewise order and all
  // keys sharing `prefix` are contiguous starting at Seek(prefix). Stop at
  // the first non-matching key instead of walking the whole column family.
  const rocksdb::Slice prefix_slice(prefix);
  uint64_t count = 0;
  for (it->Seek(prefix_slice);
       it->Valid() && it->key().starts_with(prefix_slice);
       it->Next()) {
    results.push_back(it->key().ToString());
    if (max > 0 && ++count >= max) {
      break;
    }
  }

  // Valid() also turns false on read errors; do not report those as success.
  auto iter_status = it->status();
  if (!iter_status.ok()) {
    return Status(iter_status.code(), iter_status.ToString());
  }
  return Status::success();
}
468
} // namespace osquery