Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/event/dispatcher_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/event/dispatcher_impl.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <string>
7
#include <vector>
8
9
#include "envoy/api/api.h"
10
#include "envoy/common/scope_tracker.h"
11
#include "envoy/config/overload/v3/overload.pb.h"
12
#include "envoy/network/client_connection_factory.h"
13
#include "envoy/network/listen_socket.h"
14
#include "envoy/network/listener.h"
15
16
#include "source/common/buffer/buffer_impl.h"
17
#include "source/common/common/assert.h"
18
#include "source/common/common/lock_guard.h"
19
#include "source/common/common/thread.h"
20
#include "source/common/config/utility.h"
21
#include "source/common/event/file_event_impl.h"
22
#include "source/common/event/libevent_scheduler.h"
23
#include "source/common/event/scaled_range_timer_manager_impl.h"
24
#include "source/common/event/signal_impl.h"
25
#include "source/common/event/timer_impl.h"
26
#include "source/common/filesystem/watcher_impl.h"
27
#include "source/common/network/address_impl.h"
28
#include "source/common/network/connection_impl.h"
29
#include "source/common/network/tcp_listener_impl.h"
30
#include "source/common/runtime/runtime_features.h"
31
32
#include "event2/event.h"
33
34
#ifdef ENVOY_HANDLE_SIGNALS
35
#include "source/common/signal/signal_action.h"
36
#endif
37
38
namespace Envoy {
39
namespace Event {
40
41
// Convenience constructor: delegates to the overload that also takes a watermark factory,
// passing an empty ({}) watermark factory argument.
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
42
                               Event::TimeSystem& time_system)
43
6.68k
    : DispatcherImpl(name, api, time_system, {}) {}
44
45
// Constructor taking an explicit watermark factory. Delegates to the overload that takes a
// scaled-timer-manager factory, supplying a factory lambda that builds a
// ScaledRangeTimerManagerImpl for the dispatcher it is handed.
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
46
                               Event::TimeSystem& time_system,
47
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
48
    : DispatcherImpl(
49
          name, api, time_system,
50
27.7k
          [](Dispatcher& dispatcher) {
51
27.7k
            return std::make_unique<ScaledRangeTimerManagerImpl>(dispatcher);
52
27.7k
          },
53
27.7k
          watermark_factory) {}
54
55
// Constructor that unpacks the Api object into its individual services (thread factory, time
// source, random generator, file system) and delegates to the primary constructor.
// If `watermark_factory` is null, a Buffer::WatermarkBufferFactory is created from the
// bootstrap's overload manager buffer_factory_config().
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
56
                               Event::TimeSystem& time_system,
57
                               const ScaledRangeTimerManagerFactory& scaled_timer_factory,
58
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
59
    : DispatcherImpl(name, api.threadFactory(), api.timeSource(), api.randomGenerator(),
60
                     api.fileSystem(), time_system, scaled_timer_factory,
61
                     watermark_factory != nullptr
62
                         ? watermark_factory
63
                         : std::make_shared<Buffer::WatermarkBufferFactory>(
64
33.1k
                               api.bootstrap().overload_manager().buffer_factory_config())) {}
65
66
// Primary constructor; every other constructor ends up delegating here.
//
// The member initializers:
//  - store the name and the references to the supplied services,
//  - create the scheduler from `time_system` (base_scheduler_ is passed for both arguments),
//  - create three schedulable callbacks on base_scheduler_ that invoke runThreadLocalDelete(),
//    clearDeferredDeleteList() and runPostCallbacks() respectively,
//  - point current_to_delete_ at to_delete_1_,
//  - build the scaled timer manager by invoking `scaled_timer_factory` with *this.
//
// The body asserts the name is non-empty, registers this object as a fatal error handler,
// takes an initial approximate monotonic time sample, and registers an on-prepare callback on
// base_scheduler_ that refreshes that sample.
DispatcherImpl::DispatcherImpl(const std::string& name, Thread::ThreadFactory& thread_factory,
67
                               TimeSource& time_source, Random::RandomGenerator& random_generator,
68
                               Filesystem::Instance& file_system, Event::TimeSystem& time_system,
69
                               const ScaledRangeTimerManagerFactory& scaled_timer_factory,
70
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
71
    : name_(name), thread_factory_(thread_factory), time_source_(time_source),
72
      random_generator_(random_generator), file_system_(file_system),
73
      buffer_factory_(watermark_factory),
74
      scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
75
      thread_local_delete_cb_(
76
0
          base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })),
