Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/event/dispatcher_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/event/dispatcher_impl.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <string>
7
#include <vector>
8
9
#include "envoy/api/api.h"
10
#include "envoy/common/scope_tracker.h"
11
#include "envoy/config/overload/v3/overload.pb.h"
12
#include "envoy/network/client_connection_factory.h"
13
#include "envoy/network/listen_socket.h"
14
#include "envoy/network/listener.h"
15
16
#include "source/common/buffer/buffer_impl.h"
17
#include "source/common/common/assert.h"
18
#include "source/common/common/lock_guard.h"
19
#include "source/common/common/thread.h"
20
#include "source/common/config/utility.h"
21
#include "source/common/event/file_event_impl.h"
22
#include "source/common/event/libevent_scheduler.h"
23
#include "source/common/event/scaled_range_timer_manager_impl.h"
24
#include "source/common/event/signal_impl.h"
25
#include "source/common/event/timer_impl.h"
26
#include "source/common/filesystem/watcher_impl.h"
27
#include "source/common/network/address_impl.h"
28
#include "source/common/network/connection_impl.h"
29
#include "source/common/runtime/runtime_features.h"
30
31
#include "event2/event.h"
32
33
#ifdef ENVOY_HANDLE_SIGNALS
34
#include "source/common/signal/signal_action.h"
35
#endif
36
37
namespace Envoy {
38
namespace Event {
39
40
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
41
                               Event::TimeSystem& time_system)
42
5.52k
    : DispatcherImpl(name, api, time_system, {}) {}
43
44
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
45
                               Event::TimeSystem& time_system,
46
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
47
    : DispatcherImpl(
48
          name, api, time_system,
49
20.5k
          [](Dispatcher& dispatcher) {
50
20.5k
            return std::make_unique<ScaledRangeTimerManagerImpl>(dispatcher);
51
20.5k
          },
52
20.5k
          watermark_factory) {}
53
54
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
55
                               Event::TimeSystem& time_system,
56
                               const ScaledRangeTimerManagerFactory& scaled_timer_factory,
57
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
58
    : DispatcherImpl(name, api.threadFactory(), api.timeSource(), api.fileSystem(), time_system,
59
                     scaled_timer_factory,
60
                     watermark_factory != nullptr
61
                         ? watermark_factory
62
                         : std::make_shared<Buffer::WatermarkBufferFactory>(
63
25.0k
                               api.bootstrap().overload_manager().buffer_factory_config())) {}
64
65
DispatcherImpl::DispatcherImpl(const std::string& name, Thread::ThreadFactory& thread_factory,
66
                               TimeSource& time_source, Filesystem::Instance& file_system,
67
                               Event::TimeSystem& time_system,
68
                               const ScaledRangeTimerManagerFactory& scaled_timer_factory,
69
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
70
    : name_(name), thread_factory_(thread_factory), time_source_(time_source),
71
      file_system_(file_system), buffer_factory_(watermark_factory),
72
      scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
73
      thread_local_delete_cb_(
74
0
          base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })),
75
      deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
76
7.32k
          [this]() -> void { clearDeferredDeleteList(); })),
77
23.6k
      post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
