LCOV - code coverage report
Current view: top level - source/server - worker_impl.cc (source / functions)
Test: coverage.dat
Date: 2024-01-05 06:35:25

                  Hit    Total    Coverage
Lines:             81      115      70.4 %
Functions:         13       24      54.2 %

          Line data    Source code
       1             : #include "source/server/worker_impl.h"
       2             : 
       3             : #include <functional>
       4             : #include <memory>
       5             : 
       6             : #include "envoy/event/dispatcher.h"
       7             : #include "envoy/event/timer.h"
       8             : #include "envoy/network/exception.h"
       9             : #include "envoy/server/configuration.h"
      10             : #include "envoy/thread_local/thread_local.h"
      11             : 
      12             : #include "source/common/config/utility.h"
      13             : #include "source/server/listener_manager_factory.h"
      14             : 
      15             : namespace Envoy {
      16             : namespace Server {
      17             : namespace {
      18             : 
      19             : std::unique_ptr<ConnectionHandler> getHandler(Event::Dispatcher& dispatcher, uint32_t index,
      20         131 :                                               OverloadManager& overload_manager) {
      21             : 
      22         131 :   auto* factory = Config::Utility::getFactoryByName<ConnectionHandlerFactory>(
      23         131 :       "envoy.connection_handler.default");
      24         131 :   if (factory) {
      25         131 :     return factory->createConnectionHandler(dispatcher, index, overload_manager);
      26         131 :   }
      27           0 :   ENVOY_LOG_MISC(debug, "Unable to find envoy.connection_handler.default factory");
      28           0 :   return nullptr;
      29         131 : }
      30             : 
      31             : } // namespace
      32             : 
      33             : WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
      34         131 :                                           const std::string& worker_name) {
      35         131 :   Event::DispatcherPtr dispatcher(
      36         131 :       api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
      37         131 :   auto conn_handler = getHandler(*dispatcher, index, overload_manager);
      38         131 :   return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
      39         131 :                                       overload_manager, api_, stat_names_);
      40         131 : }
      41             : 
      42             : WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
      43             :                        Event::DispatcherPtr&& dispatcher, Network::ConnectionHandlerPtr handler,
      44             :                        OverloadManager& overload_manager, Api::Api& api,
      45             :                        WorkerStatNames& stat_names)
      46             :     : tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)),
      47             :       api_(api), reset_streams_counter_(
      48         131 :                      api_.rootScope().counterFromStatName(stat_names.reset_high_memory_stream_)) {
      49         131 :   tls_.registerThread(*dispatcher_, false);
      50         131 :   overload_manager.registerForAction(
      51         131 :       OverloadActionNames::get().StopAcceptingConnections, *dispatcher_,
      52         131 :       [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); });
      53         131 :   overload_manager.registerForAction(
      54         131 :       OverloadActionNames::get().RejectIncomingConnections, *dispatcher_,
      55         131 :       [this](OverloadActionState state) { rejectIncomingConnectionsCb(state); });
      56         131 :   overload_manager.registerForAction(
      57         131 :       OverloadActionNames::get().ResetStreams, *dispatcher_,
      58         131 :       [this](OverloadActionState state) { resetStreamsUsingExcessiveMemory(state); });
      59         131 : }
      60             : 
      61             : void WorkerImpl::addListener(absl::optional<uint64_t> overridden_listener,
      62             :                              Network::ListenerConfig& listener, AddListenerCompletion completion,
      63         114 :                              Runtime::Loader& runtime, Random::RandomGenerator& random) {
      64         114 :   dispatcher_->post(
      65         114 :       [this, overridden_listener, &listener, &runtime, &random, completion]() -> void {
      66         114 :         handler_->addListener(overridden_listener, listener, runtime, random);
      67         114 :         hooks_.onWorkerListenerAdded();
      68         114 :         completion();
      69         114 :       });
      70         114 : }
      71             : 
      72         205 : uint64_t WorkerImpl::numConnections() const {
      73         205 :   uint64_t ret = 0;
      74         205 :   if (handler_) {
      75         111 :     ret = handler_->numConnections();
      76         111 :   }
      77         205 :   return ret;
      78         205 : }
      79             : 
      80             : void WorkerImpl::removeListener(Network::ListenerConfig& listener,
      81           0 :                                 std::function<void()> completion) {
      82           0 :   ASSERT(thread_);
      83           0 :   const uint64_t listener_tag = listener.listenerTag();
      84           0 :   dispatcher_->post([this, listener_tag, completion]() -> void {
      85           0 :     handler_->removeListeners(listener_tag);
      86           0 :     completion();
      87           0 :     hooks_.onWorkerListenerRemoved();
      88           0 :   });
      89           0 : }
      90             : 
      91             : void WorkerImpl::removeFilterChains(uint64_t listener_tag,
      92             :                                     const std::list<const Network::FilterChain*>& filter_chains,
      93           0 :                                     std::function<void()> completion) {
      94           0 :   ASSERT(thread_);
      95           0 :   dispatcher_->post(
      96           0 :       [this, listener_tag, &filter_chains, completion = std::move(completion)]() -> void {
      97           0 :         handler_->removeFilterChains(listener_tag, filter_chains, completion);
      98           0 :       });
      99           0 : }
     100             : 
     101          94 : void WorkerImpl::start(OptRef<GuardDog> guard_dog, const std::function<void()>& cb) {
     102          94 :   ASSERT(!thread_);
     103             : 
     104             :   // In posix, thread names are limited to 15 characters, so contrive to make
     105             :   // sure all interesting data fits there. The naming occurs in
     106             :   // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we
      107             :   // have 9999 threads. We'd need 7 bytes for "worker_", 4 bytes
      108             :   // for the thread index, leaving 4 bytes to distinguish between the
      109             :   // two threads used per dispatcher. We'll call this one "wrk:" and the
     110             :   // one allocated in guarddog_impl.cc "dog:".
     111             :   //
     112             :   // TODO(jmarantz): consider refactoring how this naming works so this naming
     113             :   // architecture is centralized, resulting in clearer names.
     114          94 :   Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
     115          94 :   thread_ = api_.threadFactory().createThread(
     116          94 :       [this, guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options);
     117          94 : }
     118             : 
     119           0 : void WorkerImpl::initializeStats(Stats::Scope& scope) { dispatcher_->initializeStats(scope); }
     120             : 
     121          94 : void WorkerImpl::stop() {
      122             :   // It's possible for the server to cleanly shut down during startup while cluster
      123             :   // initialization is still in progress, so we might not yet have a thread.
     124          94 :   if (thread_) {
     125          94 :     dispatcher_->exit();
     126          94 :     thread_->join();
     127          94 :   }
     128          94 : }
     129             : 
     130             : void WorkerImpl::stopListener(Network::ListenerConfig& listener,
     131             :                               const Network::ExtraShutdownListenerOptions& options,
     132          10 :                               std::function<void()> completion) {
     133          10 :   const uint64_t listener_tag = listener.listenerTag();
     134          10 :   dispatcher_->post([this, listener_tag, options, completion]() -> void {
     135          10 :     handler_->stopListeners(listener_tag, options);
     136          10 :     if (completion != nullptr) {
     137          10 :       completion();
     138          10 :     }
     139          10 :   });
     140          10 : }
     141             : 
     142          94 : void WorkerImpl::threadRoutine(OptRef<GuardDog> guard_dog, const std::function<void()>& cb) {
     143          94 :   ENVOY_LOG(debug, "worker entering dispatch loop");
     144             :   // The watch dog must be created after the dispatcher starts running and has post events flushed,
     145             :   // as this is when TLS stat scopes start working.
     146          94 :   dispatcher_->post([this, &guard_dog, cb]() {
     147          94 :     cb();
     148          94 :     if (guard_dog.has_value()) {
     149          94 :       watch_dog_ = guard_dog->createWatchDog(api_.threadFactory().currentThreadId(),
     150          94 :                                              dispatcher_->name(), *dispatcher_);
     151          94 :     }
     152          94 :   });
     153          94 :   dispatcher_->run(Event::Dispatcher::RunType::Block);
     154          94 :   ENVOY_LOG(debug, "worker exited dispatch loop");
     155          94 :   if (guard_dog.has_value()) {
     156          94 :     guard_dog->stopWatching(watch_dog_);
     157          94 :   }
     158          94 :   dispatcher_->shutdown();
     159             : 
     160             :   // We must close all active connections before we actually exit the thread. This prevents any
      161             :   // destructors from running on the main thread, which might reference thread locals. Destroying
      162             :   // the handler does this, which additionally purges the dispatcher's delayed deletion list.
     163          94 :   handler_.reset();
     164          94 :   tls_.shutdownThread();
     165          94 :   watch_dog_.reset();
     166          94 : }
     167             : 
     168           0 : void WorkerImpl::stopAcceptingConnectionsCb(OverloadActionState state) {
     169           0 :   if (state.isSaturated()) {
     170           0 :     handler_->disableListeners();
     171           0 :   } else {
     172           0 :     handler_->enableListeners();
     173           0 :   }
     174           0 : }
     175             : 
     176           0 : void WorkerImpl::rejectIncomingConnectionsCb(OverloadActionState state) {
     177           0 :   handler_->setListenerRejectFraction(state.value());
     178           0 : }
     179             : 
     180           0 : void WorkerImpl::resetStreamsUsingExcessiveMemory(OverloadActionState state) {
     181           0 :   uint64_t streams_reset_count =
     182           0 :       dispatcher_->getWatermarkFactory().resetAccountsGivenPressure(state.value().value());
     183           0 :   reset_streams_counter_.add(streams_reset_count);
     184           0 : }
     185             : 
     186             : } // namespace Server
     187             : } // namespace Envoy
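
Note: the getHandler() helper above (source lines 19-29) resolves the connection handler through a name-keyed factory lookup and falls back to logging and returning nullptr when the name is unknown; that fallback (source lines 27-28) is the only uncovered part of the function. Below is a minimal standalone sketch of that registry-lookup pattern. Handler, HandlerFactory, registry(), and the "example.*" names are simplified stand-ins for illustration, not Envoy's Registry or Config::Utility machinery.

#include <iostream>
#include <map>
#include <memory>
#include <string>

struct Handler { virtual ~Handler() = default; };

struct HandlerFactory {
  virtual ~HandlerFactory() = default;
  virtual std::unique_ptr<Handler> create() = 0;
};

// Name-keyed registry; lookups for unknown names return nullptr instead of throwing.
std::map<std::string, HandlerFactory*>& registry() {
  static std::map<std::string, HandlerFactory*> instances;
  return instances;
}

struct DefaultHandler : Handler {};
struct DefaultHandlerFactory : HandlerFactory {
  std::unique_ptr<Handler> create() override { return std::make_unique<DefaultHandler>(); }
};

std::unique_ptr<Handler> getHandler(const std::string& name) {
  auto it = registry().find(name);
  if (it != registry().end()) {
    return it->second->create();
  }
  // Mirrors the uncovered fallback branch: log the miss and hand back nullptr.
  std::cerr << "Unable to find " << name << " factory" << std::endl;
  return nullptr;
}

int main() {
  DefaultHandlerFactory default_factory;
  registry()["example.connection_handler.default"] = &default_factory;
  auto hit = getHandler("example.connection_handler.default");   // covered path
  auto miss = getHandler("example.connection_handler.missing");  // fallback path
  return (hit != nullptr && miss == nullptr) ? 0 : 1;
}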
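
Note: addListener(), removeListener(), removeFilterChains(), and stopListener() above all follow the same shape: capture the arguments, dispatcher_->post() a closure, and let the worker's event loop run it, so handler_ is only ever touched from the worker thread. The following is a minimal standalone sketch of that post-to-the-event-loop pattern using a toy queue-backed dispatcher; ToyDispatcher is an illustration only, not Envoy's Event::Dispatcher.

#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class ToyDispatcher {
public:
  // Enqueue a closure to be run on whichever thread is inside run().
  void post(std::function<void()> cb) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(cb));
    }
    cv_.notify_one();
  }

  // Blocks, loosely analogous to Dispatcher::run(RunType::Block), until exit() is posted.
  void run() {
    while (true) {
      std::function<void()> cb;
      {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        cb = std::move(queue_.front());
        queue_.pop();
      }
      if (!cb) {
        return; // An empty closure acts as the exit signal in this sketch.
      }
      cb();
    }
  }

  void exit() { post(nullptr); }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> queue_;
};

int main() {
  ToyDispatcher dispatcher;
  int listeners = 0; // Only touched from the worker thread, mirroring handler_ above.
  std::thread worker([&dispatcher] { dispatcher.run(); });

  dispatcher.post([&listeners] { ++listeners; });                            // "addListener"
  dispatcher.post([&listeners] { std::cout << listeners << " listener(s)\n"; });
  dispatcher.exit();
  worker.join();
  return 0;
}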
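
Note: the comment in WorkerImpl::start() (source lines 104-110) budgets the 15-character POSIX thread-name limit: "worker_" (7 characters) plus a four-digit index (4) leaves 4 characters for the "wrk:"/"dog:" prefixes. The short check below walks through that arithmetic; the 15-character figure is the Linux pthread_setname_np limit, and "worker_9999" stands in for the longest dispatcher name under the comment's assumption of at most 9999 workers.

#include <cassert>
#include <cstddef>
#include <string>

int main() {
  // Linux limits thread names to 15 visible characters (16 bytes including the NUL).
  constexpr std::size_t kMaxThreadNameLen = 15;

  // Worst case assumed by the comment: a four-digit worker index.
  const std::string dispatcher_name = "worker_9999";                   // 7 + 4 = 11 chars
  const std::string worker_thread_name = "wrk:" + dispatcher_name;     // 4 + 11 = 15 chars
  const std::string guard_dog_thread_name = "dog:" + dispatcher_name;  // 4 + 11 = 15 chars

  assert(worker_thread_name.size() <= kMaxThreadNameLen);
  assert(guard_dog_thread_name.size() <= kMaxThreadNameLen);
  return 0;
}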

Generated by: LCOV version 1.15