77
      deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
78
17.3k
          [this]() -> void { clearDeferredDeleteList(); })),
79
28.1k
      post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
80
33.1k
      current_to_delete_(&to_delete_1_), scaled_timer_manager_(scaled_timer_factory(*this)) {
81
33.1k
  ASSERT(!name_.empty());
82
33.1k
  FatalErrorHandler::registerFatalErrorHandler(*this);
83
33.1k
  updateApproximateMonotonicTimeInternal();
84
33.1k
  base_scheduler_.registerOnPrepareCallback(
85
33.1k
      std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
86
33.1k
}
87
88
33.1k
// Destructor: logs the dispatcher name at debug level and removes this object from the fatal
// error handler registry (the counterpart of the registration done in the constructor).
DispatcherImpl::~DispatcherImpl() {
89
33.1k
  ENVOY_LOG(debug, "destroying dispatcher {}", name_);
90
33.1k
  FatalErrorHandler::removeFatalErrorHandler(*this);
91
  // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable
92
  // ASSERT(deletables_in_dispatcher_thread_.empty())
93
33.1k
}
94
95
// Registers `watchdog` with this dispatcher by creating a WatchdogRegistration from the
// watchdog, the scheduler, `min_touch_interval` and this dispatcher.
// Asserts that no watchdog has been registered before.
void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog,
96
5.29k
                                      std::chrono::milliseconds min_touch_interval) {
97
5.29k
  ASSERT(!watchdog_registration_, "Each dispatcher can have at most one registered watchdog.");
98
5.29k
  watchdog_registration_ =
99
5.29k
      std::make_unique<WatchdogRegistration>(watchdog, *scheduler_, min_touch_interval, *this);
100
5.29k
}
101
102
// Sets up the dispatcher stats. The stat prefix is `*prefix` when provided, otherwise
// "<name_>."; "dispatcher" is appended to it. The actual initialization is deferred via post():
// the posted callback stores the prefix, creates the DispatcherStats, hands them to
// base_scheduler_ and logs the thread id.
// NOTE(review): `scope` is captured by reference in the posted callback, so it presumably has
// to outlive the callback's execution — confirm against callers.
void DispatcherImpl::initializeStats(Stats::Scope& scope,
103
148
                                     const absl::optional<std::string>& prefix) {
104
148
  const std::string effective_prefix = prefix.has_value() ? *prefix : absl::StrCat(name_, ".");
105
  // This needs to be run in the dispatcher's thread, so that we have a thread id to log.
106
148
  post([this, &scope, effective_prefix] {
107
78
    stats_prefix_ = effective_prefix + "dispatcher";
108
78
    stats_ = std::make_unique<DispatcherStats>(
109
78
        DispatcherStats{ALL_DISPATCHER_STATS(POOL_HISTOGRAM_PREFIX(scope, stats_prefix_ + "."))});
110
78
    base_scheduler_.initializeStats(stats_.get());
111
78
    ENVOY_LOG(debug, "running {} on thread {}", stats_prefix_, run_tid_.debugString());
112
78
  });
113
148
}
114
115
141k
// Destroys every object queued on the current deferred-deletion vector, in insertion order.
// Does nothing if the vector is empty or if a deletion pass is already in progress
// (deferred_deleting_ is set), which guards against re-entrant invocation.
void DispatcherImpl::clearDeferredDeleteList() {
116
141k
  ASSERT(isThreadSafe());
117
141k
  std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_;
118
119
141k
  size_t num_to_delete = to_delete->size();
120
141k
  if (deferred_deleting_ || !num_to_delete) {
121
122k
    return;
122
122k
  }
123
124
18.2k
  ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);