78
25.0k
      current_to_delete_(&to_delete_1_), scaled_timer_manager_(scaled_timer_factory(*this)) {
79
25.0k
  ASSERT(!name_.empty());
80
25.0k
  FatalErrorHandler::registerFatalErrorHandler(*this);
81
25.0k
  updateApproximateMonotonicTimeInternal();
82
25.0k
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.fix_dispatcher_approximate_now")) {
83
25.0k
    base_scheduler_.registerOnCheckCallback(
84
25.0k
        std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
85
25.0k
  } else {
86
0
    base_scheduler_.registerOnPrepareCallback(
87
0
        std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
88
0
  }
89
25.0k
}
90
91
25.0k
DispatcherImpl::~DispatcherImpl() {
92
25.0k
  ENVOY_LOG(debug, "destroying dispatcher {}", name_);
93
25.0k
  FatalErrorHandler::removeFatalErrorHandler(*this);
94
  // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable
95
  // ASSERT(deletable_in_dispatcher_thread_.empty())
96
25.0k
}
97
98
void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog,
99
3.95k
                                      std::chrono::milliseconds min_touch_interval) {
100
3.95k
  ASSERT(!watchdog_registration_, "Each dispatcher can have at most one registered watchdog.");
101
3.95k
  watchdog_registration_ =
102
3.95k
      std::make_unique<WatchdogRegistration>(watchdog, *scheduler_, min_touch_interval, *this);
103
3.95k
}
104
105
void DispatcherImpl::initializeStats(Stats::Scope& scope,
106
170
                                     const absl::optional<std::string>& prefix) {
107
170
  const std::string effective_prefix = prefix.has_value() ? *prefix : absl::StrCat(name_, ".");
108
  // This needs to be run in the dispatcher's thread, so that we have a thread id to log.
109
170
  post([this, &scope, effective_prefix] {
110
87
    stats_prefix_ = effective_prefix + "dispatcher";
111
87
    stats_ = std::make_unique<DispatcherStats>(
112
87
        DispatcherStats{ALL_DISPATCHER_STATS(POOL_HISTOGRAM_PREFIX(scope, stats_prefix_ + "."))});
113
87
    base_scheduler_.initializeStats(stats_.get());
114
87
    ENVOY_LOG(debug, "running {} on thread {}", stats_prefix_, run_tid_.debugString());
115
87
  });
116
170
}
117
118
193k
void DispatcherImpl::clearDeferredDeleteList() {
119
193k
  ASSERT(isThreadSafe());
120
193k
  std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_;
121
122
193k
  size_t num_to_delete = to_delete->size();
123
193k
  if (deferred_deleting_ || !num_to_delete) {
124
185k
    return;
125
185k
  }
126
127
8.11k
  ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);
128
129
  // Swap the current deletion vector so that if we do deferred delete while we are deleting, we
130
  // use the other vector. We will get another callback to delete that vector.
131
8.11k
  if (current_to_delete_ == &to_delete_1_) {
132
5.43k
    current_to_delete_ = &to_delete_2_;
133
5.43k
  } else {
134
2.67k
    current_to_delete_ = &to_delete_1_;
135
2.67k
  }
136
137
8.11k
  touchWatchdog();
138
8.11k
  deferred_deleting_ = true;
139
140
  // Calling clear() on the vector does not specify which order destructors run in. We want to
141
  // destroy in FIFO order so just do it manually. This required 2 passes over the vector which is
142
  // not optimal but can be cleaned up later if needed.
143
31.5k
  for (size_t i = 0; i < num_to_delete; i++) {
144
23.4k
    (*to_delete)[i].reset();
145
23.4k
  }
146
147
8.11k
  to_delete->clear();
148
8.11k
  deferred_deleting_ = false;
149
8.11k
}
150
151
Network::ServerConnectionPtr
152
DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
153
                                       Network::TransportSocketPtr&& transport_socket,
