1
#include "source/common/event/dispatcher_impl.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <string>
7
#include <vector>
8

            
9
#include "envoy/api/api.h"
10
#include "envoy/common/scope_tracker.h"
11
#include "envoy/config/overload/v3/overload.pb.h"
12
#include "envoy/network/client_connection_factory.h"
13
#include "envoy/network/listen_socket.h"
14
#include "envoy/network/listener.h"
15

            
16
#include "source/common/buffer/buffer_impl.h"
17
#include "source/common/common/assert.h"
18
#include "source/common/common/lock_guard.h"
19
#include "source/common/common/thread.h"
20
#include "source/common/config/utility.h"
21
#include "source/common/event/file_event_impl.h"
22
#include "source/common/event/libevent_scheduler.h"
23
#include "source/common/event/scaled_range_timer_manager_impl.h"
24
#include "source/common/event/signal_impl.h"
25
#include "source/common/event/timer_impl.h"
26
#include "source/common/filesystem/watcher_impl.h"
27
#include "source/common/network/address_impl.h"
28
#include "source/common/network/connection_impl.h"
29
#include "source/common/network/utility.h"
30
#include "source/common/runtime/runtime_features.h"
31

            
32
#include "event2/event.h"
33

            
34
#ifdef ENVOY_HANDLE_SIGNALS
35
#include "source/common/signal/signal_action.h"
36
#endif
37

            
38
namespace Envoy {
39
namespace Event {
40

            
41
// Convenience constructor: forwards to the watermark-factory overload with an empty (null)
// watermark factory pointer.
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
                               Event::TimeSystem& time_system)
    : DispatcherImpl(name, api, time_system, Buffer::WatermarkFactorySharedPtr{}) {}
44

            
45
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
46
                               Event::TimeSystem& time_system,
47
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
48
66944
    : DispatcherImpl(
49
66944
          name, api, time_system,
50
66944
          [](Dispatcher& dispatcher) {
51
66944
            return std::make_unique<ScaledRangeTimerManagerImpl>(dispatcher);
52
66944
          },
53
66944
          watermark_factory) {}
54

            
55
// Unpacks the thread factory, time source and file system from `api` and forwards to the primary
// constructor. When no watermark factory is supplied, a WatermarkBufferFactory is built from the
// bootstrap's overload manager buffer factory config.
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
                               Event::TimeSystem& time_system,
                               const ScaledRangeTimerManagerFactory& scaled_timer_factory,
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
    : DispatcherImpl(name, api.threadFactory(), api.timeSource(), api.fileSystem(), time_system,
                     scaled_timer_factory,
                     watermark_factory
                         ? watermark_factory
                         : std::make_shared<Buffer::WatermarkBufferFactory>(
                               api.bootstrap().overload_manager().buffer_factory_config())) {}
65

            
66
// Primary constructor. Stores the collaborators, creates the scheduler from `time_system`, and
// creates the three schedulable callbacks that drive dispatcher-thread deletion, deferred
// deletion and posted callbacks. It then registers this dispatcher as a fatal error handler,
// seeds the approximate monotonic time, and asks the base scheduler to refresh that time via its
// on-check callback.
DispatcherImpl::DispatcherImpl(const std::string& name, Thread::ThreadFactory& thread_factory,
                               TimeSource& time_source, Filesystem::Instance& file_system,
                               Event::TimeSystem& time_system,
                               const ScaledRangeTimerManagerFactory& scaled_timer_factory,
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
    : name_(name), thread_factory_(thread_factory), time_source_(time_source),
      file_system_(file_system), buffer_factory_(watermark_factory),
      scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
      thread_local_delete_cb_(
          base_scheduler_.createSchedulableCallback([this] { runThreadLocalDelete(); })),
      deferred_delete_cb_(
          base_scheduler_.createSchedulableCallback([this] { clearDeferredDeleteList(); })),
      post_cb_(base_scheduler_.createSchedulableCallback([this] { runPostCallbacks(); })),
      current_to_delete_(&to_delete_1_), scaled_timer_manager_(scaled_timer_factory(*this)) {
  // A dispatcher must always be given a name.
  ASSERT(!name_.empty());
  FatalErrorHandler::registerFatalErrorHandler(*this);

  // Take an initial time sample, then keep it fresh from the scheduler's on-check hook.
  updateApproximateMonotonicTimeInternal();
  base_scheduler_.registerOnCheckCallback([this] { updateApproximateMonotonicTime(); });
}
86

            
87
77806
// Logs the teardown and unregisters this dispatcher from the fatal error handler list.
DispatcherImpl::~DispatcherImpl() {
  ENVOY_LOG(debug, "destroying dispatcher {}", name_);
  FatalErrorHandler::removeFatalErrorHandler(*this);
  // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable
  // ASSERT(deletables_in_dispatcher_thread_.empty())
}
93

            
94
// Attaches `watchdog` to this dispatcher by creating a WatchdogRegistration with the given
// minimum touch interval. A dispatcher may only have a single watchdog registered.
void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog,
                                      std::chrono::milliseconds min_touch_interval) {
  ASSERT(!watchdog_registration_, "Each dispatcher can have at most one registered watchdog.");
  auto registration =
      std::make_unique<WatchdogRegistration>(watchdog, *scheduler_, min_touch_interval, *this);
  watchdog_registration_ = std::move(registration);
}
100

            
101
void DispatcherImpl::initializeStats(Stats::Scope& scope,
102
15
                                     const absl::optional<std::string>& prefix) {
103
15
  const std::string effective_prefix = prefix.has_value() ? *prefix : absl::StrCat(name_, ".");
104
  // This needs to be run in the dispatcher's thread, so that we have a thread id to log.
105
15
  post([this, &scope, effective_prefix] {
106
15
    stats_prefix_ = effective_prefix + "dispatcher";
107
15
    stats_ = std::make_unique<DispatcherStats>(
108
15
        DispatcherStats{ALL_DISPATCHER_STATS(POOL_HISTOGRAM_PREFIX(scope, stats_prefix_ + "."))});
109
15
    base_scheduler_.initializeStats(stats_.get());
110
15
    ENVOY_LOG(debug, "running {} on thread {}", stats_prefix_, run_tid_.debugString());
111
15
  });
112
15
}
113

            
114
256351342
void DispatcherImpl::clearDeferredDeleteList() {
115
256351342
  ASSERT(isThreadSafe());
116
256351342
  std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_;
117

            
118
256351342
  size_t num_to_delete = to_delete->size();
119
256351342
  if (deferred_deleting_ || !num_to_delete) {
120
256174191
    return;
121
256174191
  }
122

            
123
177151
  ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);
124

            
125
  // Swap the current deletion vector so that if we do deferred delete while we are deleting, we
126
  // use the other vector. We will get another callback to delete that vector.
127
177151
  if (current_to_delete_ == &to_delete_1_) {
128
99059
    current_to_delete_ = &to_delete_2_;
129
99059
  } else {
130
78092
    current_to_delete_ = &to_delete_1_;
131
78092
  }
132

            
133
177151
  touchWatchdog();
134
177151
  deferred_deleting_ = true;
135

            
136
  // Calling clear() on the vector does not specify which order destructors run in. We want to
137
  // destroy in FIFO order so just do it manually. This required 2 passes over the vector which is
138
  // not optimal but can be cleaned up later if needed.
139
733619
  for (size_t i = 0; i < num_to_delete; i++) {
140
556468
    (*to_delete)[i].reset();
141
556468
  }
142

            
143
177151
  to_delete->clear();
144
177151
  deferred_deleting_ = false;
145
177151
}
146

            
147
// Builds a ServerConnectionImpl owned by this dispatcher, taking ownership of the supplied socket
// and transport socket.
Network::ServerConnectionPtr
DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
                                       Network::TransportSocketPtr&& transport_socket,
                                       StreamInfo::StreamInfo& stream_info) {
  ASSERT(isThreadSafe());
  return std::make_unique<Network::ServerConnectionImpl>(
      *this, std::move(socket), std::move(transport_socket), stream_info);
}
155

            
156
// Creates a client connection to `address` through the ClientConnectionFactory looked up by the
// address type name. On Linux, when the source address names a network namespace, the connection
// is created inside that namespace and nullptr is returned if doing so fails.
Network::ClientConnectionPtr DispatcherImpl::createClientConnection(
    Network::Address::InstanceConstSharedPtr address,
    Network::Address::InstanceConstSharedPtr source_address,
    Network::TransportSocketPtr&& transport_socket,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_options) {
  ASSERT(isThreadSafe());

  const std::string factory_name(address->addressType());
  auto* factory =
      Config::Utility::getFactoryByName<Network::ClientConnectionFactory>(factory_name);

  // The target address is usually offered by EDS and the EDS api should reject the unsupported
  // address.
  // TODO(lambdai): Return a closed connection if the factory is not found. Note that the caller
  // expects a non-null connection as of today so we cannot gracefully handle unsupported address
  // type.
#if defined(__linux__)
  // For Linux, the source address' network namespace is relevant for client connections, since
  // that is where the netns would be specified.
  if (source_address && source_address->networkNamespace().has_value()) {
    auto create_in_netns = [&]() -> Network::ClientConnectionPtr {
      return factory->createClientConnection(
          *this, address, source_address, std::move(transport_socket), options, transport_options);
    };
    auto netns_result = Network::Utility::execInNetworkNamespace(
        std::move(create_in_netns), source_address->networkNamespace()->c_str());
    if (netns_result.ok()) {
      return *std::move(netns_result);
    }
    ENVOY_LOG(error, "failed to create connection in network namespace {}: {}",
              source_address->networkNamespace().value(), netns_result.status().ToString());
    return nullptr;
  }
#endif

  return factory->createClientConnection(*this, address, source_address,
                                         std::move(transport_socket), options, transport_options);
}
194

            
195
// Creates a file event for `fd`. The user callback is wrapped so that the watchdog is touched
// every time the event fires, before the callback itself runs.
FileEventPtr DispatcherImpl::createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger,
                                             uint32_t events) {
  ASSERT(isThreadSafe());
  auto touch_then_notify = [this, cb](uint32_t ready_events) {
    touchWatchdog();
    return cb(ready_events);
  };
  return std::make_unique<FileEventImpl>(*this, fd, std::move(touch_then_notify), trigger,
                                         events);
}
206

            
207
10392
// Creates a filesystem watcher bound to this dispatcher and its file system instance.
Filesystem::WatcherPtr DispatcherImpl::createFilesystemWatcher() {
  ASSERT(isThreadSafe());
  return std::make_unique<Filesystem::WatcherImpl>(*this, file_system_);
}
211

            
212
404121
// Creates a timer that invokes `cb` when it fires. Must be called from a thread for which
// isThreadSafe() holds. `cb` is taken by value, so it is moved into the internal helper rather
// than copied again, matching how the createScaledTimer() overloads forward their callbacks.
TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
  ASSERT(isThreadSafe());
  return createTimerInternal(std::move(cb));
}
216

            
217
115526
// Creates a timer through the scaled timer manager, selected by timer type.
TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerType timer_type, TimerCb cb) {
  ASSERT(isThreadSafe());
  auto& manager = *scaled_timer_manager_;
  return manager.createTimer(timer_type, std::move(cb));
}
221

            
222
2
// Creates a timer through the scaled timer manager, using an explicit scaled minimum.
TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerMinimum minimum, TimerCb cb) {
  ASSERT(isThreadSafe());
  auto& manager = *scaled_timer_manager_;
  return manager.createTimer(minimum, std::move(cb));
}
226

            
227
188202
// Creates a schedulable callback on the base scheduler. The user callback is wrapped so that the
// watchdog is touched each time before `cb` runs.
Event::SchedulableCallbackPtr DispatcherImpl::createSchedulableCallback(std::function<void()> cb) {
  ASSERT(isThreadSafe());
  auto touch_then_run = [this, cb]() {
    touchWatchdog();
    cb();
  };
  return base_scheduler_.createSchedulableCallback(std::move(touch_then_run));
}
234

            
235
404121
// Creates a timer on the scheduler without asserting thread safety. The user callback is wrapped
// so that the watchdog is touched each time before `cb` runs.
TimerPtr DispatcherImpl::createTimerInternal(TimerCb cb) {
  auto touch_then_fire = [this, cb]() {
    touchWatchdog();
    cb();
  };
  return scheduler_->createTimer(std::move(touch_then_fire), *this);
}
243

            
244
577747
// Queues `to_delete` on the current deferred-delete vector. Null pointers are ignored. The
// clearing callback is scheduled only when the queue goes from empty to one element.
void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
  ASSERT(isThreadSafe());
  if (to_delete == nullptr) {
    return;
  }

  // Let the object know it is about to be queued for deletion.
  to_delete->deleteIsPending();
  current_to_delete_->emplace_back(std::move(to_delete));
  ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());

  const bool first_entry = current_to_delete_->size() == 1;
  if (first_entry) {
    deferred_delete_cb_->scheduleCallbackCurrentIteration();
  }
}
255

            
256
123275
// Asks the base scheduler to exit its event loop.
void DispatcherImpl::exit() {
  base_scheduler_.loopExit();
}
257

            
258
42420
// Creates a signal event that invokes `cb` for `signal_num` on this dispatcher.
SignalEventPtr DispatcherImpl::listenForSignal(signal_t signal_num, SignalCb cb) {
  ASSERT(isThreadSafe());
  return std::make_unique<SignalEventImpl>(*this, signal_num, cb);
}
262

            
263
627712
// Appends `callback` to the posted-callback list under post_lock_. The post callback is
// scheduled only when the list was empty before this insertion.
void DispatcherImpl::post(PostCb callback) {
  bool list_was_empty = false;
  {
    Thread::LockGuard guard(post_lock_);
    list_was_empty = post_callbacks_.empty();
    post_callbacks_.push_back(std::move(callback));
  }

  // Scheduling happens after post_lock_ has been released.
  if (list_was_empty) {
    post_cb_->scheduleCallbackCurrentIteration();
  }
}
275

            
276
17204
// Queues `deletable` on the dispatcher-thread deletion list under its lock. The deletion
// callback is scheduled only when the list was empty before this insertion.
void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) {
  bool list_was_empty = false;
  {
    Thread::LockGuard guard(thread_local_deletable_lock_);
    list_was_empty = deletables_in_dispatcher_thread_.empty();
    deletables_in_dispatcher_thread_.emplace_back(std::move(deletable));
    // TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072
    // ASSERT(!shutdown_called_, "inserted after shutdown");
  }

  // Scheduling happens after the lock has been released.
  if (list_was_empty) {
    thread_local_delete_cb_->scheduleCallbackCurrentIteration();
  }
}
290

            
291
255753680
// Records the calling thread as the dispatcher's run thread, drains already-posted callbacks,
// then runs the base scheduler's event loop with the requested run type.
void DispatcherImpl::run(RunType type) {
  run_tid_ = thread_factory_.currentThreadId();

  // Post callbacks are flushed before the event loop starts because some of them have to run
  // before the initial loop iteration. libevent does not guarantee that events are run in any
  // particular order. So even if we post() and call event_base_once() before some other event,
  // the other event might get called first.
  runPostCallbacks();

  base_scheduler_.run(type);
}
300

            
301
8818489
// Returns the most recently cached monotonic time sample; no clock is read here.
MonotonicTime DispatcherImpl::approximateMonotonicTime() const {
  const MonotonicTime cached_sample = approximate_monotonic_time_;
  return cached_sample;
}
304

            
305
21405
void DispatcherImpl::shutdown() {
306
  // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete below
307
  // below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post
308
  // callbacks and dispatcher thread deletable objects.
309
21405
  ASSERT(isThreadSafe());
310
21405
  auto deferred_deletables_size = current_to_delete_->size();
311
21405
  std::list<std::function<void()>>::size_type post_callbacks_size;
312
21405
  {
313
21405
    Thread::LockGuard lock(post_lock_);
314
21405
    post_callbacks_size = post_callbacks_.size();
315
21405
  }
316

            
317
21405
  std::list<DispatcherThreadDeletableConstPtr> local_deletables;
318
21405
  {
319
21405
    Thread::LockGuard lock(thread_local_deletable_lock_);
320
21405
    local_deletables = std::move(deletables_in_dispatcher_thread_);
321
21405
  }
322
21405
  auto thread_local_deletables_size = local_deletables.size();
323
37812
  while (!local_deletables.empty()) {
324
16407
    local_deletables.pop_front();
325
16407
  }
326
21405
  ASSERT(!shutdown_called_);
327
21405
  shutdown_called_ = true;
328
21405
  ENVOY_LOG(
329
21405
      trace,
330
21405
      "{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. ",
331
21405
      __FUNCTION__, deferred_deletables_size, post_callbacks_size, thread_local_deletables_size);
332
21405
}
333

            
334
266611833
// Refreshes the cached monotonic time by delegating to the internal updater.
void DispatcherImpl::updateApproximateMonotonicTime() {
  updateApproximateMonotonicTimeInternal();
}
335

            
336
266689624
void DispatcherImpl::updateApproximateMonotonicTimeInternal() {
337
266689624
  approximate_monotonic_time_ = time_source_.monotonicTime();
338
266689624
}
339

            
340
638
void DispatcherImpl::runThreadLocalDelete() {
341
638
  std::list<DispatcherThreadDeletableConstPtr> to_be_delete;
342
638
  {
343
638
    Thread::LockGuard lock(thread_local_deletable_lock_);
344
638
    to_be_delete = std::move(deletables_in_dispatcher_thread_);
345
638
    ASSERT(deletables_in_dispatcher_thread_.empty());
346
638
  }
347
1291
  while (!to_be_delete.empty()) {
348
    // Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when
349
    // executing complicated destruction.
350
653
    touchWatchdog();
351
    // Delete in FIFO order.
352
653
    to_be_delete.pop_front();
353
653
  }
354
638
}
355
256104551
void DispatcherImpl::runPostCallbacks() {
356
  // Clear the deferred delete list before running post callbacks to reduce non-determinism in
357
  // callback processing, and more easily detect if a scheduled post callback refers to one of the
358
  // objects that is being deferred deleted.
359
256104551
  clearDeferredDeleteList();
360

            
361
256104551
  std::list<PostCb> callbacks;
362
256104551
  {
363
    // Take ownership of the callbacks under the post_lock_. The lock must be released before
364
    // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
365
    // later in the event loop.
366
256104551
    Thread::LockGuard lock(post_lock_);
367
256104551
    callbacks = std::move(post_callbacks_);
368
    // post_callbacks_ should be empty after the move.
369
256104551
    ASSERT(post_callbacks_.empty());
370
256104551
  }
371
  // It is important that the execution and deletion of the callback happen while post_lock_ is not
372
  // held. Either the invocation or destructor of the callback can call post() on this dispatcher.
373
256729860
  while (!callbacks.empty()) {
374
    // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
375
    // executing a long list of callbacks.
376
625309
    touchWatchdog();
377
    // Run the callback.
378
625309
    callbacks.front()();
379
    // Pop the front so that the destructor of the callback that just executed runs before the next
380
    // callback executes.
381
625309
    callbacks.pop_front();
382
625309
  }
383
256104551
}
384

            
385
4
void DispatcherImpl::onFatalError(std::ostream& os) const {
386
  // Dump the state of the tracked objects in the dispatcher if thread safe. This generally
387
  // results in dumping the active state only for the thread which caused the fatal error.
388
4
  if (isThreadSafe()) {
389
8
    for (auto iter = tracked_object_stack_.rbegin(); iter != tracked_object_stack_.rend(); ++iter) {
390
4
      (*iter)->dumpState(os);
391
4
    }
392
4
  }
393
4
}
394

            
395
void DispatcherImpl::runFatalActionsOnTrackedObject(
396
3
    const FatalAction::FatalActionPtrList& actions) const {
397
  // Only run if this is the dispatcher of the current thread and
398
  // DispatcherImpl::Run has been called.
399
3
  if (run_tid_.isEmpty() || (run_tid_ != thread_factory_.currentThreadId())) {
400
2
    return;
401
2
  }
402

            
403
1
  for (const auto& action : actions) {
404
1
    action->run(tracked_object_stack_);
405
1
  }
406
1
}
407

            
408
9700102
void DispatcherImpl::touchWatchdog() {
409
9700102
  if (watchdog_registration_) {
410
8114184
    watchdog_registration_->touchWatchdog();
411
8114184
  }
412
9700102
}
413

            
414
3774949
void DispatcherImpl::pushTrackedObject(const ScopeTrackedObject* object) {
415
3774949
  ASSERT(isThreadSafe());
416
3774949
  ASSERT(object != nullptr);
417
3774949
  tracked_object_stack_.push_back(object);
418
3774949
  ASSERT(tracked_object_stack_.size() <= ExpectedMaxTrackedObjectStackDepth);
419
3774949
}
420

            
421
3774929
void DispatcherImpl::popTrackedObject(const ScopeTrackedObject* expected_object) {
422
3774929
  ASSERT(isThreadSafe());
423
3774929
  ASSERT(expected_object != nullptr);
424
3774929
  RELEASE_ASSERT(!tracked_object_stack_.empty(), "Tracked Object Stack is empty, nothing to pop!");
425

            
426
3774929
  const ScopeTrackedObject* top = tracked_object_stack_.back();
427
3774929
  tracked_object_stack_.pop_back();
428
3774929
  ASSERT(top == expected_object,
429
3774929
         "Popped the top of the tracked object stack, but it wasn't the expected object!");
430
3774929
}
431

            
432
} // namespace Event
433
} // namespace Envoy