125
126
  // Swap the current deletion vector so that if we do deferred delete while we are deleting, we
127
  // use the other vector. We will get another callback to delete that vector.
128
18.2k
  if (current_to_delete_ == &to_delete_1_) {
129
12.0k
    current_to_delete_ = &to_delete_2_;
130
12.0k
  } else {
131
6.16k
    current_to_delete_ = &to_delete_1_;
132
6.16k
  }
133
134
18.2k
  touchWatchdog();
135
18.2k
  deferred_deleting_ = true;
136
137
  // Calling clear() on the vector does not specify which order destructors run in. We want to
138
  // destroy in FIFO order so just do it manually. This requires 2 passes over the vector which is
139
  // not optimal but can be cleaned up later if needed.
140
61.4k
  for (size_t i = 0; i < num_to_delete; i++) {
141
43.2k
    (*to_delete)[i].reset();
142
43.2k
  }
143
144
18.2k
  to_delete->clear();
145
18.2k
  deferred_deleting_ = false;
146
18.2k
}
147
148
// Creates a Network::ServerConnectionImpl bound to this dispatcher, taking ownership of the
// given socket and transport socket and passing `stream_info` through.
Network::ServerConnectionPtr
149
DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
150
                                       Network::TransportSocketPtr&& transport_socket,
151
6.99k
                                       StreamInfo::StreamInfo& stream_info) {
152
6.99k
  ASSERT(isThreadSafe());
153
6.99k
  return std::make_unique<Network::ServerConnectionImpl>(*this, std::move(socket),
154
6.99k
                                                         std::move(transport_socket), stream_info);
155
6.99k
}
156
157
// Creates a client connection to `address` by looking up the Network::ClientConnectionFactory
// registered under the address's addressType() and delegating to it.
// Note: `factory` is dereferenced without a null check, so an address type with no registered
// factory is not handled here (see the TODO below).
Network::ClientConnectionPtr DispatcherImpl::createClientConnection(
158
    Network::Address::InstanceConstSharedPtr address,
159
    Network::Address::InstanceConstSharedPtr source_address,
160
    Network::TransportSocketPtr&& transport_socket,
161
    const Network::ConnectionSocket::OptionsSharedPtr& options,
162
7.60k
    const Network::TransportSocketOptionsConstSharedPtr& transport_options) {
163
7.60k
  ASSERT(isThreadSafe());
164
165
7.60k
  auto* factory = Config::Utility::getFactoryByName<Network::ClientConnectionFactory>(
166
7.60k
      std::string(address->addressType()));
167
  // The target address is usually offered by EDS and the EDS api should reject the unsupported
168
  // address.
169
  // TODO(lambdai): Return a closed connection if the factory is not found. Note that the caller
170
  // expects a non-null connection as of today so we cannot gracefully handle unsupported address
171
  // type.
172
7.60k
  return factory->createClientConnection(*this, address, source_address,
173
7.60k
                                         std::move(transport_socket), options, transport_options);
174
7.60k
}
175
176
// Creates a FileEventImpl for `fd` with the given trigger type and event mask. The user
// callback is wrapped so that the watchdog is touched before `cb` runs on every file event.
FileEventPtr DispatcherImpl::createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger,
177
25.3k
                                             uint32_t events) {
178
25.3k
  ASSERT(isThreadSafe());
179
25.3k
  return FileEventPtr{new FileEventImpl(
180
25.3k
      *this, fd,
181
66.4k
      [this, cb](uint32_t events) {
182
66.4k
        touchWatchdog();
183
66.4k
        cb(events);
184
66.4k
      },
185
25.3k
      trigger, events)};
186
25.3k
}
187
188
2.77k
// Creates a Filesystem::WatcherImpl bound to this dispatcher and its file system.
Filesystem::WatcherPtr DispatcherImpl::createFilesystemWatcher() {
189
2.77k
  ASSERT(isThreadSafe());
190
2.77k
  return Filesystem::WatcherPtr{new Filesystem::WatcherImpl(*this, file_system_)};
191
2.77k
}
192
193
// Creates a Network::TcpListenerImpl on `socket` that reports to `cb`. The bind-to-port flag,
// the ignore-global-connection-limit flag and the per-socket-event accept limit are read from
// `listener_config`; `runtime` and `overload_state` are passed through unchanged.
Network::ListenerPtr
194
DispatcherImpl::createListener(Network::SocketSharedPtr&& socket, Network::TcpListenerCallbacks& cb,
195
                               Runtime::Loader& runtime,
196
                               const Network::ListenerConfig& listener_config,
197
7.97k
                               Server::ThreadLocalOverloadStateOptRef overload_state) {
198
7.97k
  ASSERT(isThreadSafe());
199
7.97k
  return std::make_unique<Network::TcpListenerImpl>(
200
7.97k
      *this, random_generator_, runtime, std::move(socket), cb, listener_config.bindToPort(),
201
7.97k
      listener_config.ignoreGlobalConnLimit(),
202
7.97k
      listener_config.maxConnectionsToAcceptPerSocketEvent(), overload_state);
203
7.97k
}
204
205
275k
// Creates a timer that invokes `cb`. Asserts thread safety, then defers to
// createTimerInternal(), which performs the actual creation without that assertion.
TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
206
275k
  ASSERT(isThreadSafe());