154
3.42k
                                       StreamInfo::StreamInfo& stream_info) {
155
3.42k
  ASSERT(isThreadSafe());
156
3.42k
  return std::make_unique<Network::ServerConnectionImpl>(*this, std::move(socket),
157
3.42k
                                                         std::move(transport_socket), stream_info);
158
3.42k
}
159
160
Network::ClientConnectionPtr DispatcherImpl::createClientConnection(
161
    Network::Address::InstanceConstSharedPtr address,
162
    Network::Address::InstanceConstSharedPtr source_address,
163
    Network::TransportSocketPtr&& transport_socket,
164
    const Network::ConnectionSocket::OptionsSharedPtr& options,
165
3.84k
    const Network::TransportSocketOptionsConstSharedPtr& transport_options) {
166
3.84k
  ASSERT(isThreadSafe());
167
168
3.84k
  auto* factory = Config::Utility::getFactoryByName<Network::ClientConnectionFactory>(
169
3.84k
      std::string(address->addressType()));
170
  // The target address is usually offered by EDS and the EDS api should reject the unsupported
171
  // address.
172
  // TODO(lambdai): Return a closed connection if the factory is not found. Note that the caller
173
  // expects a non-null connection as of today so we cannot gracefully handle unsupported address
174
  // type.
175
3.84k
  return factory->createClientConnection(*this, address, source_address,
176
3.84k
                                         std::move(transport_socket), options, transport_options);
177
3.84k
}
178
179
FileEventPtr DispatcherImpl::createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger,
180
15.3k
                                             uint32_t events) {
181
15.3k
  ASSERT(isThreadSafe());
182
15.3k
  return FileEventPtr{new FileEventImpl(
183
15.3k
      *this, fd,
184
38.6k
      [this, cb](uint32_t events) {
185
38.6k
        touchWatchdog();
186
38.6k
        return cb(events);
187
38.6k
      },
188
15.3k
      trigger, events)};
189
15.3k
}
190
191
2.10k
Filesystem::WatcherPtr DispatcherImpl::createFilesystemWatcher() {
192
2.10k
  ASSERT(isThreadSafe());
193
2.10k
  return Filesystem::WatcherPtr{new Filesystem::WatcherImpl(*this, file_system_)};
194
2.10k
}
195
196
38.7k
TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
197
38.7k
  ASSERT(isThreadSafe());
198
38.7k
  return createTimerInternal(cb);
199
38.7k
}
200
201
4.44k
TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerType timer_type, TimerCb cb) {
202
4.44k
  ASSERT(isThreadSafe());
203
4.44k
  return scaled_timer_manager_->createTimer(timer_type, std::move(cb));
204
4.44k
}
205
206
0
TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerMinimum minimum, TimerCb cb) {
207
0
  ASSERT(isThreadSafe());
208
0
  return scaled_timer_manager_->createTimer(minimum, std::move(cb));
209
0
}
210
211
17.5k
Event::SchedulableCallbackPtr DispatcherImpl::createSchedulableCallback(std::function<void()> cb) {
212
17.5k
  ASSERT(isThreadSafe());
213
18.4k
  return base_scheduler_.createSchedulableCallback([this, cb]() {
214
18.4k
    touchWatchdog();
215
18.4k
    cb();
216
18.4k
  });
217
17.5k
}
218
219
38.7k
TimerPtr DispatcherImpl::createTimerInternal(TimerCb cb) {
220
38.7k
  return scheduler_->createTimer(
221
38.7k
      [this, cb]() {
222
35.6k
        touchWatchdog();
223
35.6k
        cb();
224
35.6k
      },
225
38.7k
      *this);
226
38.7k
}
227
228
23.4k
void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
229
23.4k
  ASSERT(isThreadSafe());
230
23.4k
  if (to_delete != nullptr) {
231
23.4k
    to_delete->deleteIsPending();
232
23.4k
    current_to_delete_->emplace_back(std::move(to_delete));
233
23.4k
    ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());
234
23.4k
    if (current_to_delete_->size() == 1) {
235
8.11k
      deferred_delete_cb_->scheduleCallbackCurrentIteration();
236
8.11k
    }
237
23.4k
  }
