LCOV - code coverage report
Current view: top level - source/common/event - dispatcher_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 212 244 86.9 %
Date: 2024-01-05 06:35:25 Functions: 37 42 88.1 %

          Line data    Source code
       1             : #include "source/common/event/dispatcher_impl.h"
       2             : 
       3             : #include <chrono>
       4             : #include <cstdint>
       5             : #include <functional>
       6             : #include <string>
       7             : #include <vector>
       8             : 
       9             : #include "envoy/api/api.h"
      10             : #include "envoy/common/scope_tracker.h"
      11             : #include "envoy/config/overload/v3/overload.pb.h"
      12             : #include "envoy/network/client_connection_factory.h"
      13             : #include "envoy/network/listen_socket.h"
      14             : #include "envoy/network/listener.h"
      15             : 
      16             : #include "source/common/buffer/buffer_impl.h"
      17             : #include "source/common/common/assert.h"
      18             : #include "source/common/common/lock_guard.h"
      19             : #include "source/common/common/thread.h"
      20             : #include "source/common/config/utility.h"
      21             : #include "source/common/event/file_event_impl.h"
      22             : #include "source/common/event/libevent_scheduler.h"
      23             : #include "source/common/event/scaled_range_timer_manager_impl.h"
      24             : #include "source/common/event/signal_impl.h"
      25             : #include "source/common/event/timer_impl.h"
      26             : #include "source/common/filesystem/watcher_impl.h"
      27             : #include "source/common/network/address_impl.h"
      28             : #include "source/common/network/connection_impl.h"
      29             : #include "source/common/runtime/runtime_features.h"
      30             : 
      31             : #include "event2/event.h"
      32             : 
      33             : #ifdef ENVOY_HANDLE_SIGNALS
      34             : #include "source/common/signal/signal_action.h"
      35             : #endif
      36             : 
      37             : namespace Envoy {
      38             : namespace Event {
      39             : 
      40             : DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
      41             :                                Event::TimeSystem& time_system)
      42         321 :     : DispatcherImpl(name, api, time_system, {}) {}
      43             : 
      44             : DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
      45             :                                Event::TimeSystem& time_system,
      46             :                                const Buffer::WatermarkFactorySharedPtr& watermark_factory)
      47             :     : DispatcherImpl(
      48             :           name, api, time_system,
      49        1367 :           [](Dispatcher& dispatcher) {
      50        1367 :             return std::make_unique<ScaledRangeTimerManagerImpl>(dispatcher);
      51        1367 :           },
      52        1367 :           watermark_factory) {}
      53             : 
      54             : DispatcherImpl::DispatcherImpl(const std::string& name, Api::Api& api,
      55             :                                Event::TimeSystem& time_system,
      56             :                                const ScaledRangeTimerManagerFactory& scaled_timer_factory,
      57             :                                const Buffer::WatermarkFactorySharedPtr& watermark_factory)
      58             :     : DispatcherImpl(name, api.threadFactory(), api.timeSource(), api.randomGenerator(),
      59             :                      api.fileSystem(), time_system, scaled_timer_factory,
      60             :                      watermark_factory != nullptr
      61             :                          ? watermark_factory
      62             :                          : std::make_shared<Buffer::WatermarkBufferFactory>(
      63        1498 :                                api.bootstrap().overload_manager().buffer_factory_config())) {}
      64             : 
      65             : DispatcherImpl::DispatcherImpl(const std::string& name, Thread::ThreadFactory& thread_factory,
      66             :                                TimeSource& time_source, Random::RandomGenerator&,
      67             :                                Filesystem::Instance& file_system, Event::TimeSystem& time_system,
      68             :                                const ScaledRangeTimerManagerFactory& scaled_timer_factory,
      69             :                                const Buffer::WatermarkFactorySharedPtr& watermark_factory)
      70             :     : name_(name), thread_factory_(thread_factory), time_source_(time_source),
      71             :       file_system_(file_system), buffer_factory_(watermark_factory),
      72             :       scheduler_(time_system.createScheduler(base_scheduler_, base_scheduler_)),
      73             :       thread_local_delete_cb_(
      74           0 :           base_scheduler_.createSchedulableCallback([this]() -> void { runThreadLocalDelete(); })),
      75             :       deferred_delete_cb_(base_scheduler_.createSchedulableCallback(
      76        1845 :           [this]() -> void { clearDeferredDeleteList(); })),
      77        1909 :       post_cb_(base_scheduler_.createSchedulableCallback([this]() -> void { runPostCallbacks(); })),
      78        1498 :       current_to_delete_(&to_delete_1_), scaled_timer_manager_(scaled_timer_factory(*this)) {
      79        1498 :   ASSERT(!name_.empty());
      80        1498 :   FatalErrorHandler::registerFatalErrorHandler(*this);
      81        1498 :   updateApproximateMonotonicTimeInternal();
      82        1498 :   base_scheduler_.registerOnPrepareCallback(
      83        1498 :       std::bind(&DispatcherImpl::updateApproximateMonotonicTime, this));
      84        1498 : }
      85             : 
      86        1498 : DispatcherImpl::~DispatcherImpl() {
      87        1498 :   ENVOY_LOG(debug, "destroying dispatcher {}", name_);
      88        1498 :   FatalErrorHandler::removeFatalErrorHandler(*this);
      89             :   // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and enable
      90             :   // ASSERT(deletable_in_dispatcher_thread_.empty())
      91        1498 : }
      92             : 
      93             : void DispatcherImpl::registerWatchdog(const Server::WatchDogSharedPtr& watchdog,
      94         192 :                                       std::chrono::milliseconds min_touch_interval) {
      95         192 :   ASSERT(!watchdog_registration_, "Each dispatcher can have at most one registered watchdog.");
      96         192 :   watchdog_registration_ =
      97         192 :       std::make_unique<WatchdogRegistration>(watchdog, *scheduler_, min_touch_interval, *this);
      98         192 : }
      99             : 
     100             : void DispatcherImpl::initializeStats(Stats::Scope& scope,
     101           2 :                                      const absl::optional<std::string>& prefix) {
     102           2 :   const std::string effective_prefix = prefix.has_value() ? *prefix : absl::StrCat(name_, ".");
     103             :   // This needs to be run in the dispatcher's thread, so that we have a thread id to log.
     104           2 :   post([this, &scope, effective_prefix] {
     105           2 :     stats_prefix_ = effective_prefix + "dispatcher";
     106           2 :     stats_ = std::make_unique<DispatcherStats>(
     107           2 :         DispatcherStats{ALL_DISPATCHER_STATS(POOL_HISTOGRAM_PREFIX(scope, stats_prefix_ + "."))});
     108           2 :     base_scheduler_.initializeStats(stats_.get());
     109           2 :     ENVOY_LOG(debug, "running {} on thread {}", stats_prefix_, run_tid_.debugString());
     110           2 :   });
     111           2 : }
     112             : 
     113       23361 : void DispatcherImpl::clearDeferredDeleteList() {
     114       23361 :   ASSERT(isThreadSafe());
     115       23361 :   std::vector<DeferredDeletablePtr>* to_delete = current_to_delete_;
     116             : 
     117       23361 :   size_t num_to_delete = to_delete->size();
     118       23361 :   if (deferred_deleting_ || !num_to_delete) {
     119       21509 :     return;
     120       21509 :   }
     121             : 
     122        1852 :   ENVOY_LOG(trace, "clearing deferred deletion list (size={})", num_to_delete);
     123             : 
     124             :   // Swap the current deletion vector so that if we do deferred delete while we are deleting, we
     125             :   // use the other vector. We will get another callback to delete that vector.
     126        1852 :   if (current_to_delete_ == &to_delete_1_) {
     127        1173 :     current_to_delete_ = &to_delete_2_;
     128        1173 :   } else {
     129         679 :     current_to_delete_ = &to_delete_1_;
     130         679 :   }
     131             : 
     132        1852 :   touchWatchdog();
     133        1852 :   deferred_deleting_ = true;
     134             : 
     135             :   // Calling clear() on the vector does not specify which order destructors run in. We want to
     136             :   // destroy in FIFO order so just do it manually. This required 2 passes over the vector which is
     137             :   // not optimal but can be cleaned up later if needed.
     138        6787 :   for (size_t i = 0; i < num_to_delete; i++) {
     139        4935 :     (*to_delete)[i].reset();
     140        4935 :   }
     141             : 
     142        1852 :   to_delete->clear();
     143        1852 :   deferred_deleting_ = false;
     144        1852 : }
     145             : 
     146             : Network::ServerConnectionPtr
     147             : DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
     148             :                                        Network::TransportSocketPtr&& transport_socket,
     149        1042 :                                        StreamInfo::StreamInfo& stream_info) {
     150        1042 :   ASSERT(isThreadSafe());
     151        1042 :   return std::make_unique<Network::ServerConnectionImpl>(*this, std::move(socket),
     152        1042 :                                                          std::move(transport_socket), stream_info);
     153        1042 : }
     154             : 
     155             : Network::ClientConnectionPtr DispatcherImpl::createClientConnection(
     156             :     Network::Address::InstanceConstSharedPtr address,
     157             :     Network::Address::InstanceConstSharedPtr source_address,
     158             :     Network::TransportSocketPtr&& transport_socket,
     159             :     const Network::ConnectionSocket::OptionsSharedPtr& options,
     160        1328 :     const Network::TransportSocketOptionsConstSharedPtr& transport_options) {
     161        1328 :   ASSERT(isThreadSafe());
     162             : 
     163        1328 :   auto* factory = Config::Utility::getFactoryByName<Network::ClientConnectionFactory>(
     164        1328 :       std::string(address->addressType()));
     165             :   // The target address is usually offered by EDS and the EDS api should reject the unsupported
     166             :   // address.
     167             :   // TODO(lambdai): Return a closed connection if the factory is not found. Note that the caller
     168             :   // expects a non-null connection as of today so we cannot gracefully handle unsupported address
     169             :   // type.
     170        1328 :   return factory->createClientConnection(*this, address, source_address,
     171        1328 :                                          std::move(transport_socket), options, transport_options);
     172        1328 : }
     173             : 
     174             : FileEventPtr DispatcherImpl::createFileEvent(os_fd_t fd, FileReadyCb cb, FileTriggerType trigger,
     175        3411 :                                              uint32_t events) {
     176        3411 :   ASSERT(isThreadSafe());
     177        3411 :   return FileEventPtr{new FileEventImpl(
     178        3411 :       *this, fd,
     179        8612 :       [this, cb](uint32_t events) {
     180        8612 :         touchWatchdog();
     181        8612 :         cb(events);
     182        8612 :       },
     183        3411 :       trigger, events)};
     184        3411 : }
     185             : 
     186          70 : Filesystem::WatcherPtr DispatcherImpl::createFilesystemWatcher() {
     187          70 :   ASSERT(isThreadSafe());
     188          70 :   return Filesystem::WatcherPtr{new Filesystem::WatcherImpl(*this, file_system_)};
     189          70 : }
     190             : 
     191       11196 : TimerPtr DispatcherImpl::createTimer(TimerCb cb) {
     192       11196 :   ASSERT(isThreadSafe());
     193       11196 :   return createTimerInternal(cb);
     194       11196 : }
     195             : 
     196        1298 : TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerType timer_type, TimerCb cb) {
     197        1298 :   ASSERT(isThreadSafe());
     198        1298 :   return scaled_timer_manager_->createTimer(timer_type, std::move(cb));
     199        1298 : }
     200             : 
     201           0 : TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerMinimum minimum, TimerCb cb) {
     202           0 :   ASSERT(isThreadSafe());
     203           0 :   return scaled_timer_manager_->createTimer(minimum, std::move(cb));
     204           0 : }
     205             : 
     206        3832 : Event::SchedulableCallbackPtr DispatcherImpl::createSchedulableCallback(std::function<void()> cb) {
     207        3832 :   ASSERT(isThreadSafe());
     208        4625 :   return base_scheduler_.createSchedulableCallback([this, cb]() {
     209        2918 :     touchWatchdog();
     210        2918 :     cb();
     211        2918 :   });
     212        3832 : }
     213             : 
     214       11196 : TimerPtr DispatcherImpl::createTimerInternal(TimerCb cb) {
     215       11196 :   return scheduler_->createTimer(
     216       11196 :       [this, cb]() {
     217        1143 :         touchWatchdog();
     218        1143 :         cb();
     219        1143 :       },
     220       11196 :       *this);
     221       11196 : }
     222             : 
     223        4982 : void DispatcherImpl::deferredDelete(DeferredDeletablePtr&& to_delete) {
     224        4982 :   ASSERT(isThreadSafe());
     225        4982 :   if (to_delete != nullptr) {
     226        4935 :     to_delete->deleteIsPending();
     227        4935 :     current_to_delete_->emplace_back(std::move(to_delete));
     228        4935 :     ENVOY_LOG(trace, "item added to deferred deletion list (size={})", current_to_delete_->size());
     229        4935 :     if (current_to_delete_->size() == 1) {
     230        1852 :       deferred_delete_cb_->scheduleCallbackCurrentIteration();
     231        1852 :     }
     232        4935 :   }
     233        4982 : }
     234             : 
     235        1633 : void DispatcherImpl::exit() { base_scheduler_.loopExit(); }
     236             : 
     237         392 : SignalEventPtr DispatcherImpl::listenForSignal(signal_t signal_num, SignalCb cb) {
     238         392 :   ASSERT(isThreadSafe());
     239         392 :   return SignalEventPtr{new SignalEventImpl(*this, signal_num, cb)};
     240         392 : }
     241             : 
     242        3739 : void DispatcherImpl::post(PostCb callback) {
     243        3739 :   bool do_post;
     244        3739 :   {
     245        3739 :     Thread::LockGuard lock(post_lock_);
     246        3739 :     do_post = post_callbacks_.empty();
     247        3739 :     post_callbacks_.push_back(std::move(callback));
     248        3739 :   }
     249             : 
     250        3739 :   if (do_post) {
     251        2046 :     post_cb_->scheduleCallbackCurrentIteration();
     252        2046 :   }
     253        3739 : }
     254             : 
     255         159 : void DispatcherImpl::deleteInDispatcherThread(DispatcherThreadDeletableConstPtr deletable) {
     256         159 :   bool need_schedule;
     257         159 :   {
     258         159 :     Thread::LockGuard lock(thread_local_deletable_lock_);
     259         159 :     need_schedule = deletables_in_dispatcher_thread_.empty();
     260         159 :     deletables_in_dispatcher_thread_.emplace_back(std::move(deletable));
     261             :     // TODO(lambdai): Enable below after https://github.com/envoyproxy/envoy/issues/15072
     262             :     // ASSERT(!shutdown_called_, "inserted after shutdown");
     263         159 :   }
     264             : 
     265         159 :   if (need_schedule) {
     266          98 :     thread_local_delete_cb_->scheduleCallbackCurrentIteration();
     267          98 :   }
     268         159 : }
     269             : 
     270       18604 : void DispatcherImpl::run(RunType type) {
     271       18604 :   run_tid_ = thread_factory_.currentThreadId();
     272             :   // Flush all post callbacks before we run the event loop. We do this because there are post
     273             :   // callbacks that have to get run before the initial event loop starts running. libevent does
     274             :   // not guarantee that events are run in any particular order. So even if we post() and call
     275             :   // event_base_once() before some other event, the other event might get called first.
     276       18604 :   runPostCallbacks();
     277       18604 :   base_scheduler_.run(type);
     278       18604 : }
     279             : 
     280       17348 : MonotonicTime DispatcherImpl::approximateMonotonicTime() const {
     281       17348 :   return approximate_monotonic_time_;
     282       17348 : }
     283             : 
     284         549 : void DispatcherImpl::shutdown() {
     285             :   // TODO(lambdai): Resolve https://github.com/envoyproxy/envoy/issues/15072 and loop delete below
     286             :   // below 3 lists until all lists are empty. The 3 lists are list of deferred delete objects, post
     287             :   // callbacks and dispatcher thread deletable objects.
     288         549 :   ASSERT(isThreadSafe());
     289         549 :   auto deferred_deletables_size = current_to_delete_->size();
     290         549 :   std::list<std::function<void()>>::size_type post_callbacks_size;
     291         549 :   {
     292         549 :     Thread::LockGuard lock(post_lock_);
     293         549 :     post_callbacks_size = post_callbacks_.size();
     294         549 :   }
     295             : 
     296         549 :   std::list<DispatcherThreadDeletableConstPtr> local_deletables;
     297         549 :   {
     298         549 :     Thread::LockGuard lock(thread_local_deletable_lock_);
     299         549 :     local_deletables = std::move(deletables_in_dispatcher_thread_);
     300         549 :   }
     301         549 :   auto thread_local_deletables_size = local_deletables.size();
     302         708 :   while (!local_deletables.empty()) {
     303         159 :     local_deletables.pop_front();
     304         159 :   }
     305         549 :   ASSERT(!shutdown_called_);
     306         549 :   shutdown_called_ = true;
     307         549 :   ENVOY_LOG(
     308         549 :       trace,
     309         549 :       "{} destroyed {} thread local objects. Peek {} deferred deletables, {} post callbacks. ",
     310         549 :       __FUNCTION__, deferred_deletables_size, post_callbacks_size, thread_local_deletables_size);
     311         549 : }
     312             : 
     313       31884 : void DispatcherImpl::updateApproximateMonotonicTime() { updateApproximateMonotonicTimeInternal(); }
     314             : 
     315       33380 : void DispatcherImpl::updateApproximateMonotonicTimeInternal() {
     316       33380 :   approximate_monotonic_time_ = time_source_.monotonicTime();
     317       33380 : }
     318             : 
     319           0 : void DispatcherImpl::runThreadLocalDelete() {
     320           0 :   std::list<DispatcherThreadDeletableConstPtr> to_be_delete;
     321           0 :   {
     322           0 :     Thread::LockGuard lock(thread_local_deletable_lock_);
     323           0 :     to_be_delete = std::move(deletables_in_dispatcher_thread_);
     324           0 :     ASSERT(deletables_in_dispatcher_thread_.empty());
     325           0 :   }
     326           0 :   while (!to_be_delete.empty()) {
     327             :     // Touch the watchdog before deleting the objects to avoid spurious watchdog miss events when
     328             :     // executing complicated destruction.
     329           0 :     touchWatchdog();
     330             :     // Delete in FIFO order.
     331           0 :     to_be_delete.pop_front();
     332           0 :   }
     333           0 : }
     334       20513 : void DispatcherImpl::runPostCallbacks() {
     335             :   // Clear the deferred delete list before running post callbacks to reduce non-determinism in
     336             :   // callback processing, and more easily detect if a scheduled post callback refers to one of the
     337             :   // objects that is being deferred deleted.
     338       20513 :   clearDeferredDeleteList();
     339             : 
     340       20513 :   std::list<PostCb> callbacks;
     341       20513 :   {
     342             :     // Take ownership of the callbacks under the post_lock_. The lock must be released before
     343             :     // callbacks execute. Callbacks added after this transfer will re-arm post_cb_ and will execute
     344             :     // later in the event loop.
     345       20513 :     Thread::LockGuard lock(post_lock_);
     346       20513 :     callbacks = std::move(post_callbacks_);
     347             :     // post_callbacks_ should be empty after the move.
     348       20513 :     ASSERT(post_callbacks_.empty());
     349       20513 :   }
     350             :   // It is important that the execution and deletion of the callback happen while post_lock_ is not
     351             :   // held. Either the invocation or destructor of the callback can call post() on this dispatcher.
     352       23998 :   while (!callbacks.empty()) {
     353             :     // Touch the watchdog before executing the callback to avoid spurious watchdog miss events when
     354             :     // executing a long list of callbacks.
     355        3485 :     touchWatchdog();
     356             :     // Run the callback.
     357        3485 :     callbacks.front()();
     358             :     // Pop the front so that the destructor of the callback that just executed runs before the next
     359             :     // callback executes.
     360        3485 :     callbacks.pop_front();
     361        3485 :   }
     362       20513 : }
     363             : 
     364           0 : void DispatcherImpl::onFatalError(std::ostream& os) const {
     365             :   // Dump the state of the tracked objects in the dispatcher if thread safe. This generally
     366             :   // results in dumping the active state only for the thread which caused the fatal error.
     367           0 :   if (isThreadSafe()) {
     368           0 :     for (auto iter = tracked_object_stack_.rbegin(); iter != tracked_object_stack_.rend(); ++iter) {
     369           0 :       (*iter)->dumpState(os);
     370           0 :     }
     371           0 :   }
     372           0 : }
     373             : 
     374             : void DispatcherImpl::runFatalActionsOnTrackedObject(
     375           0 :     const FatalAction::FatalActionPtrList& actions) const {
     376             :   // Only run if this is the dispatcher of the current thread and
     377             :   // DispatcherImpl::Run has been called.
     378           0 :   if (run_tid_.isEmpty() || (run_tid_ != thread_factory_.currentThreadId())) {
     379           0 :     return;
     380           0 :   }
     381             : 
     382           0 :   for (const auto& action : actions) {
     383           0 :     action->run(tracked_object_stack_);
     384           0 :   }
     385           0 : }
     386             : 
     387       18008 : void DispatcherImpl::touchWatchdog() {
     388       18008 :   if (watchdog_registration_) {
     389        7018 :     watchdog_registration_->touchWatchdog();
     390        7018 :   }
     391       18008 : }
     392             : 
     393       11809 : void DispatcherImpl::pushTrackedObject(const ScopeTrackedObject* object) {
     394       11809 :   ASSERT(isThreadSafe());
     395       11809 :   ASSERT(object != nullptr);
     396       11809 :   tracked_object_stack_.push_back(object);
     397       11809 :   ASSERT(tracked_object_stack_.size() <= ExpectedMaxTrackedObjectStackDepth);
     398       11809 : }
     399             : 
     400       11811 : void DispatcherImpl::popTrackedObject(const ScopeTrackedObject* expected_object) {
     401       11811 :   ASSERT(isThreadSafe());
     402       11811 :   ASSERT(expected_object != nullptr);
     403       11811 :   RELEASE_ASSERT(!tracked_object_stack_.empty(), "Tracked Object Stack is empty, nothing to pop!");
     404             : 
     405       11811 :   const ScopeTrackedObject* top = tracked_object_stack_.back();
     406       11811 :   tracked_object_stack_.pop_back();
     407       11811 :   ASSERT(top == expected_object,
     408       11811 :          "Popped the top of the tracked object stack, but it wasn't the expected object!");
     409       11811 : }
     410             : 
     411             : } // namespace Event
     412             : } // namespace Envoy

Generated by: LCOV version 1.15