207
275k
  return createTimerInternal(cb);
208
275k
}
209
210
5.46k
// Creates a timer through the scaled timer manager, selected by `timer_type`.
TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerType timer_type, TimerCb cb) {
211
5.46k
  ASSERT(isThreadSafe());
212
5.46k
  return scaled_timer_manager_->createTimer(timer_type, std::move(cb));
213
5.46k
}
214
215
0
// Creates a timer through the scaled timer manager, configured by an explicit `minimum`.
TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerMinimum minimum, TimerCb cb) {
216
0
  ASSERT(isThreadSafe());
217
0
  return scaled_timer_manager_->createTimer(minimum, std::move(cb));
218
0
}
219
220
48.5k
// Creates a schedulable callback on base_scheduler_. `cb` is wrapped so that the watchdog is
// touched immediately before `cb` runs.
Event::SchedulableCallbackPtr DispatcherImpl::createSchedulableCallback(std::function<void()> cb) {
221
48.5k
  ASSERT(isThreadSafe());
222
48.5k
  return base_scheduler_.createSchedulableCallback([this, cb]() {
223
24.6k
    touchWatchdog();
224
24.6k
    cb();
225
24.6k
  });
226
48.5k
}
227
228
275k
// Creates a timer on scheduler_ without asserting thread safety. `cb` is wrapped so that the
// watchdog is touched immediately before `cb` runs.
TimerPtr DispatcherImpl::createTimerInternal(TimerCb cb) {
229
275k
  return scheduler_->createTimer(
230
275k
      [this, cb]() {
231
33.3k
        touchWatchdog();
232
33.3k
        cb();
233
33.3k
      },
234
275k
      *this);
235
275k
}
236
237
43.2k
// Queues `to_delete` for later destruction by clearDeferredDeleteList(). A null pointer is
// ignored. The object is notified via deleteIsPending() before being queued. The deletion
// callback is scheduled only when the queue goes from empty to one element; later additions
// rely on that already-scheduled callback.
void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
238
43.2k
  ASSERT(isThreadSafe());
