Line data Source code
1 : #include "source/server/worker_impl.h"
2 :
3 : #include <functional>
4 : #include <memory>
5 :
6 : #include "envoy/event/dispatcher.h"
7 : #include "envoy/event/timer.h"
8 : #include "envoy/network/exception.h"
9 : #include "envoy/server/configuration.h"
10 : #include "envoy/thread_local/thread_local.h"
11 :
12 : #include "source/common/config/utility.h"
13 : #include "source/server/listener_manager_factory.h"
14 :
15 : namespace Envoy {
16 : namespace Server {
17 : namespace {
18 :
19 : std::unique_ptr<ConnectionHandler> getHandler(Event::Dispatcher& dispatcher, uint32_t index,
20 131 : OverloadManager& overload_manager) {
21 :
22 131 : auto* factory = Config::Utility::getFactoryByName<ConnectionHandlerFactory>(
23 131 : "envoy.connection_handler.default");
24 131 : if (factory) {
25 131 : return factory->createConnectionHandler(dispatcher, index, overload_manager);
26 131 : }
27 0 : ENVOY_LOG_MISC(debug, "Unable to find envoy.connection_handler.default factory");
28 0 : return nullptr;
29 131 : }
30 :
31 : } // namespace
32 :
33 : WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
34 131 : const std::string& worker_name) {
35 131 : Event::DispatcherPtr dispatcher(
36 131 : api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
37 131 : auto conn_handler = getHandler(*dispatcher, index, overload_manager);
38 131 : return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
39 131 : overload_manager, api_, stat_names_);
40 131 : }
41 :
42 : WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
43 : Event::DispatcherPtr&& dispatcher, Network::ConnectionHandlerPtr handler,
44 : OverloadManager& overload_manager, Api::Api& api,
45 : WorkerStatNames& stat_names)
46 : : tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)),
47 : api_(api), reset_streams_counter_(
48 131 : api_.rootScope().counterFromStatName(stat_names.reset_high_memory_stream_)) {
49 131 : tls_.registerThread(*dispatcher_, false);
50 131 : overload_manager.registerForAction(
51 131 : OverloadActionNames::get().StopAcceptingConnections, *dispatcher_,
52 131 : [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); });
53 131 : overload_manager.registerForAction(
54 131 : OverloadActionNames::get().RejectIncomingConnections, *dispatcher_,
55 131 : [this](OverloadActionState state) { rejectIncomingConnectionsCb(state); });
56 131 : overload_manager.registerForAction(
57 131 : OverloadActionNames::get().ResetStreams, *dispatcher_,
58 131 : [this](OverloadActionState state) { resetStreamsUsingExcessiveMemory(state); });
59 131 : }
60 :
61 : void WorkerImpl::addListener(absl::optional<uint64_t> overridden_listener,
62 : Network::ListenerConfig& listener, AddListenerCompletion completion,
63 114 : Runtime::Loader& runtime, Random::RandomGenerator& random) {
64 114 : dispatcher_->post(
65 114 : [this, overridden_listener, &listener, &runtime, &random, completion]() -> void {
66 114 : handler_->addListener(overridden_listener, listener, runtime, random);
67 114 : hooks_.onWorkerListenerAdded();
68 114 : completion();
69 114 : });
70 114 : }
71 :
72 205 : uint64_t WorkerImpl::numConnections() const {
73 205 : uint64_t ret = 0;
74 205 : if (handler_) {
75 111 : ret = handler_->numConnections();
76 111 : }
77 205 : return ret;
78 205 : }
79 :
80 : void WorkerImpl::removeListener(Network::ListenerConfig& listener,
81 0 : std::function<void()> completion) {
82 0 : ASSERT(thread_);
83 0 : const uint64_t listener_tag = listener.listenerTag();
84 0 : dispatcher_->post([this, listener_tag, completion]() -> void {
85 0 : handler_->removeListeners(listener_tag);
86 0 : completion();
87 0 : hooks_.onWorkerListenerRemoved();
88 0 : });
89 0 : }
90 :
91 : void WorkerImpl::removeFilterChains(uint64_t listener_tag,
92 : const std::list<const Network::FilterChain*>& filter_chains,
93 0 : std::function<void()> completion) {
94 0 : ASSERT(thread_);
95 0 : dispatcher_->post(
96 0 : [this, listener_tag, &filter_chains, completion = std::move(completion)]() -> void {
97 0 : handler_->removeFilterChains(listener_tag, filter_chains, completion);
98 0 : });
99 0 : }
100 :
101 94 : void WorkerImpl::start(OptRef<GuardDog> guard_dog, const std::function<void()>& cb) {
102 94 : ASSERT(!thread_);
103 :
104 : // In posix, thread names are limited to 15 characters, so contrive to make
105 : // sure all interesting data fits there. The naming occurs in
106 : // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we
107 : // have 9999 threads. We'd need, so we need 7 bytes for "worker_", 4 bytes
108 : // for the thread index, leaving us 4 bytes left to distinguish between the
109 : // two threads used per dispatcher. We'll call this one "dsp:" and the
110 : // one allocated in guarddog_impl.cc "dog:".
111 : //
112 : // TODO(jmarantz): consider refactoring how this naming works so this naming
113 : // architecture is centralized, resulting in clearer names.
114 94 : Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
115 94 : thread_ = api_.threadFactory().createThread(
116 94 : [this, guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options);
117 94 : }
118 :
119 0 : void WorkerImpl::initializeStats(Stats::Scope& scope) { dispatcher_->initializeStats(scope); }
120 :
121 94 : void WorkerImpl::stop() {
122 : // It's possible for the server to cleanly shut down while cluster initialization during startup
123 : // is happening, so we might not yet have a thread.
124 94 : if (thread_) {
125 94 : dispatcher_->exit();
126 94 : thread_->join();
127 94 : }
128 94 : }
129 :
130 : void WorkerImpl::stopListener(Network::ListenerConfig& listener,
131 : const Network::ExtraShutdownListenerOptions& options,
132 10 : std::function<void()> completion) {
133 10 : const uint64_t listener_tag = listener.listenerTag();
134 10 : dispatcher_->post([this, listener_tag, options, completion]() -> void {
135 10 : handler_->stopListeners(listener_tag, options);
136 10 : if (completion != nullptr) {
137 10 : completion();
138 10 : }
139 10 : });
140 10 : }
141 :
142 94 : void WorkerImpl::threadRoutine(OptRef<GuardDog> guard_dog, const std::function<void()>& cb) {
143 94 : ENVOY_LOG(debug, "worker entering dispatch loop");
144 : // The watch dog must be created after the dispatcher starts running and has post events flushed,
145 : // as this is when TLS stat scopes start working.
146 94 : dispatcher_->post([this, &guard_dog, cb]() {
147 94 : cb();
148 94 : if (guard_dog.has_value()) {
149 94 : watch_dog_ = guard_dog->createWatchDog(api_.threadFactory().currentThreadId(),
150 94 : dispatcher_->name(), *dispatcher_);
151 94 : }
152 94 : });
153 94 : dispatcher_->run(Event::Dispatcher::RunType::Block);
154 94 : ENVOY_LOG(debug, "worker exited dispatch loop");
155 94 : if (guard_dog.has_value()) {
156 94 : guard_dog->stopWatching(watch_dog_);
157 94 : }
158 94 : dispatcher_->shutdown();
159 :
160 : // We must close all active connections before we actually exit the thread. This prevents any
161 : // destructors from running on the main thread which might reference thread locals. Destroying
162 : // the handler does this which additionally purges the dispatcher delayed deletion list.
163 94 : handler_.reset();
164 94 : tls_.shutdownThread();
165 94 : watch_dog_.reset();
166 94 : }
167 :
168 0 : void WorkerImpl::stopAcceptingConnectionsCb(OverloadActionState state) {
169 0 : if (state.isSaturated()) {
170 0 : handler_->disableListeners();
171 0 : } else {
172 0 : handler_->enableListeners();
173 0 : }
174 0 : }
175 :
176 0 : void WorkerImpl::rejectIncomingConnectionsCb(OverloadActionState state) {
177 0 : handler_->setListenerRejectFraction(state.value());
178 0 : }
179 :
180 0 : void WorkerImpl::resetStreamsUsingExcessiveMemory(OverloadActionState state) {
181 0 : uint64_t streams_reset_count =
182 0 : dispatcher_->getWatermarkFactory().resetAccountsGivenPressure(state.value().value());
183 0 : reset_streams_counter_.add(streams_reset_count);
184 0 : }
185 :
186 : } // namespace Server
187 : } // namespace Envoy
|