/proc/self/cwd/source/server/worker_impl.cc
#include "source/server/worker_impl.h"

#include <functional>
#include <memory>

#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"
#include "envoy/network/exception.h"
#include "envoy/server/configuration.h"
#include "envoy/thread_local/thread_local.h"

#include "source/common/config/utility.h"
#include "source/server/listener_manager_factory.h"

namespace Envoy {
namespace Server {
namespace {

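// Looks up the statically registered "envoy.connection_handler.default" factory and uses
// it to build this worker's connection handler. Returns nullptr (after logging) if the
// factory is not registered.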
std::unique_ptr<ConnectionHandler> getHandler(Event::Dispatcher& dispatcher, uint32_t index,
                                              OverloadManager& overload_manager,
                                              OverloadManager& null_overload_manager) {

  auto* factory = Config::Utility::getFactoryByName<ConnectionHandlerFactory>(
      "envoy.connection_handler.default");
  if (factory) {
    return factory->createConnectionHandler(dispatcher, index, overload_manager,
                                            null_overload_manager);
  }
  ENVOY_LOG_MISC(debug, "Unable to find envoy.connection_handler.default factory");
  return nullptr;
}

} // namespace

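// Builds a worker: allocates a dispatcher named after the worker and wired to the
// overload manager's scaled timer factory, creates its connection handler, and wraps
// both in a WorkerImpl.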
WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
                                          OverloadManager& null_overload_manager,
                                          const std::string& worker_name) {
  Event::DispatcherPtr dispatcher(
      api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
  auto conn_handler = getHandler(*dispatcher, index, overload_manager, null_overload_manager);
  return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
                                      overload_manager, api_, stat_names_);
}

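// Registers the worker's dispatcher with thread-local storage and subscribes to the
// overload actions this worker reacts to: stop accepting connections, reject incoming
// connections, and reset streams using excessive memory.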
WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
                       Event::DispatcherPtr&& dispatcher, Network::ConnectionHandlerPtr handler,
                       OverloadManager& overload_manager, Api::Api& api,
                       WorkerStatNames& stat_names)
    : tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)),
      api_(api), reset_streams_counter_(
                     api_.rootScope().counterFromStatName(stat_names.reset_high_memory_stream_)) {
  tls_.registerThread(*dispatcher_, false);
  overload_manager.registerForAction(
      OverloadActionNames::get().StopAcceptingConnections, *dispatcher_,
      [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); });
  overload_manager.registerForAction(
      OverloadActionNames::get().RejectIncomingConnections, *dispatcher_,
      [this](OverloadActionState state) { rejectIncomingConnectionsCb(state); });
  overload_manager.registerForAction(
      OverloadActionNames::get().ResetStreams, *dispatcher_,
      [this](OverloadActionState state) { resetStreamsUsingExcessiveMemory(state); });
}

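// Adds a listener on the worker thread: the work is posted to the worker's dispatcher,
// and the ListenerHooks callback and completion run once the handler has added it.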
void WorkerImpl::addListener(absl::optional<uint64_t> overridden_listener,
                             Network::ListenerConfig& listener, AddListenerCompletion completion,
                             Runtime::Loader& runtime, Random::RandomGenerator& random) {
  dispatcher_->post(
      [this, overridden_listener, &listener, &runtime, &random, completion]() -> void {
        handler_->addListener(overridden_listener, listener, runtime, random);
        hooks_.onWorkerListenerAdded();
        completion();
      });
}

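// Returns the handler's current connection count, or zero if the handler has already
// been destroyed (e.g. during shutdown).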
uint64_t WorkerImpl::numConnections() const {
  uint64_t ret = 0;
  if (handler_) {
    ret = handler_->numConnections();
  }
  return ret;
}

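// Removes all listeners matching the listener's tag on the worker thread. Requires the
// worker thread to be running; note that, unlike addListener(), the completion callback
// runs before the onWorkerListenerRemoved() hook.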
void WorkerImpl::removeListener(Network::ListenerConfig& listener,
                                std::function<void()> completion) {
  ASSERT(thread_);
  const uint64_t listener_tag = listener.listenerTag();
  dispatcher_->post([this, listener_tag, completion]() -> void {
    handler_->removeListeners(listener_tag);
    completion();
    hooks_.onWorkerListenerRemoved();
  });
}

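// Removes the given filter chains for a listener on the worker thread; the handler
// invokes the completion callback once the chains have been removed.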
void WorkerImpl::removeFilterChains(uint64_t listener_tag,
                                    const std::list<const Network::FilterChain*>& filter_chains,
                                    std::function<void()> completion) {
  ASSERT(thread_);
  dispatcher_->post(
      [this, listener_tag, &filter_chains, completion = std::move(completion)]() -> void {
        handler_->removeFilterChains(listener_tag, filter_chains, completion);
      });
}

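// Spawns the worker thread, which runs threadRoutine() until the dispatcher exits.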
void WorkerImpl::start(OptRef<GuardDog> guard_dog, const std::function<void()>& cb) {
  ASSERT(!thread_);

  // In posix, thread names are limited to 15 characters, so contrive to make
  // sure all interesting data fits there. The naming occurs in
  // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we
  // have 9999 threads: we need 7 bytes for "worker_" and 4 bytes for the thread
  // index, leaving 4 bytes to distinguish between the two threads used per
  // dispatcher. We'll call this one "wrk:" and the one allocated in
  // guarddog_impl.cc "dog:".
  //
  // TODO(jmarantz): consider refactoring how this naming works so this naming
  // architecture is centralized, resulting in clearer names.
  Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
  thread_ = api_.threadFactory().createThread(
      [this, guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options);
}

void WorkerImpl::initializeStats(Stats::Scope& scope) { dispatcher_->initializeStats(scope); }

void WorkerImpl::stop() {
  // It's possible for the server to cleanly shut down while cluster initialization is
  // still in progress during startup, so we might not yet have a thread.
  if (thread_) {
    dispatcher_->exit();
    thread_->join();
  }
}

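// Stops (but does not remove) the listeners matching the listener's tag on the worker
// thread; the optional completion callback runs after the handler has stopped them.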
void WorkerImpl::stopListener(Network::ListenerConfig& listener,
                              const Network::ExtraShutdownListenerOptions& options,
                              std::function<void()> completion) {
  const uint64_t listener_tag = listener.listenerTag();
  dispatcher_->post([this, listener_tag, options, completion]() -> void {
    handler_->stopListeners(listener_tag, options);
    if (completion != nullptr) {
      completion();
    }
  });
}

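// Body of the worker thread: runs the dispatcher loop, manages the watch dog, and tears
// down the handler and thread-local state on exit.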
void WorkerImpl::threadRoutine(OptRef<GuardDog> guard_dog, const std::function<void()>& cb) {
  ENVOY_LOG(debug, "worker entering dispatch loop");
  // The watch dog must be created after the dispatcher starts running and has post events flushed,
  // as this is when TLS stat scopes start working.
  dispatcher_->post([this, &guard_dog, cb]() {
    cb();
    if (guard_dog.has_value()) {
      watch_dog_ = guard_dog->createWatchDog(api_.threadFactory().currentThreadId(),
                                             dispatcher_->name(), *dispatcher_);
    }
  });
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(debug, "worker exited dispatch loop");
  if (guard_dog.has_value()) {
    guard_dog->stopWatching(watch_dog_);
  }
  dispatcher_->shutdown();

  // We must close all active connections before we actually exit the thread. This prevents any
  // destructors from running on the main thread which might reference thread locals. Destroying
  // the handler does this, which additionally purges the dispatcher's delayed deletion list.
  handler_.reset();
  tls_.shutdownThread();
  watch_dog_.reset();
}

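// Overload callback: disable all listeners on this worker while the "stop accepting
// connections" action is saturated, and re-enable them when pressure subsides.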
void WorkerImpl::stopAcceptingConnectionsCb(OverloadActionState state) {
  if (state.isSaturated()) {
    handler_->disableListeners();
  } else {
    handler_->enableListeners();
  }
}

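// Overload callback: propagate the action's pressure value as the fraction of incoming
// connections the listeners should reject.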
void WorkerImpl::rejectIncomingConnectionsCb(OverloadActionState state) {
  handler_->setListenerRejectFraction(state.value());
}

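// Overload callback: ask the dispatcher's watermark buffer factory to reset streams on
// accounts exceeding the given pressure, and record how many streams were reset.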
void WorkerImpl::resetStreamsUsingExcessiveMemory(OverloadActionState state) {
  uint64_t streams_reset_count =
      dispatcher_->getWatermarkFactory().resetAccountsGivenPressure(state.value().value());
  reset_streams_counter_.add(streams_reset_count);
}

} // namespace Server
} // namespace Envoy