239
43.2k
  if (to_delete != nullptr) {
240
43.2k
    to_delete->deleteIsPending();
241
43.2k
    current_to_delete_->emplace_back(std::move(to_delete));
242
43.2k
    ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());
243
43.2k
    if (current_to_delete_->size() == 1) {
244
18.2k
      deferred_delete_cb_->scheduleCallbackCurrentIteration();
245
18.2k
    }
246
43.2k
  }
247
43.2k
}
248
249
20.5k
// Calls loopExit() on the base scheduler.
void DispatcherImpl::exit() { base_scheduler_.loopExit(); }
250
251
10.5k
// Creates a SignalEventImpl on this dispatcher for `signal_num` with callback `cb`.
SignalEventPtr DispatcherImpl::listenForSignal(signal_t signal_num, SignalCb cb) {
252
10.5k
  ASSERT(isThreadSafe());
253
10.5k
  return SignalEventPtr{new SignalEventImpl(*this, signal_num, cb)};
254
10.5k
}
255
256
101k
// Appends `callback` to the post queue under post_lock_. Unlike most methods in this file it
// does not assert isThreadSafe(). post_cb_ is scheduled (outside the lock) only when the queue
// was empty before this push, so a pending batch is scheduled once rather than per callback.
void DispatcherImpl::post(PostCb callback) {
257
101k
  bool do_post;
258
101k
  {
259
101k
    Thread::LockGuard lock(post_lock_);
260
101k
    do_post = post_callbacks_.empty();
261
101k
    post_callbacks_.push_back(std::move(callback));
262
101k
  }
263
264
101k
  if (do_post) {
265
33.7k
    post_cb_->scheduleCallbackCurrentIteration();
266
33.7k
  }
267
101k
}
268
269
3.26k
// Queues `deletable` on deletables_in_dispatcher_thread_ under thread_local_deletable_lock_;
// the queue is drained by runThreadLocalDelete() or shutdown(). thread_local_delete_cb_ is
// scheduled (outside the lock) only when the queue was empty before this insertion.
void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) {
270
3.26k
  bool need_schedule;
271
3.26k
  {
272
3.26k
    Thread::LockGuard lock(thread_local_deletable_lock_);
273
3.26k
    need_schedule = deletables_in_dispatcher_thread_.empty();
274
3.26k
    deletables_in_dispatcher_thread_.emplace_back(std::move(deletable));
275
    // TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072
276
    // ASSERT(!shutdown_called_, "inserted after shutdown");
277
3.26k
  }
278
279
3.26k
  if (need_schedule) {
280
2.64k
    thread_local_delete_cb_->scheduleCallbackCurrentIteration();
281
2.64k
  }
282
3.26k
}
283
284
78.5k
// Runs the event loop with the given run type. Records the calling thread's id in run_tid_,
// drains any already-posted callbacks, then hands control to base_scheduler_.run().
void DispatcherImpl::run(RunType type) {
285
78.5k
  run_tid_ = thread_factory_.currentThreadId();
286
  // Flush all post callbacks before we run the event loop. We do this because there are post
287
  // callbacks that have to get run before the initial event loop starts running. libevent does
288
  // not guarantee that events are run in any particular order. So even if we post() and call
289
  // event_base_once() before some other event, the other event might get called first.
290
78.5k
  runPostCallbacks();
291
78.5k
  base_scheduler_.run(type);
292
78.5k
}
293
294
953k
// Returns the cached monotonic time sample last stored by
// updateApproximateMonotonicTimeInternal(); it does not query the time source.
MonotonicTime DispatcherImpl::approximateMonotonicTime() const {
295
953k
  return approximate_monotonic_time_;
296
953k
}
297
298
14.8k
// Shuts the dispatcher down. Takes everything queued on deletables_in_dispatcher_thread_ (under
// its lock) and destroys the objects in FIFO order, marks shutdown_called_ (asserting it was
// not already set), and logs a trace line. The deferred-delete list and the post callback queue
// are only measured for that log line; they are not drained here.
void DispatcherImpl::shutdown() {
299
  // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete the
300
  // below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post
301
  // callbacks and dispatcher thread deletable objects.
302
14.8k
  ASSERT(isThreadSafe());
303
14.8k
  auto deferred_deletables_size = current_to_delete_->size();
304
14.8k
  std::list<std::function<void()>>::size_type post_callbacks_size;
305
14.8k
  {
306
14.8k
    Thread::LockGuard lock(post_lock_);
307
14.8k
    post_callbacks_size = post_callbacks_.size();
308
14.8k
  }
309
310
14.8k
  std::list<DispatcherThreadDeletableConstPtr> local_deletables;
311
14.8k
  {
312
14.8k
    Thread::LockGuard lock(thread_local_deletable_lock_);
313
14.8k
    local_deletables = std::move(deletables_in_dispatcher_thread_);
314
14.8k
  }
315
14.8k
  auto thread_local_deletables_size = local_deletables.size();
316
18.0k
  while (!local_deletables.empty()) {
317
3.26k
    local_deletables.pop_front();
318
3.26k
  }
319
14.8k
  ASSERT(!shutdown_called_);
320
14.8k
  shutdown_called_ = true;
321
  // The arguments must follow the placeholder order of the message: thread local objects
  // destroyed, then deferred deletables, then post callbacks.
14.8k
  ENVOY_LOG(
322
14.8k
      trace,
323
14.8k
      "{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. ",
324
14.8k
      __FUNCTION__, thread_local_deletables_size, deferred_deletables_size, post_callbacks_size);
325
14.8k
}
326
327
354k
// Refreshes the cached monotonic time; forwards to updateApproximateMonotonicTimeInternal().
// This is the function bound as the base scheduler's on-prepare callback in the constructor.
void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); }
328
329
387k
// Samples time_source_.monotonicTime() and caches it in approximate_monotonic_time_.
void DispatcherImpl::updateApproximateMonotonicTimeInternal() {
330
387k
  approximate_monotonic_time_ = time_source_.monotonicTime();
331
387k
}
332
333
0
// Drains deletables_in_dispatcher_thread_: the list is moved into a local under
// thread_local_deletable_lock_, then the objects are destroyed one at a time outside the lock.
void DispatcherImpl::runThreadLocalDelete() {
334
0
  std::list<DispatcherThreadDeletableConstPtr> to_be_delete;
335
0
  {
336
0
    Thread::LockGuard lock(thread_local_deletable_lock_);
337
0
    to_be_delete = std::move(deletables_in_dispatcher_thread_);
338
0
    ASSERT(deletables_in_dispatcher_thread_.empty());
339
0
  }
340
0
  while (!to_be_delete.empty()) {
341
    // Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when
342
    // executing complicated destruction.
343
0
    touchWatchdog();
344
    // Delete in FIFO order.
345
0
    to_be_delete.pop_front();
346
0
  }
347
0
}
348
106k
// Runs all callbacks queued through post(). The deferred-delete list is cleared first; the
// queue is then moved into a local list under post_lock_ and each callback is invoked and
// destroyed, in FIFO order, with the lock released.
void DispatcherImpl::runPostCallbacks() {
349
  // Clear the deferred delete list before running post callbacks to reduce non-determinism in
350
  // callback processing, and more easily detect if a scheduled post callback refers to one of the
351
  // objects that is being deferred deleted.
352
106k
  clearDeferredDeleteList();
353
354
106k
  std::list<PostCb> callbacks;
355
106k
  {
356
    // Take ownership of the callbacks under the post_lock_. The lock must be released before
357
    // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
358
    // later in the event loop.
359
106k
    Thread::LockGuard lock(post_lock_);
360
106k
    callbacks = std::move(post_callbacks_);
361
    // post_callbacks_ should be empty after the move.
362
106k
    ASSERT(post_callbacks_.empty());
363
106k
  }
364
  // It is important that the execution and deletion of the callback happen while post_lock_ is not
365
  // held. Either the invocation or destructor of the callback can call post() on this dispatcher.
366
191k
  while (!callbacks.empty()) {
367
    // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
368
    // executing a long list of callbacks.
369
84.4k
    touchWatchdog();
370
    // Run the callback.
371
84.4k
    callbacks.front()();
372
    // Pop the front so that the destructor of the callback that just executed runs before the next
373
    // callback executes.
374
84.4k
    callbacks.pop_front();
375
84.4k
  }
376
106k
}
377
378
0
// Fatal error hook: when isThreadSafe() holds, writes the state of every tracked object to `os`,
// walking the stack in reverse so the most recently pushed object is dumped first.
void DispatcherImpl::onFatalError(std::ostream& os) const {
379
  // Dump the state of the tracked objects in the dispatcher if thread safe. This generally
380
  // results in dumping the active state only for the thread which caused the fatal error.
381
0
  if (isThreadSafe()) {
382
0
    for (auto iter = tracked_object_stack_.rbegin(); iter != tracked_object_stack_.rend(); ++iter) {
383
0
      (*iter)->dumpState(os);
384
0
    }
385
0
  }
386
0
}
387
388
// Runs each action in `actions`, in order, against the tracked object stack. Returns without
// doing anything if run_tid_ is empty or differs from the current thread's id.
void DispatcherImpl::runFatalActionsOnTrackedObject(
389
0
    const FatalAction::FatalActionPtrList& actions) const {
390
  // Only run if this is the dispatcher of the current thread and
391
  // DispatcherImpl::Run has been called.
392
0
  if (run_tid_.isEmpty() || (run_tid_ != thread_factory_.currentThreadId())) {
393
0
    return;
394
0
  }
395
396
0
  for (const auto& action : actions) {
397
0
    action->run(tracked_object_stack_);
398
0
  }
399
0
}
400
401
227k
// Touches the registered watchdog, if any; a no-op when no watchdog has been registered.
void DispatcherImpl::touchWatchdog() {
402
227k
  if (watchdog_registration_) {
403
88.3k
    watchdog_registration_->touchWatchdog();
404
88.3k
  }
405
227k
}
406
407
106k
// Pushes `object` (asserted non-null) onto the tracked object stack and asserts that the stack
// depth stays within ExpectedMaxTrackedObjectStackDepth.
void DispatcherImpl::pushTrackedObject(const ScopeTrackedObject* object) {
408
106k
  ASSERT(isThreadSafe());
409
106k
  ASSERT(object != nullptr);
410
106k
  tracked_object_stack_.push_back(object);
411
106k
  ASSERT(tracked_object_stack_.size() <= ExpectedMaxTrackedObjectStackDepth);
412
106k
}
413
414
106k
// Pops the top of the tracked object stack and asserts that it is `expected_object`.
// An empty stack is a RELEASE_ASSERT failure; the identity mismatch check is a plain ASSERT
// evaluated after the pop has already happened.
void DispatcherImpl::popTrackedObject(const ScopeTrackedObject* expected_object) {
415
106k
  ASSERT(isThreadSafe());
416
106k
  ASSERT(expected_object != nullptr);
417
106k
  RELEASE_ASSERT(!tracked_object_stack_.empty(), "Tracked Object Stack is empty, nothing to pop!");
418
419
106k
  const ScopeTrackedObject* top = tracked_object_stack_.back();
420
106k
  tracked_object_stack_.pop_back();
421
106k
  ASSERT(top == expected_object,
422
106k
         "Popped the top of the tracked object stack, but it wasn't the expected object!");
423
106k
}
424
425
} // namespace Event
426
} // namespace Envoy