238
23.4k
}
239
240
13.8k
void DispatcherImpl::exit() { base_scheduler_.loopExit(); }
241
242
7.90k
SignalEventPtr DispatcherImpl::listenForSignal(signal_t signal_num, SignalCb cb) {
243
7.90k
  ASSERT(isThreadSafe());
244
7.90k
  return SignalEventPtr{new SignalEventImpl(*this, signal_num, cb)};
245
7.90k
}
246
247
73.5k
void DispatcherImpl::post(PostCb callback) {
248
73.5k
  bool do_post;
249
73.5k
  {
250
73.5k
    Thread::LockGuard lock(post_lock_);
251
73.5k
    do_post = post_callbacks_.empty();
252
73.5k
    post_callbacks_.push_back(std::move(callback));
253
73.5k
  }
254
255
73.5k
  if (do_post) {
256
28.5k
    post_cb_->scheduleCallbackCurrentIteration();
257
28.5k
  }
258
73.5k
}
259
260
2.40k
void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) {
261
2.40k
  bool need_schedule;
262
2.40k
  {
263
2.40k
    Thread::LockGuard lock(thread_local_deletable_lock_);
264
2.40k
    need_schedule = deletables_in_dispatcher_thread_.empty();
265
2.40k
    deletables_in_dispatcher_thread_.emplace_back(std::move(deletable));
266
    // TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072
267
    // ASSERT(!shutdown_called_, "inserted after shutdown");
268
2.40k
  }
269
270
2.40k
  if (need_schedule) {
271
1.97k
    thread_local_delete_cb_->scheduleCallbackCurrentIteration();
272
1.97k
  }
273
2.40k
}
274
275
149k
void DispatcherImpl::run(RunType type) {
276
149k
  run_tid_ = thread_factory_.currentThreadId();
277
  // Flush all post callbacks before we run the event loop. We do this because there are post
278
  // callbacks that have to get run before the initial event loop starts running. libevent does
279
  // not guarantee that events are run in any particular order. So even if we post() and call
280
  // event_base_once() before some other event, the other event might get called first.
281
149k
  runPostCallbacks();
282
149k
  base_scheduler_.run(type);
283
149k
}
284
285
15.0k
MonotonicTime DispatcherImpl::approximateMonotonicTime() const {
286
15.0k
  return approximate_monotonic_time_;
287
15.0k
}
288
289
12.4k
void DispatcherImpl::shutdown() {
290
  // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete below
291
  // below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post
292
  // callbacks and dispatcher thread deletable objects.
293
12.4k
  ASSERT(isThreadSafe());
294
12.4k
  auto deferred_deletables_size = current_to_delete_->size();
295
12.4k
  std::list<std::function<void()>>::size_type post_callbacks_size;
296
12.4k
  {
297
12.4k
    Thread::LockGuard lock(post_lock_);
298
12.4k
    post_callbacks_size = post_callbacks_.size();
299
12.4k
  }
300
301
12.4k
  std::list<DispatcherThreadDeletableConstPtr> local_deletables;
302
12.4k
  {
303
12.4k
    Thread::LockGuard lock(thread_local_deletable_lock_);
304
12.4k
    local_deletables = std::move(deletables_in_dispatcher_thread_);
305
12.4k
  }
306
12.4k
  auto thread_local_deletables_size = local_deletables.size();
307
14.8k
  while (!local_deletables.empty()) {
308
2.40k
    local_deletables.pop_front();
309
2.40k
  }
310
12.4k
  ASSERT(!shutdown_called_);
311
12.4k
  shutdown_called_ = true;
312
12.4k
  ENVOY_LOG(
313
12.4k
      trace,
314
12.4k
      "{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. ",
315
12.4k
      __FUNCTION__, deferred_deletables_size, post_callbacks_size, thread_local_deletables_size);
316
12.4k
}
317
318
156k
void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); }
319
320
181k
void DispatcherImpl::updateApproximateMonotonicTimeInternal() {
321
181k
  approximate_monotonic_time_ = time_source_.monotonicTime();
322
181k
}
323
324
0
void DispatcherImpl::runThreadLocalDelete() {
325
0
  std::list<DispatcherThreadDeletableConstPtr> to_be_delete;
326
0
  {
327
0
    Thread::LockGuard lock(thread_local_deletable_lock_);
328
0
    to_be_delete = std::move(deletables_in_dispatcher_thread_);
329
0
    ASSERT(deletables_in_dispatcher_thread_.empty());
330
0
  }
331
0
  while (!to_be_delete.empty()) {
332
    // Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when
333
    // executing complicated destruction.
334
0
    touchWatchdog();
335
    // Delete in FIFO order.
336
0
    to_be_delete.pop_front();
337
0
  }
338
0
}
339
172k
void DispatcherImpl::runPostCallbacks() {
340
  // Clear the deferred delete list before running post callbacks to reduce non-determinism in
341
  // callback processing, and more easily detect if a scheduled post callback refers to one of the
342
  // objects that is being deferred deleted.
343
172k
  clearDeferredDeleteList();
344
345
172k
  std::list<PostCb> callbacks;
346
172k
  {
347
    // Take ownership of the callbacks under the post_lock_. The lock must be released before
348
    // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
349
    // later in the event loop.
350
172k
    Thread::LockGuard lock(post_lock_);
351
172k
    callbacks = std::move(post_callbacks_);
352
    // post_callbacks_ should be empty after the move.
353
172k
    ASSERT(post_callbacks_.empty());
354
172k
  }
355
  // It is important that the execution and deletion of the callback happen while post_lock_ is not
356
  // held. Either the invocation or destructor of the callback can call post() on this dispatcher.
357
230k
  while (!callbacks.empty()) {
358
    // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
359
    // executing a long list of callbacks.
360
57.8k
    touchWatchdog();
361
    // Run the callback.
362
57.8k
    callbacks.front()();
363
    // Pop the front so that the destructor of the callback that just executed runs before the next
364
    // callback executes.
365
57.8k
    callbacks.pop_front();
366
57.8k
  }
367
172k
}
368
369
0
void DispatcherImpl::onFatalError(std::ostream& os) const {
370
  // Dump the state of the tracked objects in the dispatcher if thread safe. This generally
371
  // results in dumping the active state only for the thread which caused the fatal error.
372
0
  if (isThreadSafe()) {
373
0
    for (auto iter = tracked_object_stack_.rbegin(); iter != tracked_object_stack_.rend(); ++iter) {
374
0
      (*iter)->dumpState(os);
375
0
    }
376
0
  }
377
0
}
378
379
void DispatcherImpl::runFatalActionsOnTrackedObject(
380
0
    const FatalAction::FatalActionPtrList& actions) const {
381
  // Only run if this is the dispatcher of the current thread and
382
  // DispatcherImpl::Run has been called.
383
0
  if (run_tid_.isEmpty() || (run_tid_ != thread_factory_.currentThreadId())) {
384
0
    return;
385
0
  }
386
387
0
  for (const auto& action : actions) {
388
0
    action->run(tracked_object_stack_);
389
0
  }
390
0
}
391
392
158k
void DispatcherImpl::touchWatchdog() {
393
158k
  if (watchdog_registration_) {
394
48.6k
    watchdog_registration_->touchWatchdog();
395
48.6k
  }
396
158k
}
397
398
62.9k
void DispatcherImpl::pushTrackedObject(const ScopeTrackedObject* object) {
399
62.9k
  ASSERT(isThreadSafe());
400
62.9k
  ASSERT(object != nullptr);
401
62.9k
  tracked_object_stack_.push_back(object);
402
62.9k
  ASSERT(tracked_object_stack_.size() <= ExpectedMaxTrackedObjectStackDepth);
403
62.9k
}
404
405
62.9k
void DispatcherImpl::popTrackedObject(const ScopeTrackedObject* expected_object) {
406
62.9k
  ASSERT(isThreadSafe());
407
62.9k
  ASSERT(expected_object != nullptr);
408
62.9k
  RELEASE_ASSERT(!tracked_object_stack_.empty(), "Tracked Object Stack is empty, nothing to pop!");
409
410
62.9k
  const ScopeTrackedObject* top = tracked_object_stack_.back();
411
62.9k
  tracked_object_stack_.pop_back();
412
62.9k
  ASSERT(top == expected_object,
413
62.9k
         "Popped the top of the tracked object stack, but it wasn't the expected object!");
414
62.9k
}
415
416
} // namespace Event
417
} // namespace Envoy