// source/common/event/dispatcher_impl.cc
#include "source/common/event/dispatcher_impl.h"

#include <chrono>
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

#include "envoy/api/api.h"
#include "envoy/common/scope_tracker.h"
#include "envoy/config/overload/v3/overload.pb.h"
#include "envoy/network/client_connection_factory.h"
#include "envoy/network/listen_socket.h"
#include "envoy/network/listener.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/assert.h"
#include "source/common/common/lock_guard.h"
#include "source/common/common/thread.h"
#include "source/common/config/utility.h"
#include "source/common/event/file_event_impl.h"
#include "source/common/event/libevent_scheduler.h"
#include "source/common/event/scaled_range_timer_manager_impl.h"
#include "source/common/event/signal_impl.h"
#include "source/common/event/timer_impl.h"
#include "source/common/filesystem/watcher_impl.h"
#include "source/common/network/address_impl.h"
#include "source/common/network/connection_impl.h"
#include "source/common/runtime/runtime_features.h"

#include "event2/event.h"

#ifdef ENVOY_HANDLE_SIGNALS
#include "source/common/signal/signal_action.h"
#endif

namespace Envoy {
namespace Event {

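// All three public constructors delegate to the full constructor below, filling in defaults:
// the scaled range timer manager factory and, when no watermark factory is supplied, a
// WatermarkBufferFactory configured from the bootstrap overload manager.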
DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
                               Event::TimeSystem& time_system)
    : DispatcherImpl(name, api, time_system, {}) {}

DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
                               Event::TimeSystem& time_system,
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
    : DispatcherImpl(
          name, api, time_system,
          [](Dispatcher& dispatcher) {
            return std::make_unique<ScaledRangeTimerManagerImpl>(dispatcher);
          },
          watermark_factory) {}

DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
                               Event::TimeSystem& time_system,
                               const ScaledRangeTimerManagerFactory& scaled_timer_factory,
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
    : DispatcherImpl(name, api.threadFactory(), api.timeSource(), api.fileSystem(), time_system,
                     scaled_timer_factory,
                     watermark_factory != nullptr
                         ? watermark_factory
                         : std::make_shared<Buffer::WatermarkBufferFactory>(
                               api.bootstrap().overload_manager().buffer_factory_config())) {}

DispatcherImpl::DispatcherImpl(const std::string& name, Thread::ThreadFactory& thread_factory,
                               TimeSource& time_source, Filesystem::Instance& file_system,
                               Event::TimeSystem& time_system,
                               const ScaledRangeTimerManagerFactory& scaled_timer_factory,
                               const Buffer::WatermarkFactorySharedPtr& watermark_factory)
    : name_(name), thread_factory_(thread_factory), time_source_(time_source),
      file_system_(file_system), buffer_factory_(watermark_factory),
      scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
      thread_local_delete_cb_(
          base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })),
      deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
          [this]() -> void { clearDeferredDeleteList(); })),
      post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
      current_to_delete_(&to_delete_1_), scaled_timer_manager_(scaled_timer_factory(*this)) {
  ASSERT(!name_.empty());
  FatalErrorHandler::registerFatalErrorHandler(*this);
  updateApproximateMonotonicTimeInternal();
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.fix_dispatcher_approximate_now")) {
    base_scheduler_.registerOnCheckCallback(
        std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
  } else {
    base_scheduler_.registerOnPrepareCallback(
        std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
  }
}

DispatcherImpl::~DispatcherImpl() {
  ENVOY_LOG(debug, "destroying dispatcher {}", name_);
  FatalErrorHandler::removeFatalErrorHandler(*this);
  // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable
  // ASSERT(deletable_in_dispatcher_thread_.empty())
}

void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog,
                                      std::chrono::milliseconds min_touch_interval) {
  ASSERT(!watchdog_registration_, "Each dispatcher can have at most one registered watchdog.");
  watchdog_registration_ =
      std::make_unique<WatchdogRegistration>(watchdog, *scheduler_, min_touch_interval, *this);
}

void DispatcherImpl::initializeStats(Stats::Scope& scope,
                                     const absl::optional<std::string>& prefix) {
  const std::string effective_prefix = prefix.has_value() ? *prefix : absl::StrCat(name_, ".");
  // This needs to be run in the dispatcher's thread, so that we have a thread id to log.
  post([this, &scope, effective_prefix] {
    stats_prefix_ = effective_prefix + "dispatcher";
    stats_ = std::make_unique<DispatcherStats>(
        DispatcherStats{ALL_DISPATCHER_STATS(POOL_HISTOGRAM_PREFIX(scope, stats_prefix_ + "."))});
    base_scheduler_.initializeStats(stats_.get());
    ENVOY_LOG(debug, "running {} on thread {}", stats_prefix_, run_tid_.debugString());
  });
}

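// Destroys items queued via deferredDelete(). The deferred_deleting_ flag guards against
// re-entrant draining, and the two vectors are swapped so that deletions triggered while
// draining land in the other vector and are handled by a follow-up callback.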
void DispatcherImpl::clearDeferredDeleteList() {
  ASSERT(isThreadSafe());
  std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_;

  size_t num_to_delete = to_delete->size();
  if (deferred_deleting_ || !num_to_delete) {
    return;
  }

  ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);

  // Swap the current deletion vector so that if a deferred delete happens while we are deleting,
  // we use the other vector. We will get another callback to delete that vector.
  if (current_to_delete_ == &to_delete_1_) {
    current_to_delete_ = &to_delete_2_;
  } else {
    current_to_delete_ = &to_delete_1_;
  }

  touchWatchdog();
  deferred_deleting_ = true;

  // Calling clear() on the vector does not specify the order in which destructors run. We want
  // to destroy in FIFO order, so do it manually. This requires two passes over the vector, which
  // is not optimal but can be cleaned up later if needed.
  for (size_t i = 0; i < num_to_delete; i++) {
    (*to_delete)[i].reset();
  }

  to_delete->clear();
  deferred_deleting_ = false;
}

Network::ServerConnectionPtr
DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
                                       Network::TransportSocketPtr&& transport_socket,
                                       StreamInfo::StreamInfo& stream_info) {
  ASSERT(isThreadSafe());
  return std::make_unique<Network::ServerConnectionImpl>(*this, std::move(socket),
                                                         std::move(transport_socket), stream_info);
}

Network::ClientConnectionPtr DispatcherImpl::createClientConnection(
    Network::Address::InstanceConstSharedPtr address,
    Network::Address::InstanceConstSharedPtr source_address,
    Network::TransportSocketPtr&& transport_socket,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_options) {
  ASSERT(isThreadSafe());

  auto* factory = Config::Utility::getFactoryByName<Network::ClientConnectionFactory>(
      std::string(address->addressType()));
  // The target address is usually supplied by EDS, and the EDS API should reject unsupported
  // addresses.
  // TODO(lambdai): Return a closed connection if the factory is not found. Note that the caller
  // expects a non-null connection as of today, so we cannot gracefully handle an unsupported
  // address type.
  return factory->createClientConnection(*this, address, source_address,
                                         std::move(transport_socket), options, transport_options);
}

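// The user callback is wrapped so that the watchdog is touched each time the file event fires;
// createSchedulableCallback() and createTimerInternal() below apply the same wrapping.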
FileEventPtr DispatcherImpl::createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger,
                                             uint32_t events) {
  ASSERT(isThreadSafe());
  return FileEventPtr{new FileEventImpl(
      *this, fd,
      [this, cb](uint32_t events) {
        touchWatchdog();
        return cb(events);
      },
      trigger, events)};
}

Filesystem::WatcherPtr DispatcherImpl::createFilesystemWatcher() {
  ASSERT(isThreadSafe());
  return Filesystem::WatcherPtr{new Filesystem::WatcherImpl(*this, file_system_)};
}

TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
  ASSERT(isThreadSafe());
  return createTimerInternal(cb);
}

TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerType timer_type, TimerCb cb) {
  ASSERT(isThreadSafe());
  return scaled_timer_manager_->createTimer(timer_type, std::move(cb));
}

TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerMinimum minimum, TimerCb cb) {
  ASSERT(isThreadSafe());
  return scaled_timer_manager_->createTimer(minimum, std::move(cb));
}

Event::SchedulableCallbackPtr DispatcherImpl::createSchedulableCallback(std::function<void()> cb) {
  ASSERT(isThreadSafe());
  return base_scheduler_.createSchedulableCallback([this, cb]() {
    touchWatchdog();
    cb();
  });
}

TimerPtr DispatcherImpl::createTimerInternal(TimerCb cb) {
  return scheduler_->createTimer(
      [this, cb]() {
        touchWatchdog();
        cb();
      },
      *this);
}

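// Queues an object for destruction on a later event loop iteration. The drain callback is only
// scheduled on the empty -> non-empty transition; subsequent items ride along with the already
// scheduled callback.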
void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
  ASSERT(isThreadSafe());
  if (to_delete != nullptr) {
    to_delete->deleteIsPending();
    current_to_delete_->emplace_back(std::move(to_delete));
    ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());
    if (current_to_delete_->size() == 1) {
      deferred_delete_cb_->scheduleCallbackCurrentIteration();
    }
  }
}

void DispatcherImpl::exit() { base_scheduler_.loopExit(); }

SignalEventPtr DispatcherImpl::listenForSignal(signal_t signal_num, SignalCb cb) {
  ASSERT(isThreadSafe());
  return SignalEventPtr{new SignalEventImpl(*this, signal_num, cb)};
}

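// Unlike most methods on the dispatcher, post() may be called from any thread. Only the caller
// that finds the queue empty arms post_cb_, so concurrent posters pay just the lock. For example
// (hypothetical caller), another thread can hand work to this dispatcher's thread with:
//   dispatcher.post([] { /* runs later on the dispatcher's thread */ });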
void DispatcherImpl::post(PostCb callback) {
  bool do_post;
  {
    Thread::LockGuard lock(post_lock_);
    do_post = post_callbacks_.empty();
    post_callbacks_.push_back(std::move(callback));
  }

  if (do_post) {
    post_cb_->scheduleCallbackCurrentIteration();
  }
}

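// Queues an object to be destroyed on this dispatcher's thread. Like post(), this is safe to
// call from any thread and only schedules the drain callback on the empty -> non-empty
// transition.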
void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) {
  bool need_schedule;
  {
    Thread::LockGuard lock(thread_local_deletable_lock_);
    need_schedule = deletables_in_dispatcher_thread_.empty();
    deletables_in_dispatcher_thread_.emplace_back(std::move(deletable));
    // TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072
    // ASSERT(!shutdown_called_, "inserted after shutdown");
  }

  if (need_schedule) {
    thread_local_delete_cb_->scheduleCallbackCurrentIteration();
  }
}

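// Runs the event loop on the calling thread. Recording run_tid_ here pins isThreadSafe() to the
// thread that runs the loop.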
void DispatcherImpl::run(RunType type) {
  run_tid_ = thread_factory_.currentThreadId();
  // Flush all post callbacks before we run the event loop. We do this because there are post
  // callbacks that have to get run before the initial event loop starts running. libevent does
  // not guarantee that events are run in any particular order. So even if we post() and call
  // event_base_once() before some other event, the other event might get called first.
  runPostCallbacks();
  base_scheduler_.run(type);
}

MonotonicTime DispatcherImpl::approximateMonotonicTime() const {
  return approximate_monotonic_time_;
}

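// Must be called once, on the dispatcher's thread, before destruction. Drains the
// dispatcher-thread deletables and logs how much work was still queued at shutdown.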
void DispatcherImpl::shutdown() {
  // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop over the 3
  // lists below, deleting until all of them are empty. The 3 lists are the deferred delete
  // objects, the post callbacks, and the dispatcher thread deletable objects.
  ASSERT(isThreadSafe());
  auto deferred_deletables_size = current_to_delete_->size();
  std::list<std::function<void()>>::size_type post_callbacks_size;
  {
    Thread::LockGuard lock(post_lock_);
    post_callbacks_size = post_callbacks_.size();
  }

  std::list<DispatcherThreadDeletableConstPtr> local_deletables;
  {
    Thread::LockGuard lock(thread_local_deletable_lock_);
    local_deletables = std::move(deletables_in_dispatcher_thread_);
  }
  auto thread_local_deletables_size = local_deletables.size();
  while (!local_deletables.empty()) {
    local_deletables.pop_front();
  }
  ASSERT(!shutdown_called_);
  shutdown_called_ = true;
  ENVOY_LOG(trace,
            "{} destroyed {} thread local objects; {} deferred deletables and {} post callbacks "
            "were still queued.",
            __FUNCTION__, thread_local_deletables_size, deferred_deletables_size,
            post_callbacks_size);
}

void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); }

void DispatcherImpl::updateApproximateMonotonicTimeInternal() {
  approximate_monotonic_time_ = time_source_.monotonicTime();
}

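// Drains the dispatcher-thread deletable list: the list is swapped out under the lock, then the
// objects are destroyed outside of it, so a destructor that re-enters
// deleteInDispatcherThread() cannot deadlock on the same lock.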
void DispatcherImpl::runThreadLocalDelete() {
  std::list<DispatcherThreadDeletableConstPtr> to_be_deleted;
  {
    Thread::LockGuard lock(thread_local_deletable_lock_);
    to_be_deleted = std::move(deletables_in_dispatcher_thread_);
    ASSERT(deletables_in_dispatcher_thread_.empty());
  }
  while (!to_be_deleted.empty()) {
    // Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when
    // executing complicated destruction.
    touchWatchdog();
    // Delete in FIFO order.
    to_be_deleted.pop_front();
  }
}
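
// Drains the cross-thread post queue, running callbacks in FIFO order.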
void DispatcherImpl::runPostCallbacks() {
  // Clear the deferred delete list before running post callbacks to reduce non-determinism in
  // callback processing, and to more easily detect if a scheduled post callback refers to one of
  // the objects that is being deferred deleted.
  clearDeferredDeleteList();

  std::list<PostCb> callbacks;
  {
    // Take ownership of the callbacks under the post_lock_. The lock must be released before
    // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
    // later in the event loop.
    Thread::LockGuard lock(post_lock_);
    callbacks = std::move(post_callbacks_);
    // post_callbacks_ should be empty after the move.
    ASSERT(post_callbacks_.empty());
  }
  // It is important that the execution and deletion of the callback happen while post_lock_ is not
  // held. Either the invocation or the destructor of the callback can call post() on this
  // dispatcher.
  while (!callbacks.empty()) {
    // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
    // executing a long list of callbacks.
    touchWatchdog();
    // Run the callback.
    callbacks.front()();
    // Pop the front so that the destructor of the callback that just executed runs before the next
    // callback executes.
    callbacks.pop_front();
  }
}

void DispatcherImpl::onFatalError(std::ostream& os) const {
  // Dump the state of the tracked objects in the dispatcher if thread safe. This generally
  // results in dumping the active state only for the thread which caused the fatal error.
  if (isThreadSafe()) {
    for (auto iter = tracked_object_stack_.rbegin(); iter != tracked_object_stack_.rend(); ++iter) {
      (*iter)->dumpState(os);
    }
  }
}

void DispatcherImpl::runFatalActionsOnTrackedObject(
    const FatalAction::FatalActionPtrList& actions) const {
  // Only run if this is the dispatcher of the current thread and DispatcherImpl::run() has been
  // called.
  if (run_tid_.isEmpty() || (run_tid_ != thread_factory_.currentThreadId())) {
    return;
  }

  for (const auto& action : actions) {
    action->run(tracked_object_stack_);
  }
}

void DispatcherImpl::touchWatchdog() {
  if (watchdog_registration_) {
    watchdog_registration_->touchWatchdog();
  }
}

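// pushTrackedObject()/popTrackedObject() maintain the LIFO stack of scope-tracked objects that
// onFatalError() dumps during crash handling, typically paired by an RAII scope helper rather
// than called directly.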
void DispatcherImpl::pushTrackedObject(const ScopeTrackedObject* object) {
  ASSERT(isThreadSafe());
  ASSERT(object != nullptr);
  tracked_object_stack_.push_back(object);
  ASSERT(tracked_object_stack_.size() <= ExpectedMaxTrackedObjectStackDepth);
}

void DispatcherImpl::popTrackedObject(const ScopeTrackedObject* expected_object) {
  ASSERT(isThreadSafe());
  ASSERT(expected_object != nullptr);
  RELEASE_ASSERT(!tracked_object_stack_.empty(), "Tracked Object Stack is empty, nothing to pop!");

  const ScopeTrackedObject* top = tracked_object_stack_.back();
  tracked_object_stack_.pop_back();
  ASSERT(top == expected_object,
         "Popped the top of the tracked object stack, but it wasn't the expected object!");
}

} // namespace Event
} // namespace Envoy