/src/osquery/plugins/database/rocksdb.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | /** |
2 | | * Copyright (c) 2014-present, The osquery authors |
3 | | * |
4 | | * This source code is licensed as defined by the LICENSE file found in the |
5 | | * root directory of this source tree. |
6 | | * |
7 | | * SPDX-License-Identifier: (Apache-2.0 OR GPL-2.0-only) |
8 | | */ |
9 | | |
#include <sys/stat.h>

#include <memory>

#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/options.h>

#include <osquery/core/flags.h>
#include <osquery/filesystem/fileops.h>
#include <osquery/filesystem/filesystem.h>
#include <osquery/logger/logger.h>
#include <osquery/registry/registry_factory.h>
#include <osquery/utils/conversions/tryto.h>
#include <plugins/database/rocksdb.h>
23 | | |
24 | | namespace fs = boost::filesystem; |
25 | | |
26 | | namespace osquery { |
27 | | |
/// Hidden flags created for internal stress testing.
/// They are consumed by RocksDBDatabasePlugin::setUp() the first time it
/// builds options_ (write buffer count/merge count/background flushes).
HIDDEN_FLAG(int32, rocksdb_write_buffer, 16, "Max write buffer number");
HIDDEN_FLAG(int32, rocksdb_merge_number, 4, "Min write buffer number to merge");
HIDDEN_FLAG(int32, rocksdb_background_flushes, 4, "Max background flushes");
/// Number of 4 KiB blocks; setUp() sets write_buffer_size to 4096 * this.
HIDDEN_FLAG(int32, rocksdb_buffer_blocks, 256, "Write buffer blocks (4k)");

/// Database directory; defined elsewhere and read by setUp().
DECLARE_string(database_path);

/**
 * @brief Track external systems marking the RocksDB database as corrupted.
 *
 * This can be set using the RocksDBDatabasePlugin's static methods.
 * The two primary external systems are the RocksDB logger plugin and tests.
 *
 * GlogRocksDBLogger::Logv raises it when a RocksDB log line contains
 * "Corruption:"; RocksDBDatabasePlugin::close() runs repairDB() and then
 * clears it.
 */
std::atomic<bool> kRocksDBCorruptionIndicator{false};

/// Backing-storage provider for osquery internal/core.
REGISTER_INTERNAL(RocksDBDatabasePlugin, "database", "rocksdb");
46 | | |
47 | 0 | void GlogRocksDBLogger::Logv(const char* format, va_list ap) { |
48 | | // Convert RocksDB log to string and check if header or level-ed log. |
49 | 0 | std::string log_line; |
50 | 0 | { |
51 | 0 | char buffer[501] = {0}; |
52 | 0 | vsnprintf(buffer, 500, format, ap); |
53 | 0 | va_end(ap); |
54 | 0 | if (buffer[0] != '[' || (buffer[1] != 'E' && buffer[1] != 'W')) { |
55 | 0 | return; |
56 | 0 | } |
57 | | |
58 | 0 | log_line = buffer; |
59 | 0 | } |
60 | | |
61 | | // There is a spurious warning on first open. |
62 | 0 | if (log_line.find("Error when reading") == std::string::npos) { |
63 | | // RocksDB calls are non-reentrant. Since this callback is made in the |
64 | | // context of a RocksDB API call, turn log forwarding off to prevent the |
65 | | // logger from trying to make a call back into RocksDB and causing a |
66 | | // deadlock. |
67 | 0 | LOG(INFO) << "RocksDB: " << log_line; |
68 | 0 | } |
69 | | |
70 | | // If the callback includes 'Corruption' then set the corruption indicator. |
71 | 0 | if (log_line.find("Corruption:") != std::string::npos) { |
72 | 0 | RocksDBDatabasePlugin::setCorrupted(); |
73 | 0 | } |
74 | 0 | } |
75 | | |
/**
 * @brief Open (or create) the RocksDB database at FLAGS_database_path.
 *
 * On the first call this also fills in options_ and column_families_. The
 * column families are the default family (first), one per entry in kDomains,
 * and any extra families already present in an existing database (last).
 * Previously held handles are released with close() before every open.
 * A corrupt database is handed to repairDB() and the open is retried once.
 * After a successful open the directory permissions are tightened. Every
 * domain except kEvents is then compacted; compaction failures are only
 * logged.
 *
 * @return Status(0) on success. Otherwise a failure status when the path
 * exists but is unreadable, when the open fails, or when permissions cannot
 * be set.
 */
Status RocksDBDatabasePlugin::setUp() {
  // NOTE(review): this only logs a warning; setup still proceeds when opening
  // is not allowed.
  if (!allowOpen()) {
    LOG(WARNING) << RLOG(1629) << "Not allowed to set up database plugin";
  }

  // Consume the current settings.
  // A configuration update may change them, but that does not affect state.
  path_ = fs::path(FLAGS_database_path).make_preferred().string();

  // A missing path is fine (it will be created); an unreadable one is not.
  if (pathExists(path_).ok() && !isReadable(path_).ok()) {
    return Status(1, "Cannot read RocksDB path: " + path_);
  }

  if (!checkingDB()) {
    VLOG(1) << "Opening RocksDB handle: " << path_;
  }

  // Options and the column family list are built only once; later calls reuse
  // them.
  if (!initialized_) {
    initialized_ = true;

    // Set meta-data (mostly) handling options.
    options_.create_if_missing = true;
    options_.create_missing_column_families = true;
    options_.info_log_level = rocksdb::ERROR_LEVEL;
    options_.log_file_time_to_roll = 0;
    options_.keep_log_file_num = 10;
    options_.max_log_file_size = 1024 * 1024 * 1;
    options_.max_open_files = 128;
    options_.stats_dump_period_sec = 0;
    options_.max_manifest_file_size = 1024 * 500;

    // Performance and optimization settings.
    // Use rocksdb::kZSTD to use ZSTD database compression
    options_.compression = rocksdb::kNoCompression;
    options_.compaction_style = rocksdb::kCompactionStyleLevel;
    options_.arena_block_size = (4 * 1024);
    options_.write_buffer_size = (4 * 1024) * FLAGS_rocksdb_buffer_blocks;
    options_.max_write_buffer_number =
        static_cast<int>(FLAGS_rocksdb_write_buffer);
    options_.min_write_buffer_number_to_merge =
        static_cast<int>(FLAGS_rocksdb_merge_number);
    options_.max_background_flushes =
        static_cast<int>(FLAGS_rocksdb_background_flushes);

    // Create an environment to replace the default logger.
    if (logger_ == nullptr) {
      logger_ = std::make_shared<GlogRocksDBLogger>();
    }
    options_.info_log = logger_;

    // The default column family is listed first. DB::Open fills handles_ in
    // the same order as column_families_, so handles_[0] is the default family.
    // NOTE(review): kDomains[i] therefore sits at handles_[i + 1], not
    // handles_[i].
    std::set<std::string> domain_set;
    column_families_.push_back(rocksdb::ColumnFamilyDescriptor(
        rocksdb::kDefaultColumnFamilyName, options_));
    domain_set.insert(rocksdb::kDefaultColumnFamilyName);

    for (const auto& cf_name : kDomains) {
      column_families_.push_back(
          rocksdb::ColumnFamilyDescriptor(cf_name, options_));
      domain_set.insert(cf_name);
    }

    // To support osquery rollbacks, meaning running with a database
    // written/used by a newer version of osquery that introduced a new column
    // family, we need to open with all column families known by the database.
    // This is a limitation of RocksDB documented here:
    // https://github.com/facebook/rocksdb/wiki/Column-Families#reference.
    // "When opening a DB in a read-write mode, you need to specify all Column
    // Families that currently exist in a DB. If that's not the case, DB::Open
    // call will return Status::InvalidArgument()"
    //
    // Thus, we load all column families known by the database first and use
    // them in the rocksdb::DB::Open call.
    std::vector<std::string> column_families_in_db;
    auto s = rocksdb::DB::ListColumnFamilies(
        options_, path_, &column_families_in_db);
    // It is possible the DB doesn't exist yet, for "create if not
    // existing" case. The failure is ignored here. We rely on DB::Open()
    // to give us the correct error message for problem with opening
    // existing DB.
    if (s.ok()) {
      for (const auto& column_family_in_db : column_families_in_db) {
        if (domain_set.find(column_family_in_db) == domain_set.end()) {
          VLOG(1) << "Adding unknown column family from DB: "
                  << column_family_in_db;
          column_families_.push_back(
              rocksdb::ColumnFamilyDescriptor(column_family_in_db, options_));
        }
      }
    }
  }

  // Tests may trash calls to setUp, make sure subsequent calls do not leak.
  // (close() also runs repairDB() if the corruption indicator is set.)
  close();

  // Attempt to create a RocksDB instance and handles.
  auto s =
      rocksdb::DB::Open(options_, path_, column_families_, &handles_, &db_);

  if (s.IsCorruption()) {
    // The database is corrupt - try to repair it
    // (repairDB() moves the existing directory aside; the retry then creates
    // a fresh database because create_if_missing is set).
    repairDB();
    s = rocksdb::DB::Open(options_, path_, column_families_, &handles_, &db_);
  }

  if (!s.ok() || db_ == nullptr) {
    LOG(INFO) << "Rocksdb open failed (" << static_cast<uint32_t>(s.code())
              << ":" << static_cast<uint32_t>(s.subcode()) << ") "
              << s.ToString();
    // A failed open in R/W mode is a runtime error.
    return Status(1, s.ToString());
  }

  // RocksDB may not create/append a directory with acceptable permissions.
  if (platformSetSafeDbPerms(path_) == false) {
    return Status(1, "Cannot set permissions on RocksDB path: " + path_);
  }

  // Compact every domain except events; failures are logged, not returned.
  for (const auto& cf_name : kDomains) {
    if (cf_name != kEvents) {
      auto compact_status = compactFiles(cf_name);
      if (!compact_status.ok()) {
        LOG(INFO) << "Cannot compact column family " << cf_name << ": "
                  << compact_status.getMessage();
      }
    }
  }

  return Status(0);
}
205 | | |
206 | 0 | Status RocksDBDatabasePlugin::compactFiles(const std::string& domain) { |
207 | 0 | auto handle = getHandleForColumnFamily(domain); |
208 | 0 | if (handle == nullptr) { |
209 | 0 | return Status::failure(1, "Handle does not exist"); |
210 | 0 | } |
211 | | |
212 | 0 | rocksdb::ColumnFamilyMetaData cf_meta; |
213 | 0 | db_->GetColumnFamilyMetaData(handle, &cf_meta); |
214 | |
|
215 | 0 | for (const auto& level : cf_meta.levels) { |
216 | 0 | std::vector<std::string> input_file_names; |
217 | 0 | for (const auto& file : level.files) { |
218 | 0 | if (file.being_compacted) { |
219 | 0 | return Status::success(); |
220 | 0 | } |
221 | 0 | input_file_names.push_back(file.name); |
222 | 0 | } |
223 | | |
224 | 0 | if (!input_file_names.empty()) { |
225 | 0 | auto s = db_->CompactFiles( |
226 | 0 | rocksdb::CompactionOptions(), handle, input_file_names, level.level); |
227 | 0 | if (!s.ok()) { |
228 | 0 | return Status::failure(s.ToString()); |
229 | 0 | } |
230 | 0 | } |
231 | 0 | } |
232 | | |
233 | 0 | db_->CompactRange(rocksdb::CompactRangeOptions(), handle, nullptr, nullptr); |
234 | |
|
235 | 0 | return Status::success(); |
236 | 0 | } |
237 | | |
238 | 0 | void RocksDBDatabasePlugin::tearDown() { |
239 | 0 | close(); |
240 | 0 | } |
241 | | |
242 | 2 | void RocksDBDatabasePlugin::close() { |
243 | 2 | WriteLock lock(close_mutex_); |
244 | 2 | for (auto handle : handles_) { |
245 | 0 | delete handle; |
246 | 0 | } |
247 | 2 | handles_.clear(); |
248 | | |
249 | 2 | if (db_ != nullptr) { |
250 | 0 | delete db_; |
251 | 0 | db_ = nullptr; |
252 | 0 | } |
253 | | |
254 | 2 | if (isCorrupted()) { |
255 | 0 | repairDB(); |
256 | 0 | setCorrupted(false); |
257 | 0 | } |
258 | 2 | } |
259 | | |
260 | 2 | bool RocksDBDatabasePlugin::isCorrupted() { |
261 | 2 | return kRocksDBCorruptionIndicator; |
262 | 2 | } |
263 | | |
264 | 0 | void RocksDBDatabasePlugin::setCorrupted(bool corrupted) { |
265 | 0 | kRocksDBCorruptionIndicator = corrupted; |
266 | 0 | } |
267 | | |
268 | 0 | void RocksDBDatabasePlugin::repairDB() { |
269 | | // Try to backup the existing database. |
270 | 0 | auto bpath = path_ + ".backup"; |
271 | 0 | if (pathExists(bpath).ok()) { |
272 | 0 | if (!removePath(bpath).ok()) { |
273 | 0 | LOG(ERROR) << "Cannot remove previous RocksDB database backup: " << bpath; |
274 | 0 | return; |
275 | 0 | } else { |
276 | 0 | LOG(WARNING) << "Removed previous RocksDB database backup: " << bpath; |
277 | 0 | } |
278 | 0 | } |
279 | | |
280 | 0 | if (movePath(path_, bpath).ok()) { |
281 | 0 | LOG(WARNING) << "Backing up RocksDB database: " << bpath; |
282 | 0 | } else { |
283 | 0 | LOG(ERROR) << "Cannot backup the RocksDB database: " << bpath; |
284 | 0 | return; |
285 | 0 | } |
286 | | |
287 | | // ROCKSDB_LITE does not have a RepairDB method. |
288 | 0 | LOG(WARNING) << "Destroying RocksDB database due to corruption"; |
289 | 0 | } |
290 | | |
291 | 0 | rocksdb::DB* RocksDBDatabasePlugin::getDB() const { |
292 | 0 | return db_; |
293 | 0 | } |
294 | | |
295 | | rocksdb::ColumnFamilyHandle* RocksDBDatabasePlugin::getHandleForColumnFamily( |
296 | 0 | const std::string& cf) const { |
297 | 0 | size_t i = std::find(kDomains.begin(), kDomains.end(), cf) - kDomains.begin(); |
298 | 0 | if (i != kDomains.size()) { |
299 | 0 | return handles_[i]; |
300 | 0 | } else { |
301 | 0 | return nullptr; |
302 | 0 | } |
303 | 0 | } |
304 | | |
305 | | Status RocksDBDatabasePlugin::get(const std::string& domain, |
306 | | const std::string& key, |
307 | 0 | std::string& value) const { |
308 | 0 | if (getDB() == nullptr) { |
309 | 0 | return Status(1, "Database not opened"); |
310 | 0 | } |
311 | 0 | auto cfh = getHandleForColumnFamily(domain); |
312 | 0 | if (cfh == nullptr) { |
313 | 0 | return Status(1, "Could not get column family for " + domain); |
314 | 0 | } |
315 | 0 | auto s = getDB()->Get(rocksdb::ReadOptions(), cfh, key, &value); |
316 | 0 | return Status(s.code(), s.ToString()); |
317 | 0 | } |
318 | | |
319 | | Status RocksDBDatabasePlugin::get(const std::string& domain, |
320 | | const std::string& key, |
321 | 0 | int& value) const { |
322 | 0 | std::string result; |
323 | 0 | auto s = this->get(domain, key, result); |
324 | 0 | if (s.ok()) { |
325 | 0 | auto expectedValue = tryTo<int>(result); |
326 | 0 | if (expectedValue.isError()) { |
327 | 0 | return Status::failure("Could not deserialize str to int"); |
328 | 0 | } else { |
329 | 0 | value = expectedValue.take(); |
330 | 0 | } |
331 | 0 | } |
332 | 0 | return s; |
333 | 0 | } |
334 | | Status RocksDBDatabasePlugin::put(const std::string& domain, |
335 | | const std::string& key, |
336 | 0 | const std::string& value) { |
337 | 0 | return putBatch(domain, {std::make_pair(key, value)}); |
338 | 0 | } |
339 | | |
340 | 0 | inline bool skipWal(const std::string& domain) { |
341 | 0 | return (kEvents == domain); |
342 | 0 | } |
343 | | |
344 | | Status RocksDBDatabasePlugin::putBatch(const std::string& domain, |
345 | 0 | const DatabaseStringValueList& data) { |
346 | 0 | auto cfh = getHandleForColumnFamily(domain); |
347 | 0 | if (cfh == nullptr) { |
348 | 0 | return Status(1, "Could not get column family for " + domain); |
349 | 0 | } |
350 | | |
351 | | // Events should be fast, and do not need to force syncs. |
352 | 0 | auto options = rocksdb::WriteOptions(); |
353 | 0 | if (skipWal(domain)) { |
354 | 0 | options.disableWAL = true; |
355 | 0 | } else { |
356 | 0 | options.sync = false; |
357 | 0 | } |
358 | |
|
359 | 0 | rocksdb::WriteBatch batch; |
360 | 0 | for (const auto& p : data) { |
361 | 0 | const auto& key = p.first; |
362 | 0 | const auto& value = p.second; |
363 | |
|
364 | 0 | batch.Put(cfh, key, value); |
365 | 0 | } |
366 | |
|
367 | 0 | auto s = getDB()->Write(options, &batch); |
368 | 0 | if (s.code() != 0 && s.IsIOError()) { |
369 | | // An error occurred, check if it is an IO error and remove the offending |
370 | | // specific filename or log name. |
371 | 0 | std::string error_string = s.ToString(); |
372 | 0 | size_t error_pos = error_string.find_last_of(":"); |
373 | 0 | if (error_pos != std::string::npos) { |
374 | 0 | return Status(s.code(), "IOError: " + error_string.substr(error_pos + 2)); |
375 | 0 | } |
376 | 0 | } |
377 | | |
378 | 0 | return Status(s.code(), s.ToString()); |
379 | 0 | } |
380 | | |
381 | | Status RocksDBDatabasePlugin::put(const std::string& domain, |
382 | | const std::string& key, |
383 | 0 | int value) { |
384 | 0 | return putBatch(domain, {std::make_pair(key, std::to_string(value))}); |
385 | 0 | } |
386 | | |
387 | | Status RocksDBDatabasePlugin::remove(const std::string& domain, |
388 | 0 | const std::string& key) { |
389 | 0 | auto cfh = getHandleForColumnFamily(domain); |
390 | 0 | if (cfh == nullptr) { |
391 | 0 | return Status(1, "Could not get column family for " + domain); |
392 | 0 | } |
393 | 0 | auto options = rocksdb::WriteOptions(); |
394 | | |
395 | | // We could sync here, but large deletes will cause multi-syncs. |
396 | | // For example: event record expirations found in an expired index. |
397 | 0 | if (skipWal(domain)) { |
398 | 0 | options.disableWAL = true; |
399 | 0 | } else { |
400 | 0 | options.sync = false; |
401 | 0 | } |
402 | 0 | auto s = getDB()->Delete(options, cfh, key); |
403 | 0 | return Status(s.code(), s.ToString()); |
404 | 0 | } |
405 | | |
406 | | Status RocksDBDatabasePlugin::removeRange(const std::string& domain, |
407 | | const std::string& low, |
408 | 0 | const std::string& high) { |
409 | | // The new RocksDB version will return an error if our range |
410 | | // is not correct |
411 | 0 | if (low > high) { |
412 | 0 | return Status::failure("Invalid range: low > high"); |
413 | 0 | } |
414 | | |
415 | 0 | auto cfh = getHandleForColumnFamily(domain); |
416 | 0 | if (cfh == nullptr) { |
417 | 0 | return Status(1, "Could not get column family for " + domain); |
418 | 0 | } |
419 | 0 | auto options = rocksdb::WriteOptions(); |
420 | | |
421 | | // We could sync here, but large deletes will cause multi-syncs. |
422 | | // For example: event record expirations found in an expired index. |
423 | 0 | if (skipWal(domain)) { |
424 | 0 | options.disableWAL = true; |
425 | 0 | } else { |
426 | 0 | options.sync = false; |
427 | 0 | } |
428 | 0 | auto s = getDB()->DeleteRange(options, cfh, low, high); |
429 | 0 | if (low <= high) { |
430 | 0 | s = getDB()->Delete(options, cfh, high); |
431 | 0 | } |
432 | 0 | return Status(s.code(), s.ToString()); |
433 | 0 | } |
434 | | |
435 | | Status RocksDBDatabasePlugin::scan(const std::string& domain, |
436 | | std::vector<std::string>& results, |
437 | | const std::string& prefix, |
438 | 0 | uint64_t max) const { |
439 | 0 | if (getDB() == nullptr) { |
440 | 0 | return Status(1, "Database not opened"); |
441 | 0 | } |
442 | | |
443 | 0 | auto cfh = getHandleForColumnFamily(domain); |
444 | 0 | if (cfh == nullptr) { |
445 | 0 | return Status(1, "Could not get column family for " + domain); |
446 | 0 | } |
447 | 0 | auto options = rocksdb::ReadOptions(); |
448 | 0 | options.verify_checksums = false; |
449 | 0 | options.fill_cache = false; |
450 | 0 | auto it = getDB()->NewIterator(options, cfh); |
451 | 0 | if (it == nullptr) { |
452 | 0 | return Status(1, "Could not get iterator for " + domain); |
453 | 0 | } |
454 | | |
455 | 0 | size_t count = 0; |
456 | 0 | for (it->SeekToFirst(); it->Valid(); it->Next()) { |
457 | 0 | auto key = it->key().ToString(); |
458 | 0 | if (key.find(prefix) == 0) { |
459 | 0 | results.push_back(std::move(key)); |
460 | 0 | if (max > 0 && ++count >= max) { |
461 | 0 | break; |
462 | 0 | } |
463 | 0 | } |
464 | 0 | } |
465 | 0 | delete it; |
466 | 0 | return Status::success(); |
467 | 0 | } |
468 | | } // namespace osquery |