Coverage Report

Created: 2025-01-09 07:36

/src/osquery/osquery/events/eventfactory.cpp
Line
Count
Source (jump to first uncovered line)
1
/**
2
 * Copyright (c) 2014-present, The osquery authors
3
 *
4
 * This source code is licensed as defined by the LICENSE file found in the
5
 * root directory of this source tree.
6
 *
7
 * SPDX-License-Identifier: (Apache-2.0 OR GPL-2.0-only)
8
 */
9
10
#include <osquery/config/config.h>
11
#include <osquery/core/flags.h>
12
#include <osquery/core/system.h>
13
#include <osquery/events/eventfactory.h>
14
#include <osquery/events/eventsubscriber.h>
15
#include <osquery/logger/logger.h>
16
#include <osquery/registry/registry.h>
17
#include <osquery/sql/sql.h>
18
19
namespace osquery {
20
21
namespace {
22
23
/**
24
 * @brief Details for each subscriber as it relates to the schedule.
25
 *
26
 * This is populated for each configuration update by scanning the schedule.
27
 */
28
struct SubscriberExpirationDetails {
29
 public:
30
  /// The max internal is the minimum wait time for expiring subscriber data.
31
  size_t max_interval{0};
32
33
  /// The number of queries that should run between intervals.
34
  size_t query_count{0};
35
};
36
37
} // namespace
38
39
FLAG(bool, disable_events, false, "Disable osquery publish/subscribe system");
40
41
// There's no reason for the event factory to keep multiple instances.
42
14
EventFactory& EventFactory::getInstance() {
43
14
  static EventFactory ef;
44
14
  return ef;
45
14
}
46
47
0
Status EventFactory::registerEventSubscriber(const PluginRef& sub) {
48
0
  auto base_sub = std::dynamic_pointer_cast<EventSubscriberPlugin>(sub);
49
50
0
  if (base_sub == nullptr) {
51
0
    return Status::failure("Invalid subscriber type: " + sub->getName());
52
0
  }
53
54
  // The config may use an "events" key to explicitly enabled or disable
55
  // event subscribers. See EventSubscriber::disable.
56
0
  auto name = base_sub->getName();
57
0
  if (name.empty()) {
58
    // This subscriber did not override its name.
59
0
    return Status::failure("Subscribers must have set a name");
60
0
  }
61
62
0
  auto plugin = Config::get().getParser("events");
63
0
  if (plugin != nullptr && plugin.get() != nullptr) {
64
0
    const auto& data = plugin->getData().doc();
65
    // First perform explicit enabling.
66
0
    if (data["events"].HasMember("enable_subscribers") &&
67
0
        data["events"]["enable_subscribers"].IsArray()) {
68
0
      for (const auto& item : data["events"]["enable_subscribers"].GetArray()) {
69
0
        if (item.GetString() == name) {
70
0
          VLOG(1) << "Enabling event subscriber: " << name;
71
0
          base_sub->disabled = false;
72
0
        }
73
0
      }
74
0
    }
75
    // Then use explicit disabling as an ultimate override.
76
0
    if (data["events"].HasMember("disable_subscribers") &&
77
0
        data["events"]["disable_subscribers"].IsArray()) {
78
0
      for (const auto& item :
79
0
           data["events"]["disable_subscribers"].GetArray()) {
80
0
        if (item.GetString() == name) {
81
0
          VLOG(1) << "Disabling event subscriber: " << name;
82
0
          base_sub->disabled = true;
83
0
        }
84
0
      }
85
0
    }
86
0
  }
87
88
0
  if (base_sub->state() != EventState::EVENT_NONE) {
89
0
    base_sub->tearDown();
90
0
  }
91
92
  // Allow subscribers a configure-time setup to determine if they should run.
93
0
  auto status = base_sub->setUp();
94
0
  if (!status) {
95
0
    base_sub->disabled = true;
96
0
  }
97
0
  base_sub->state(EventState::EVENT_SETUP);
98
99
  // Let the subscriber initialize any Subscriptions.
100
0
  if (!FLAGS_disable_events && !base_sub->disabled) {
101
0
    status = base_sub->init();
102
0
    base_sub->state(EventState::EVENT_RUNNING);
103
0
  } else {
104
0
    base_sub->state(EventState::EVENT_PAUSED);
105
0
  }
106
107
0
  auto& ef = EventFactory::getInstance();
108
0
  {
109
0
    RecursiveLock lock(ef.factory_lock_);
110
0
    ef.event_subs_[name] = base_sub;
111
0
  }
112
113
  // Set state of subscriber.
114
0
  if (!status.ok()) {
115
0
    base_sub->state(EventState::EVENT_FAILED);
116
0
    return Status::failure(status.getMessage());
117
0
  } else {
118
0
    return Status::success();
119
0
  }
120
0
}
121
122
0
Status EventFactory::registerEventPublisher(const PluginRef& pub) {
123
0
  auto base_pub = std::dynamic_pointer_cast<EventPublisherPlugin>(pub);
124
125
0
  if (base_pub == nullptr) {
126
0
    return Status::failure("Invalid publisher type: " + pub->getName());
127
0
  }
128
129
0
  auto type_id = base_pub->type();
130
0
  if (type_id.empty()) {
131
    // This publisher did not override its name.
132
0
    return Status::failure("Publishers must have a type");
133
0
  }
134
135
0
  auto& ef = EventFactory::getInstance();
136
0
  {
137
0
    RecursiveLock lock(ef.factory_lock_);
138
0
    if (ef.event_pubs_.count(type_id) != 0) {
139
      // This is a duplicate event publisher.
140
0
      return Status::failure("Duplicate publisher type");
141
0
    }
142
143
0
    ef.event_pubs_[type_id] = base_pub;
144
0
  }
145
146
  // Do not set up event publisher if events are disabled.
147
0
  if (!FLAGS_disable_events) {
148
0
    if (base_pub->state() != EventState::EVENT_NONE) {
149
0
      base_pub->tearDown();
150
0
    }
151
152
0
    auto status = base_pub->setUp();
153
0
    base_pub->state(EventState::EVENT_SETUP);
154
0
    if (!status.ok()) {
155
      // Only start event loop if setUp succeeds.
156
0
      LOG(INFO) << "Event publisher not enabled: " << type_id << ": "
157
0
                << status.what();
158
0
      base_pub->isEnding(true);
159
0
      return status;
160
0
    }
161
0
  }
162
163
0
  return Status::success();
164
0
}
165
166
0
Status EventFactory::deregisterEventSubscriber(const std::string& sub) {
167
0
  auto& ef = EventFactory::getInstance();
168
169
0
  RecursiveLock lock(ef.factory_lock_);
170
171
0
  auto subscriber_it = ef.event_subs_.find(sub);
172
0
  if (subscriber_it == ef.event_subs_.end()) {
173
0
    return Status::failure("Event subscriber is missing");
174
0
  }
175
176
0
  auto subscriber = subscriber_it->second;
177
0
  ef.event_subs_.erase(subscriber_it);
178
179
0
  subscriber->tearDown();
180
0
  subscriber->state(EventState::EVENT_NONE);
181
182
0
  return Status(0);
183
0
}
184
185
Status EventFactory::addSubscription(const std::string& type_id,
186
                                     const std::string& name_id,
187
                                     const SubscriptionContextRef& mc,
188
0
                                     EventCallback cb) {
189
0
  auto subscription = Subscription::create(name_id, mc, cb);
190
0
  return EventFactory::addSubscription(type_id, subscription);
191
0
}
192
193
Status EventFactory::addSubscription(const std::string& type_id,
194
0
                                     const SubscriptionRef& subscription) {
195
0
  EventPublisherRef publisher = getInstance().getEventPublisher(type_id);
196
0
  if (publisher == nullptr) {
197
0
    return Status::failure("Unknown event publisher");
198
0
  }
199
200
  // The event factory is responsible for configuring the event types.
201
0
  return publisher->addSubscription(subscription);
202
0
}
203
204
0
size_t EventFactory::numSubscriptions(const std::string& type_id) {
205
0
  EventPublisherRef publisher;
206
0
  try {
207
0
    publisher = EventFactory::getInstance().getEventPublisher(type_id);
208
0
  } catch (std::out_of_range& /* e */) {
209
0
    return 0;
210
0
  }
211
0
  if (publisher == nullptr) {
212
0
    return 0;
213
0
  }
214
0
  return publisher->numSubscriptions();
215
0
}
216
217
0
size_t EventFactory::numEventPublishers() {
218
0
  return EventFactory::getInstance().event_pubs_.size();
219
0
}
220
221
0
Status EventFactory::deregisterEventPublisher(const EventPublisherRef& pub) {
222
0
  return EventFactory::deregisterEventPublisher(pub->type());
223
0
}
224
225
0
Status EventFactory::deregisterEventPublisher(const std::string& type_id) {
226
0
  auto& ef = EventFactory::getInstance();
227
228
0
  RecursiveLock lock(ef.factory_lock_);
229
0
  EventPublisherRef publisher = ef.getEventPublisher(type_id);
230
0
  if (publisher == nullptr) {
231
0
    return Status::failure("No event publisher to deregister");
232
0
  }
233
234
0
  if (!FLAGS_disable_events) {
235
0
    publisher->isEnding(true);
236
0
    if (!publisher->hasStarted()) {
237
      // If a publisher's run loop was not started, call tearDown since
238
      // the setUp happened at publisher registration time.
239
0
      publisher->tearDown();
240
0
      publisher->state(EventState::EVENT_NONE);
241
      // If the run loop did run the tear down and erase will happen in the
242
      // event thread wrapper when isEnding is next checked.
243
0
      ef.event_pubs_.erase(type_id);
244
0
    } else {
245
0
      publisher->stop();
246
0
    }
247
0
  }
248
0
  return Status::success();
249
0
}
250
251
0
EventPublisherRef EventFactory::getEventPublisher(const std::string& type_id) {
252
0
  auto& ef = EventFactory::getInstance();
253
254
0
  RecursiveLock lock(ef.factory_lock_);
255
0
  if (ef.event_pubs_.count(type_id) == 0) {
256
0
    LOG(ERROR) << "Requested unknown/failed event publisher: " << type_id;
257
0
    return nullptr;
258
0
  }
259
0
  return ef.event_pubs_.at(type_id);
260
0
}
261
262
EventSubscriberRef EventFactory::getEventSubscriber(
263
0
    const std::string& name_id) {
264
0
  auto& ef = EventFactory::getInstance();
265
266
0
  RecursiveLock lock(ef.factory_lock_);
267
0
  if (!exists(name_id)) {
268
0
    LOG(ERROR) << "Requested unknown event subscriber: " << name_id;
269
0
    return nullptr;
270
0
  }
271
0
  return ef.event_subs_.at(name_id);
272
0
}
273
274
2
bool EventFactory::exists(const std::string& name_id) {
275
2
  return (getInstance().event_subs_.count(name_id) > 0);
276
2
}
277
278
3
std::set<std::string> EventFactory::publisherTypes() {
279
3
  RecursiveLock lock(getInstance().factory_lock_);
280
3
  std::set<std::string> types;
281
3
  for (const auto& publisher : getInstance().event_pubs_) {
282
0
    types.insert(publisher.first);
283
0
  }
284
3
  return types;
285
3
}
286
287
3
std::set<std::string> EventFactory::subscriberNames() {
288
3
  RecursiveLock lock(getInstance().factory_lock_);
289
3
  std::set<std::string> names;
290
3
  for (const auto& subscriber : getInstance().event_subs_) {
291
0
    names.insert(subscriber.first);
292
0
  }
293
3
  return names;
294
3
}
295
296
0
void EventFactory::addForwarder(const std::string& logger) {
297
0
  getInstance().loggers_.push_back(logger);
298
0
}
299
300
0
void EventFactory::forwardEvent(const std::string& event) {
301
0
  for (const auto& logger : getInstance().loggers_) {
302
0
    Registry::call("logger", logger, {{"event", event}});
303
0
  }
304
0
}
305
306
0
void EventFactory::configUpdate() {
307
  // Scan the schedule for queries that touch "_events" tables.
308
  // We will count the queries
309
0
  std::map<std::string, SubscriberExpirationDetails> subscriber_details;
310
311
0
  Config::get().scheduledQueries(
312
0
      [&subscriber_details](std::string name, const ScheduledQuery& query) {
313
0
        std::vector<std::string> tables;
314
        // Convert query string into a list of virtual tables effected.
315
0
        if (!getQueryTables(query.query, tables)) {
316
0
          VLOG(1) << "Cannot get tables from query: " << name;
317
0
          return;
318
0
        }
319
320
        // Remove duplicates and select only the subscriber tables.
321
0
        std::set<std::string> subscribers;
322
0
        for (const auto& table : tables) {
323
0
          if (Registry::get().exists("event_subscriber", table)) {
324
0
            subscribers.insert(table);
325
0
          }
326
0
        }
327
328
0
        for (const auto& subscriber : subscribers) {
329
0
          auto& details = subscriber_details[subscriber];
330
0
          details.max_interval = (query.interval > details.max_interval)
331
0
                                     ? query.interval
332
0
                                     : details.max_interval;
333
0
          details.query_count++;
334
0
        }
335
0
      });
336
337
0
  auto& ef = EventFactory::getInstance();
338
0
  for (const auto& details : subscriber_details) {
339
0
    if (!ef.exists(details.first)) {
340
0
      continue;
341
0
    }
342
343
0
    RecursiveLock lock(ef.factory_lock_);
344
0
    auto subscriber = ef.getEventSubscriber(details.first);
345
0
    auto min_expiry = details.second.max_interval * 3;
346
0
    min_expiry += (60 - (min_expiry % 60));
347
0
    subscriber->setMinExpiry(min_expiry);
348
349
    // Emit a warning for each subscriber affected by the small expiration.
350
0
    auto expiry = subscriber->getEventsExpiry();
351
0
    if (expiry > 0 && min_expiry > expiry) {
352
0
      LOG(INFO) << "The minimum events expiration timeout for "
353
0
                << subscriber->getName()
354
0
                << " has been adjusted: " << min_expiry;
355
0
    }
356
0
    subscriber->resetQueryCount(details.second.query_count);
357
0
  }
358
359
  // If events are enabled configure the subscribers before publishers.
360
0
  if (!FLAGS_disable_events) {
361
0
    RegistryFactory::get().registry("event_subscriber")->configure();
362
0
    RegistryFactory::get().registry("event_publisher")->configure();
363
0
  }
364
0
}
365
366
0
Status EventFactory::run(const std::string& type_id) {
367
0
  if (FLAGS_disable_events) {
368
0
    return Status::success();
369
0
  }
370
371
  // An interesting take on an event dispatched entrypoint.
372
  // There is little introspection into the event type.
373
  // Assume it can either make use of an entrypoint poller/selector or
374
  // take care of async callback registrations in setUp/configure/run
375
  // only once and handle event queuing/firing in callbacks.
376
0
  EventPublisherRef publisher = nullptr;
377
0
  {
378
0
    auto& ef = EventFactory::getInstance();
379
0
    RecursiveLock lock(ef.factory_lock_);
380
0
    publisher = ef.getEventPublisher(type_id);
381
0
  }
382
383
0
  if (publisher == nullptr) {
384
0
    return Status::failure("Event publisher is missing");
385
0
  } else if (publisher->hasStarted()) {
386
0
    return Status::failure("Cannot restart an event publisher");
387
0
  }
388
389
0
  setThreadName(publisher->name());
390
0
  VLOG(1) << "Starting event publisher run loop: " + type_id;
391
0
  publisher->hasStarted(true);
392
0
  publisher->state(EventState::EVENT_RUNNING);
393
394
0
  auto status = Status(0, "OK");
395
0
  while (!publisher->isEnding()) {
396
    // Can optionally implement a global cooloff latency here.
397
0
    status = publisher->run();
398
0
    if (!status.ok()) {
399
0
      break;
400
0
    }
401
0
    publisher->restart_count_++;
402
    // This is a 'default' cool-off implemented in InterruptibleRunnable.
403
    // If a publisher fails to perform some sort of interruption point, this
404
    // prevents the thread from thrashing through exiting checks.
405
0
    publisher->pause(std::chrono::milliseconds(200));
406
0
  }
407
408
0
  if (!status.ok()) {
409
    // The runloop status is not reflective of the event type's.
410
0
    VLOG(1) << "Event publisher " << publisher->type()
411
0
            << " run loop terminated for reason: " << status.getMessage();
412
    // Publishers auto tear down when their run loop stops.
413
0
  }
414
0
  publisher->tearDown();
415
0
  publisher->state(EventState::EVENT_NONE);
416
417
  // Do not remove the publisher from the event factory.
418
  // If the event factory's `end` method was called these publishers will be
419
  // cleaned up after their thread context is removed; otherwise, a removed
420
  // thread context and failed publisher will remain available for stats.
421
0
  return Status::success();
422
0
}
423
424
0
void EventFactory::delay() {
425
  // Caller may disable event publisher threads.
426
0
  if (FLAGS_disable_events) {
427
0
    return;
428
0
  }
429
430
  // Create a thread for each event publisher.
431
0
  auto& ef = EventFactory::getInstance();
432
0
  for (const auto& publisher : EventFactory::getInstance().event_pubs_) {
433
    // Publishers that did not set up correctly are put into an ending state.
434
0
    if (!publisher.second->isEnding()) {
435
0
      auto thread_ = std::make_shared<std::thread>(
436
0
          std::bind(&EventFactory::run, publisher.first));
437
0
      ef.threads_.push_back(thread_);
438
0
    }
439
0
  }
440
0
}
441
442
0
void EventFactory::end(bool join) {
443
0
  auto& ef = EventFactory::getInstance();
444
445
  // Call deregister on each publisher.
446
0
  for (const auto& publisher : ef.publisherTypes()) {
447
0
    deregisterEventPublisher(publisher);
448
0
  }
449
450
  // Stop handling exceptions for the publisher threads.
451
0
  for (const auto& thread : ef.threads_) {
452
0
    if (join) {
453
0
      thread->join();
454
0
    } else {
455
0
      thread->detach();
456
0
    }
457
0
  }
458
459
0
  {
460
0
    RecursiveLock lock(ef.factory_lock_);
461
    // A small cool off helps OS API event publisher flushing.
462
0
    if (!FLAGS_disable_events) {
463
0
      ef.threads_.clear();
464
0
    }
465
466
    // Threads may still be executing, when they finish, release publishers.
467
0
    ef.event_pubs_.clear();
468
0
    ef.event_subs_.clear();
469
0
  }
470
0
}
471
472
} // namespace osquery