/proc/self/cwd/source/server/worker_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/server/worker_impl.h" |
2 | | |
3 | | #include <functional> |
4 | | #include <memory> |
5 | | |
6 | | #include "envoy/event/dispatcher.h" |
7 | | #include "envoy/event/timer.h" |
8 | | #include "envoy/network/exception.h" |
9 | | #include "envoy/server/configuration.h" |
10 | | #include "envoy/thread_local/thread_local.h" |
11 | | |
12 | | #include "source/common/config/utility.h" |
13 | | #include "source/server/listener_manager_factory.h" |
14 | | |
15 | | namespace Envoy { |
16 | | namespace Server { |
17 | | namespace { |
18 | | |
// Builds the connection handler for one worker.
//
// Looks up the ConnectionHandlerFactory registered under the name
// "envoy.connection_handler.default" and, when it exists, asks it to create a handler for the
// given dispatcher, worker index and overload manager.
//
// Returns nullptr (after a debug-level log line) when no such factory is registered, so callers
// must be prepared for a null handler.
19 | | std::unique_ptr<ConnectionHandler> getHandler(Event::Dispatcher& dispatcher, uint32_t index, |
20 | 5.33k | OverloadManager& overload_manager) { |
21 | | |
22 | 5.33k | auto* factory = Config::Utility::getFactoryByName<ConnectionHandlerFactory>( |
23 | 5.33k | "envoy.connection_handler.default"); |
24 | 5.33k | if (factory) { |
25 | 5.33k | return factory->createConnectionHandler(dispatcher, index, overload_manager); |
26 | 5.33k | } |
// Factory not registered: report it and hand back an empty handler instead of failing.
27 | 0 | ENVOY_LOG_MISC(debug, "Unable to find envoy.connection_handler.default factory"); |
28 | 0 | return nullptr; |
29 | 5.33k | } |
30 | | |
31 | | } // namespace |
32 | | |
// Creates a worker: allocates a dispatcher named `worker_name` that uses the overload manager's
// scaled timer factory, obtains a connection handler bound to that dispatcher via getHandler(),
// and hands both to a new WorkerImpl.
//
// NOTE(review): getHandler() returns nullptr when the default connection handler factory is not
// registered; in that case the WorkerImpl is constructed with a null handler.
33 | | WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager, |
34 | 5.33k | const std::string& worker_name) { |
35 | 5.33k | Event::DispatcherPtr dispatcher( |
36 | 5.33k | api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory())); |
37 | 5.33k | auto conn_handler = getHandler(*dispatcher, index, overload_manager); |
38 | 5.33k | return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(conn_handler), |
39 | 5.33k | overload_manager, api_, stat_names_); |
40 | 5.33k | } |
41 | | |
// Constructs a worker around an already-allocated dispatcher and connection handler.
//
// Takes ownership of `dispatcher` and `handler`, resolves the counter named by
// `stat_names.reset_high_memory_stream_` on the API root scope, registers the dispatcher with the
// thread-local system, and subscribes to three overload actions (StopAcceptingConnections,
// RejectIncomingConnections, ResetStreams). Each subscription is bound to this worker's
// dispatcher and forwards to the matching member function.
//
// The registered callbacks capture `this`.
42 | | WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks, |
43 | | Event::DispatcherPtr&& dispatcher, Network::ConnectionHandlerPtr handler, |
44 | | OverloadManager& overload_manager, Api::Api& api, |
45 | | WorkerStatNames& stat_names) |
46 | | : tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)), |
47 | | api_(api), reset_streams_counter_( |
48 | 5.33k | api_.rootScope().counterFromStatName(stat_names.reset_high_memory_stream_)) { |
// NOTE(review): the `false` argument presumably marks this as a non-main thread -- confirm
// against ThreadLocal::Instance::registerThread.
49 | 5.33k | tls_.registerThread(*dispatcher_, false); |
50 | 5.33k | overload_manager.registerForAction( |
51 | 5.33k | OverloadActionNames::get().StopAcceptingConnections, *dispatcher_, |
52 | 5.33k | [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); }); |
53 | 5.33k | overload_manager.registerForAction( |
54 | 5.33k | OverloadActionNames::get().RejectIncomingConnections, *dispatcher_, |
55 | 5.33k | [this](OverloadActionState state) { rejectIncomingConnectionsCb(state); }); |
56 | 5.33k | overload_manager.registerForAction( |
57 | 5.33k | OverloadActionNames::get().ResetStreams, *dispatcher_, |
58 | 5.33k | [this](OverloadActionState state) { resetStreamsUsingExcessiveMemory(state); }); |
59 | 5.33k | } |
60 | | |
// Posts a task to this worker's dispatcher that adds `listener` to the connection handler.
//
// The posted task, in order: adds the listener to the handler (passing `overridden_listener` and
// `runtime` through), notifies hooks_.onWorkerListenerAdded(), then invokes `completion`.
//
// `listener` and `runtime` are captured by reference, so both must remain valid until the posted
// task has run. `completion` is invoked unconditionally and therefore must be callable.
//
// NOTE(review): handler_ is dereferenced without a null check here, whereas numConnections()
// guards against a null handler_.
61 | | void WorkerImpl::addListener(absl::optional<uint64_t> overridden_listener, |
62 | | Network::ListenerConfig& listener, AddListenerCompletion completion, |
63 | 2.66k | Runtime::Loader& runtime) { |
64 | 2.66k | dispatcher_->post([this, overridden_listener, &listener, &runtime, completion]() -> void { |
65 | 2.66k | handler_->addListener(overridden_listener, listener, runtime); |
66 | 2.66k | hooks_.onWorkerListenerAdded(); |
67 | 2.66k | completion(); |
68 | 2.66k | }); |
69 | 2.66k | } |
70 | | |
// Returns the connection count reported by the connection handler, or 0 when there is no
// handler. handler_ can be null because threadRoutine() resets it during shutdown.
71 | 6.43k | uint64_t WorkerImpl::numConnections() const { |
72 | 6.43k | uint64_t ret = 0; |
73 | 6.43k | if (handler_) { |
74 | 3.78k | ret = handler_->numConnections(); |
75 | 3.78k | } |
76 | 6.43k | return ret; |
77 | 6.43k | } |
78 | | |
// Posts a task to this worker's dispatcher that removes every listener carrying `listener`'s tag
// from the connection handler.
//
// Requires the worker thread to have been started (asserted). The listener tag is read
// immediately, so the posted task does not hold a reference to `listener`.
//
// The posted task, in order: removes the listeners, invokes `completion`, then notifies
// hooks_.onWorkerListenerRemoved(). Note that the hook runs after `completion` here, the
// opposite order from addListener().
79 | | void WorkerImpl::removeListener(Network::ListenerConfig& listener, |
80 | 0 | std::function<void()> completion) { |
81 | 0 | ASSERT(thread_); |
82 | 0 | const uint64_t listener_tag = listener.listenerTag(); |
83 | 0 | dispatcher_->post([this, listener_tag, completion]() -> void { |
84 | 0 | handler_->removeListeners(listener_tag); |
85 | 0 | completion(); |
86 | 0 | hooks_.onWorkerListenerRemoved(); |
87 | 0 | }); |
88 | 0 | } |
89 | | |
// Posts a task to this worker's dispatcher that asks the connection handler to remove the given
// filter chains from the listener identified by `listener_tag`.
//
// Requires the worker thread to have been started (asserted). `completion` is moved into the
// posted task and passed on to the handler; it is not invoked directly by this function.
//
// `filter_chains` is captured by reference, so the list must remain valid until the posted task
// has run.
90 | | void WorkerImpl::removeFilterChains(uint64_t listener_tag, |
91 | | const std::list<const Network::FilterChain*>& filter_chains, |
92 | 0 | std::function<void()> completion) { |
93 | 0 | ASSERT(thread_); |
94 | 0 | dispatcher_->post( |
95 | 0 | [this, listener_tag, &filter_chains, completion = std::move(completion)]() -> void { |
96 | 0 | handler_->removeFilterChains(listener_tag, filter_chains, completion); |
97 | 0 | }); |
98 | 0 | } |
99 | | |
// Starts the worker thread. Must be called at most once (asserts that no thread exists yet).
//
// The new thread is named "wrk:" followed by the dispatcher's name and runs
// threadRoutine(guard_dog, cb). `guard_dog` is captured by reference and must outlive the
// thread; `cb` is copied into the thread's closure.
100 | 2.64k | void WorkerImpl::start(GuardDog& guard_dog, const std::function<void()>& cb) { |
101 | 2.64k | ASSERT(!thread_); |
102 | | |
103 | | // In posix, thread names are limited to 15 characters, so contrive to make |
104 | | // sure all interesting data fits there. The naming occurs in |
105 | | // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we |
106 | | // have 9999 threads. We'd then need 7 bytes for "worker_", 4 bytes |
107 | | // for the thread index, leaving us 4 bytes left to distinguish between the |
108 | | // two threads used per dispatcher. We'll call this one "wrk:" and the |
109 | | // one allocated in guarddog_impl.cc "dog:". |
110 | | // |
111 | | // TODO(jmarantz): consider refactoring how this naming works so this naming |
112 | | // architecture is centralized, resulting in clearer names. |
113 | 2.64k | Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())}; |
114 | 2.64k | thread_ = api_.threadFactory().createThread( |
115 | 2.64k | [this, &guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options); |
116 | 2.64k | } |
117 | | |
// Forwards stats initialization for `scope` to this worker's dispatcher.
118 | 0 | void WorkerImpl::initializeStats(Stats::Scope& scope) { dispatcher_->initializeStats(scope); } |
119 | | |
// Stops the worker: asks the dispatcher to exit and then joins the worker thread. Does nothing
// when the thread was never started.
120 | 2.64k | void WorkerImpl::stop() { |
121 | | // It's possible for the server to cleanly shut down while cluster initialization during startup |
122 | | // is happening, so we might not yet have a thread. |
123 | 2.64k | if (thread_) { |
124 | 2.64k | dispatcher_->exit(); |
125 | 2.64k | thread_->join(); |
126 | 2.64k | } |
127 | 2.64k | } |
128 | | |
// Posts a task to this worker's dispatcher that stops every listener carrying `listener`'s tag.
//
// The listener tag is read immediately and `options` is copied into the posted task, so the task
// holds no reference to either argument. `completion` is optional: it is invoked after the
// listeners are stopped only when it is non-null.
//
// Unlike removeListener() and removeFilterChains(), this function does not assert that the
// worker thread has been started.
129 | | void WorkerImpl::stopListener(Network::ListenerConfig& listener, |
130 | | const Network::ExtraShutdownListenerOptions& options, |
131 | 4 | std::function<void()> completion) { |
132 | 4 | const uint64_t listener_tag = listener.listenerTag(); |
133 | 4 | dispatcher_->post([this, listener_tag, options, completion]() -> void { |
134 | 4 | handler_->stopListeners(listener_tag, options); |
135 | 4 | if (completion != nullptr) { |
136 | 4 | completion(); |
137 | 4 | } |
138 | 4 | }); |
139 | 4 | } |
140 | | |
// Body of the worker thread (launched from start()).
//
// Sequence:
//   1. Post a task that invokes `cb` and then creates this worker's watchdog from `guard_dog`.
//   2. Run the dispatcher in Block mode.
//   3. After the dispatcher run returns: stop watching the watchdog, shut the dispatcher down,
//      destroy the connection handler, shut down thread-local state for this thread, and release
//      the watchdog.
//
// The teardown order in step 3 is deliberate; see the inline comments before changing it.
141 | 2.64k | void WorkerImpl::threadRoutine(GuardDog& guard_dog, const std::function<void()>& cb) { |
142 | 2.64k | ENVOY_LOG(debug, "worker entering dispatch loop"); |
143 | | // The watch dog must be created after the dispatcher starts running and has post events flushed, |
144 | | // as this is when TLS stat scopes start working. |
145 | 2.64k | dispatcher_->post([this, &guard_dog, cb]() { |
146 | 2.64k | cb(); |
147 | 2.64k | watch_dog_ = guard_dog.createWatchDog(api_.threadFactory().currentThreadId(), |
148 | 2.64k | dispatcher_->name(), *dispatcher_); |
149 | 2.64k | }); |
150 | 2.64k | dispatcher_->run(Event::Dispatcher::RunType::Block); |
151 | 2.64k | ENVOY_LOG(debug, "worker exited dispatch loop"); |
152 | 2.64k | guard_dog.stopWatching(watch_dog_); |
153 | 2.64k | dispatcher_->shutdown(); |
154 | | |
155 | | // We must close all active connections before we actually exit the thread. This prevents any |
156 | | // destructors from running on the main thread which might reference thread locals. Destroying |
157 | | // the handler does this which additionally purges the dispatcher delayed deletion list. |
// From this point on handler_ is null, which is the case numConnections() guards against.
158 | 2.64k | handler_.reset(); |
159 | 2.64k | tls_.shutdownThread(); |
160 | 2.64k | watch_dog_.reset(); |
161 | 2.64k | } |
162 | | |
// Overload callback registered for the StopAcceptingConnections action: disables the handler's
// listeners while `state` is saturated and re-enables them otherwise.
163 | 0 | void WorkerImpl::stopAcceptingConnectionsCb(OverloadActionState state) { |
164 | 0 | if (state.isSaturated()) { |
165 | 0 | handler_->disableListeners(); |
166 | 0 | } else { |
167 | 0 | handler_->enableListeners(); |
168 | 0 | } |
169 | 0 | } |
170 | | |
// Overload callback registered for the RejectIncomingConnections action: passes the action's
// current value to the handler as the listener reject fraction.
171 | 0 | void WorkerImpl::rejectIncomingConnectionsCb(OverloadActionState state) { |
172 | 0 | handler_->setListenerRejectFraction(state.value()); |
173 | 0 | } |
174 | | |
// Overload callback registered for the ResetStreams action: asks the dispatcher's watermark
// factory to reset accounts for the given pressure value and adds the count it returns to
// reset_streams_counter_.
175 | 0 | void WorkerImpl::resetStreamsUsingExcessiveMemory(OverloadActionState state) { |
176 | 0 | uint64_t streams_reset_count = |
177 | 0 | dispatcher_->getWatermarkFactory().resetAccountsGivenPressure(state.value().value()); |
178 | 0 | reset_streams_counter_.add(streams_reset_count); |
179 | 0 | } |
180 | | |
181 | | } // namespace Server |
182 | | } // namespace Envoy |