Line data Source code
1 : #include "source/common/event/dispatcher_impl.h"
2 :
3 : #include <chrono>
4 : #include <cstdint>
5 : #include <functional>
6 : #include <string>
7 : #include <vector>
8 :
9 : #include "envoy/api/api.h"
10 : #include "envoy/common/scope_tracker.h"
11 : #include "envoy/config/overload/v3/overload.pb.h"
12 : #include "envoy/network/client_connection_factory.h"
13 : #include "envoy/network/listen_socket.h"
14 : #include "envoy/network/listener.h"
15 :
16 : #include "source/common/buffer/buffer_impl.h"
17 : #include "source/common/common/assert.h"
18 : #include "source/common/common/lock_guard.h"
19 : #include "source/common/common/thread.h"
20 : #include "source/common/config/utility.h"
21 : #include "source/common/event/file_event_impl.h"
22 : #include "source/common/event/libevent_scheduler.h"
23 : #include "source/common/event/scaled_range_timer_manager_impl.h"
24 : #include "source/common/event/signal_impl.h"
25 : #include "source/common/event/timer_impl.h"
26 : #include "source/common/filesystem/watcher_impl.h"
27 : #include "source/common/network/address_impl.h"
28 : #include "source/common/network/connection_impl.h"
29 : #include "source/common/runtime/runtime_features.h"
30 :
31 : #include "event2/event.h"
32 :
33 : #ifdef ENVOY_HANDLE_SIGNALS
34 : #include "source/common/signal/signal_action.h"
35 : #endif
36 :
37 : namespace Envoy {
38 : namespace Event {
39 :
// Convenience constructor: forwards to another DispatcherImpl constructor, passing `{}` (a
// value-initialized argument) in the watermark-factory position.
40 : DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
41 : Event::TimeSystem& time_system)
42 321 : : DispatcherImpl(name, api, time_system, {}) {}
43 :
// Forwards to the overload that also takes a scaled-timer-manager factory. The factory supplied
// here is a lambda that builds a ScaledRangeTimerManagerImpl bound to the dispatcher it is given.
44 : DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
45 : Event::TimeSystem& time_system,
46 : const Buffer::WatermarkFactorySharedPtr& watermark_factory)
47 : : DispatcherImpl(
48 : name, api, time_system,
49 1367 : [](Dispatcher& dispatcher) {
50 1367 : return std::make_unique<ScaledRangeTimerManagerImpl>(dispatcher);
51 1367 : },
52 1367 : watermark_factory) {}
53 :
// Unpacks `api` into its thread factory, time source, random generator and file system and
// forwards them to the fully-expanded constructor. When `watermark_factory` is null, a
// Buffer::WatermarkBufferFactory built from the bootstrap's
// overload_manager().buffer_factory_config() is used in its place.
54 : DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
55 : Event::TimeSystem& time_system,
56 : const ScaledRangeTimerManagerFactory& scaled_timer_factory,
57 : const Buffer::WatermarkFactorySharedPtr& watermark_factory)
58 : : DispatcherImpl(name, api.threadFactory(), api.timeSource(), api.randomGenerator(),
59 : api.fileSystem(), time_system, scaled_timer_factory,
60 : watermark_factory != nullptr
61 : ? watermark_factory
62 : : std::make_shared<Buffer::WatermarkBufferFactory>(
63 1498 : api.bootstrap().overload_manager().buffer_factory_config())) {}
64 :
// Primary constructor; the other overloads end up here.
//
// Member setup:
//  - stores the name, thread factory, time source, file system and buffer (watermark) factory;
//  - asks `time_system` to create the scheduler on top of base_scheduler_;
//  - creates three schedulable callbacks on base_scheduler_ that run runThreadLocalDelete(),
//    clearDeferredDeleteList() and runPostCallbacks() respectively;
//  - points current_to_delete_ at to_delete_1_;
//  - builds the scaled timer manager by invoking `scaled_timer_factory` with this dispatcher.
//
// The Random::RandomGenerator parameter is unnamed and not used by this constructor.
65 : DispatcherImpl::DispatcherImpl(const std::string& name, Thread::ThreadFactory& thread_factory,
66 : TimeSource& time_source, Random::RandomGenerator&,
67 : Filesystem::Instance& file_system, Event::TimeSystem& time_system,
68 : const ScaledRangeTimerManagerFactory& scaled_timer_factory,
69 : const Buffer::WatermarkFactorySharedPtr& watermark_factory)
70 : : name_(name), thread_factory_(thread_factory), time_source_(time_source),
71 : file_system_(file_system), buffer_factory_(watermark_factory),
72 : scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
73 : thread_local_delete_cb_(
74 0 : base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })),
75 : deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
76 1845 : [this]() -> void { clearDeferredDeleteList(); })),
77 1909 : post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
78 1498 : current_to_delete_(&to_delete_1_), scaled_timer_manager_(scaled_timer_factory(*this)) {
// A dispatcher must be named.
79 1498 : ASSERT(!name_.empty());
// Register for fatal-error notifications; the destructor removes this registration.
80 1498 : FatalErrorHandler::registerFatalErrorHandler(*this);
// Seed the cached monotonic time, then have the base scheduler refresh it through its
// on-prepare callback.
81 1498 : updateApproximateMonotonicTimeInternal();
82 1498 : base_scheduler_.registerOnPrepareCallback(
83 1498 : std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
84 1498 : }
85 :
// Logs the destruction at debug level and removes the fatal-error handler registration made in
// the constructor.
86 1498 : DispatcherImpl::~DispatcherImpl() {
87 1498 : ENVOY_LOG(debug, "destroying dispatcher {}", name_);
88 1498 : FatalErrorHandler::removeFatalErrorHandler(*this);
89 : // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable
90 : // ASSERT(deletables_in_dispatcher_thread_.empty())
91 1498 : }
92 :
// Registers `watchdog` with this dispatcher by creating a WatchdogRegistration from the watchdog,
// the scheduler, `min_touch_interval` and this dispatcher. Asserts that no watchdog has been
// registered yet.
93 : void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog,
94 192 : std::chrono::milliseconds min_touch_interval) {
95 192 : ASSERT(!watchdog_registration_, "Each dispatcher can have at most one registered watchdog.");
96 192 : watchdog_registration_ =
97 192 : std::make_unique<WatchdogRegistration>(watchdog, *scheduler_, min_touch_interval, *this);
98 192 : }
99 :
// Sets up the dispatcher stats under `scope`.
//
// The stats prefix is `*prefix` when provided, otherwise "<name_>."; "dispatcher" is appended to
// it. The actual initialization is deferred via post(): the posted callback stores the prefix,
// creates the DispatcherStats, hands them to base_scheduler_ and logs the thread id.
//
// NOTE(review): `scope` is captured by reference in the posted callback, so it has to remain
// valid until that callback has run.
100 : void DispatcherImpl::initializeStats(Stats::Scope& scope,
101 2 : const absl::optional<std::string>& prefix) {
102 2 : const std::string effective_prefix = prefix.has_value() ? *prefix : absl::StrCat(name_, ".");
103 : // This needs to be run in the dispatcher's thread, so that we have a thread id to log.
104 2 : post([this, &scope, effective_prefix] {
105 2 : stats_prefix_ = effective_prefix + "dispatcher";
106 2 : stats_ = std::make_unique<DispatcherStats>(
107 2 : DispatcherStats{ALL_DISPATCHER_STATS(POOL_HISTOGRAM_PREFIX(scope, stats_prefix_ + "."))});
108 2 : base_scheduler_.initializeStats(stats_.get());
109 2 : ENVOY_LOG(debug, "running {} on thread {}", stats_prefix_, run_tid_.debugString());
110 2 : });
111 2 : }
112 :
// Destroys every object currently queued for deferred deletion, in the order they were queued.
//
// Returns immediately when the current list is empty or when a deletion pass is already in
// progress (deferred_deleting_ is set). Before destroying anything, current_to_delete_ is
// switched to the other vector so that deferredDelete() calls made by the destructors land in
// that vector rather than the one being drained.
113 23361 : void DispatcherImpl::clearDeferredDeleteList() {
114 23361 : ASSERT(isThreadSafe());
115 23361 : std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_;
116 :
// Snapshot the size up front; only these entries are destroyed in this pass.
117 23361 : size_t num_to_delete = to_delete->size();
118 23361 : if (deferred_deleting_ || !num_to_delete) {
119 21509 : return;
120 21509 : }
121 :
122 1852 : ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);
123 :
124 : // Swap the current deletion vector so that if we do deferred delete while we are deleting, we
125 : // use the other vector. We will get another callback to delete that vector.
126 1852 : if (current_to_delete_ == &to_delete_1_) {
127 1173 : current_to_delete_ = &to_delete_2_;
128 1173 : } else {
129 679 : current_to_delete_ = &to_delete_1_;
130 679 : }
131 :
132 1852 : touchWatchdog();
133 1852 : deferred_deleting_ = true;
134 :
135 : // Calling clear() on the vector does not specify which order destructors run in. We want to
136 : // destroy in FIFO order so just do it manually. This requires 2 passes over the vector which is
137 : // not optimal but can be cleaned up later if needed.
138 6787 : for (size_t i = 0; i < num_to_delete; i++) {
139 4935 : (*to_delete)[i].reset();
140 4935 : }
141 :
// All entries are now null; drop them and allow the next pass.
142 1852 : to_delete->clear();
143 1852 : deferred_deleting_ = false;
144 1852 : }
145 :
// Creates a Network::ServerConnectionImpl owned by this dispatcher. Ownership of `socket` and
// `transport_socket` is moved into the new connection; `stream_info` is passed by reference.
146 : Network::ServerConnectionPtr
147 : DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
148 : Network::TransportSocketPtr&& transport_socket,
149 1042 : StreamInfo::StreamInfo& stream_info) {
150 1042 : ASSERT(isThreadSafe());
151 1042 : return std::make_unique<Network::ServerConnectionImpl>(*this, std::move(socket),
152 1042 : std::move(transport_socket), stream_info);
153 1042 : }
154 :
// Creates a client connection to `address` by looking up the Network::ClientConnectionFactory
// registered under the address's addressType() name and delegating to it.
//
// NOTE(review): the factory pointer is dereferenced without a null check; see the TODO below for
// why an unsupported address type is not handled gracefully here.
155 : Network::ClientConnectionPtr DispatcherImpl::createClientConnection(
156 : Network::Address::InstanceConstSharedPtr address,
157 : Network::Address::InstanceConstSharedPtr source_address,
158 : Network::TransportSocketPtr&& transport_socket,
159 : const Network::ConnectionSocket::OptionsSharedPtr& options,
160 1328 : const Network::TransportSocketOptionsConstSharedPtr& transport_options) {
161 1328 : ASSERT(isThreadSafe());
162 :
163 1328 : auto* factory = Config::Utility::getFactoryByName<Network::ClientConnectionFactory>(
164 1328 : std::string(address->addressType()));
165 : // The target address is usually offered by EDS and the EDS api should reject the unsupported
166 : // address.
167 : // TODO(lambdai): Return a closed connection if the factory is not found. Note that the caller
168 : // expects a non-null connection as of today so we cannot gracefully handle unsupported address
169 : // type.
170 1328 : return factory->createClientConnection(*this, address, source_address,
171 1328 : std::move(transport_socket), options, transport_options);
172 1328 : }
173 :
// Creates a FileEventImpl for `fd` with the given trigger type and event mask. The user callback
// is wrapped so that the watchdog is touched before `cb` is invoked.
// Note: the wrapper lambda's `events` parameter (the events being delivered) shadows the outer
// `events` argument (the events to register for).
174 : FileEventPtr DispatcherImpl::createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger,
175 3411 : uint32_t events) {
176 3411 : ASSERT(isThreadSafe());
177 3411 : return FileEventPtr{new FileEventImpl(
178 3411 : *this, fd,
179 8612 : [this, cb](uint32_t events) {
180 8612 : touchWatchdog();
181 8612 : cb(events);
182 8612 : },
183 3411 : trigger, events)};
184 3411 : }
185 :
// Creates a Filesystem::WatcherImpl bound to this dispatcher and its file system.
186 70 : Filesystem::WatcherPtr DispatcherImpl::createFilesystemWatcher() {
187 70 : ASSERT(isThreadSafe());
188 70 : return Filesystem::WatcherPtr{new Filesystem::WatcherImpl(*this, file_system_)};
189 70 : }
190 :
// Public timer factory: asserts isThreadSafe() and then delegates to createTimerInternal().
191 11196 : TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
192 11196 : ASSERT(isThreadSafe());
193 11196 : return createTimerInternal(cb);
194 11196 : }
195 :
// Creates a scaled timer identified by `timer_type` through the scaled timer manager; `cb` is
// moved into the manager's createTimer() call.
196 1298 : TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerType timer_type, TimerCb cb) {
197 1298 : ASSERT(isThreadSafe());
198 1298 : return scaled_timer_manager_->createTimer(timer_type, std::move(cb));
199 1298 : }
200 :
// Creates a scaled timer from an explicit ScaledTimerMinimum through the scaled timer manager;
// `cb` is moved into the manager's createTimer() call.
201 0 : TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerMinimum minimum, TimerCb cb) {
202 0 : ASSERT(isThreadSafe());
203 0 : return scaled_timer_manager_->createTimer(minimum, std::move(cb));
204 0 : }
205 :
// Creates a schedulable callback on base_scheduler_. `cb` is copied into a wrapper that touches
// the watchdog before invoking it.
206 3832 : Event::SchedulableCallbackPtr DispatcherImpl::createSchedulableCallback(std::function<void()> cb) {
207 3832 : ASSERT(isThreadSafe());
208 4625 : return base_scheduler_.createSchedulableCallback([this, cb]() {
209 2918 : touchWatchdog();
210 2918 : cb();
211 2918 : });
212 3832 : }
213 :
// Creates a timer on scheduler_ whose callback touches the watchdog and then runs `cb`.
// Unlike createTimer(), this helper does not assert isThreadSafe().
214 11196 : TimerPtr DispatcherImpl::createTimerInternal(TimerCb cb) {
215 11196 : return scheduler_->createTimer(
216 11196 : [this, cb]() {
217 1143 : touchWatchdog();
218 1143 : cb();
219 1143 : },
220 11196 : *this);
221 11196 : }
222 :
// Queues `to_delete` for destruction by clearDeferredDeleteList(). A null pointer is ignored.
// For a non-null object, deleteIsPending() is called on it before it is appended to the current
// deletion list.
223 4982 : void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
224 4982 : ASSERT(isThreadSafe());
225 4982 : if (to_delete != nullptr) {
226 4935 : to_delete->deleteIsPending();
227 4935 : current_to_delete_->emplace_back(std::move(to_delete));
228 4935 : ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());
// Only the first item added to an empty list needs to schedule the cleanup callback.
229 4935 : if (current_to_delete_->size() == 1) {
230 1852 : deferred_delete_cb_->scheduleCallbackCurrentIteration();
231 1852 : }
232 4935 : }
233 4982 : }
234 :
// Asks the base scheduler to exit its event loop.
235 1633 : void DispatcherImpl::exit() { base_scheduler_.loopExit(); }
236 :
// Creates a SignalEventImpl on this dispatcher for `signal_num` with callback `cb`.
237 392 : SignalEventPtr DispatcherImpl::listenForSignal(signal_t signal_num, SignalCb cb) {
238 392 : ASSERT(isThreadSafe());
239 392 : return SignalEventPtr{new SignalEventImpl(*this, signal_num, cb)};
240 392 : }
241 :
// Appends `callback` to the post queue, to be executed later by runPostCallbacks().
//
// The queue is guarded by post_lock_, and this method does not assert isThreadSafe(). post_cb_ is
// scheduled only when the queue was empty before this push, and the scheduling happens after the
// lock has been released.
242 3739 : void DispatcherImpl::post(PostCb callback) {
243 3739 : bool do_post;
244 3739 : {
245 3739 : Thread::LockGuard lock(post_lock_);
246 3739 : do_post = post_callbacks_.empty();
247 3739 : post_callbacks_.push_back(std::move(callback));
248 3739 : }
249 :
250 3739 : if (do_post) {
251 2046 : post_cb_->scheduleCallbackCurrentIteration();
252 2046 : }
253 3739 : }
254 :
// Queues `deletable` on deletables_in_dispatcher_thread_, which is drained by
// runThreadLocalDelete() (and by shutdown()).
//
// The list is guarded by thread_local_deletable_lock_, and this method does not assert
// isThreadSafe(). thread_local_delete_cb_ is scheduled only when the list was empty before this
// insertion, after the lock has been released.
255 159 : void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) {
256 159 : bool need_schedule;
257 159 : {
258 159 : Thread::LockGuard lock(thread_local_deletable_lock_);
259 159 : need_schedule = deletables_in_dispatcher_thread_.empty();
260 159 : deletables_in_dispatcher_thread_.emplace_back(std::move(deletable));
261 : // TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072
262 : // ASSERT(!shutdown_called_, "inserted after shutdown");
263 159 : }
264 :
265 159 : if (need_schedule) {
266 98 : thread_local_delete_cb_->scheduleCallbackCurrentIteration();
267 98 : }
268 159 : }
269 :
// Runs the event loop with the given RunType. The calling thread's id is recorded in run_tid_,
// pending post callbacks are flushed, and then control is handed to the base scheduler.
270 18604 : void DispatcherImpl::run(RunType type) {
271 18604 : run_tid_ = thread_factory_.currentThreadId();
272 : // Flush all post callbacks before we run the event loop. We do this because there are post
273 : // callbacks that have to get run before the initial event loop starts running. libevent does
274 : // not guarantee that events are run in any particular order. So even if we post() and call
275 : // event_base_once() before some other event, the other event might get called first.
276 18604 : runPostCallbacks();
277 18604 : base_scheduler_.run(type);
278 18604 : }
279 :
// Returns the cached monotonic time last stored by updateApproximateMonotonicTimeInternal();
// it does not query the time source itself.
280 17348 : MonotonicTime DispatcherImpl::approximateMonotonicTime() const {
281 17348 : return approximate_monotonic_time_;
282 17348 : }
283 :
// Marks the dispatcher as shut down and destroys the queued dispatcher-thread deletables.
//
// Steps:
//  - the sizes of the deferred-delete list and of the post-callback queue (read under
//    post_lock_) are sampled for logging only; neither is drained here;
//  - deletables_in_dispatcher_thread_ is moved out under thread_local_deletable_lock_ and its
//    entries are destroyed front to back with no lock held;
//  - asserts that shutdown() has not been called before, then sets shutdown_called_.
//
// The log arguments are passed in the order the format string names them: thread local objects
// destroyed, deferred deletables, post callbacks.
284 549 : void DispatcherImpl::shutdown() {
285 : // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete the
286 : // below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post
287 : // callbacks and dispatcher thread deletable objects.
288 549 : ASSERT(isThreadSafe());
289 549 : auto deferred_deletables_size = current_to_delete_->size();
290 549 : std::list<std::function<void()>>::size_type post_callbacks_size;
291 549 : {
292 549 : Thread::LockGuard lock(post_lock_);
293 549 : post_callbacks_size = post_callbacks_.size();
294 549 : }
295 :
296 549 : std::list<DispatcherThreadDeletableConstPtr> local_deletables;
297 549 : {
298 549 : Thread::LockGuard lock(thread_local_deletable_lock_);
299 549 : local_deletables = std::move(deletables_in_dispatcher_thread_);
300 549 : }
// Record the count before the list is emptied so it can be reported below.
301 549 : auto thread_local_deletables_size = local_deletables.size();
302 708 : while (!local_deletables.empty()) {
303 159 : local_deletables.pop_front();
304 159 : }
305 549 : ASSERT(!shutdown_called_);
306 549 : shutdown_called_ = true;
307 549 : ENVOY_LOG(
308 549 : trace,
309 549 : "{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. ",
310 549 : __FUNCTION__, thread_local_deletables_size, deferred_deletables_size, post_callbacks_size);
311 549 : }
312 :
// Thin wrapper around updateApproximateMonotonicTimeInternal(); this is the function the
// constructor registers as the base scheduler's on-prepare callback.
313 31884 : void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); }
314 :
// Refreshes the cached monotonic time from time_source_.
315 33380 : void DispatcherImpl::updateApproximateMonotonicTimeInternal() {
316 33380 : approximate_monotonic_time_ = time_source_.monotonicTime();
317 33380 : }
318 :
// Destroys everything queued by deleteInDispatcherThread(). The list is moved out while holding
// thread_local_deletable_lock_, and the objects are then destroyed front to back with the lock
// released.
319 0 : void DispatcherImpl::runThreadLocalDelete() {
320 0 : std::list<DispatcherThreadDeletableConstPtr> to_be_delete;
321 0 : {
322 0 : Thread::LockGuard lock(thread_local_deletable_lock_);
323 0 : to_be_delete = std::move(deletables_in_dispatcher_thread_);
324 0 : ASSERT(deletables_in_dispatcher_thread_.empty());
325 0 : }
326 0 : while (!to_be_delete.empty()) {
327 : // Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when
328 : // executing complicated destruction.
329 0 : touchWatchdog();
330 : // Delete in FIFO order.
331 0 : to_be_delete.pop_front();
332 0 : }
333 0 : }
// Runs all callbacks queued via post(). The deferred delete list is cleared first; the queue is
// then moved out under post_lock_ and each callback is invoked and destroyed, in order, with the
// lock released.
334 20513 : void DispatcherImpl::runPostCallbacks() {
335 : // Clear the deferred delete list before running post callbacks to reduce non-determinism in
336 : // callback processing, and more easily detect if a scheduled post callback refers to one of the
337 : // objects that is being deferred deleted.
338 20513 : clearDeferredDeleteList();
339 :
340 20513 : std::list<PostCb> callbacks;
341 20513 : {
342 : // Take ownership of the callbacks under the post_lock_. The lock must be released before
343 : // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
344 : // later in the event loop.
345 20513 : Thread::LockGuard lock(post_lock_);
346 20513 : callbacks = std::move(post_callbacks_);
347 : // post_callbacks_ should be empty after the move.
348 20513 : ASSERT(post_callbacks_.empty());
349 20513 : }
350 : // It is important that the execution and deletion of the callback happen while post_lock_ is not
351 : // held. Either the invocation or destructor of the callback can call post() on this dispatcher.
352 23998 : while (!callbacks.empty()) {
353 : // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
354 : // executing a long list of callbacks.
355 3485 : touchWatchdog();
356 : // Run the callback.
357 3485 : callbacks.front()();
358 : // Pop the front so that the destructor of the callback that just executed runs before the next
359 : // callback executes.
360 3485 : callbacks.pop_front();
361 3485 : }
362 20513 : }
363 :
// Fatal error hook: writes the state of every tracked object to `os`, walking the tracked object
// stack from the most recently pushed entry to the oldest. Does nothing unless isThreadSafe()
// holds.
364 0 : void DispatcherImpl::onFatalError(std::ostream& os) const {
365 : // Dump the state of the tracked objects in the dispatcher if thread safe. This generally
366 : // results in dumping the active state only for the thread which caused the fatal error.
367 0 : if (isThreadSafe()) {
368 0 : for (auto iter = tracked_object_stack_.rbegin(); iter != tracked_object_stack_.rend(); ++iter) {
369 0 : (*iter)->dumpState(os);
370 0 : }
371 0 : }
372 0 : }
373 :
// Runs each action in `actions`, in order, against the tracked object stack. Returns without
// doing anything when run_tid_ is empty or differs from the calling thread's id.
374 : void DispatcherImpl::runFatalActionsOnTrackedObject(
375 0 : const FatalAction::FatalActionPtrList& actions) const {
376 : // Only run if this is the dispatcher of the current thread and
377 : // DispatcherImpl::run() has been called.
378 0 : if (run_tid_.isEmpty() || (run_tid_ != thread_factory_.currentThreadId())) {
379 0 : return;
380 0 : }
381 :
382 0 : for (const auto& action : actions) {
383 0 : action->run(tracked_object_stack_);
384 0 : }
385 0 : }
386 :
// Touches the registered watchdog, if any; a no-op when registerWatchdog() has not been called.
387 18008 : void DispatcherImpl::touchWatchdog() {
388 18008 : if (watchdog_registration_) {
389 7018 : watchdog_registration_->touchWatchdog();
390 7018 : }
391 18008 : }
392 :
// Pushes `object` onto the tracked object stack. Asserts that `object` is non-null and that the
// stack depth stays within ExpectedMaxTrackedObjectStackDepth.
393 11809 : void DispatcherImpl::pushTrackedObject(const ScopeTrackedObject* object) {
394 11809 : ASSERT(isThreadSafe());
395 11809 : ASSERT(object != nullptr);
396 11809 : tracked_object_stack_.push_back(object);
397 11809 : ASSERT(tracked_object_stack_.size() <= ExpectedMaxTrackedObjectStackDepth);
398 11809 : }
399 :
// Pops the top of the tracked object stack. An empty stack trips a RELEASE_ASSERT; an ASSERT
// then checks that the popped entry is `expected_object`. The entry has already been removed by
// the time that check runs.
400 11811 : void DispatcherImpl::popTrackedObject(const ScopeTrackedObject* expected_object) {
401 11811 : ASSERT(isThreadSafe());
402 11811 : ASSERT(expected_object != nullptr);
403 11811 : RELEASE_ASSERT(!tracked_object_stack_.empty(), "Tracked Object Stack is empty, nothing to pop!");
404 :
405 11811 : const ScopeTrackedObject* top = tracked_object_stack_.back();
406 11811 : tracked_object_stack_.pop_back();
407 11811 : ASSERT(top == expected_object,
408 11811 : "Popped the top of the tracked object stack, but it wasn't the expected object!");
409 11811 : }
410 :
411 : } // namespace Event
412 : } // namespace Envoy
|