Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/server/worker_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/server/worker_impl.h"
2
3
#include <functional>
4
#include <memory>
5
6
#include "envoy/event/dispatcher.h"
7
#include "envoy/event/timer.h"
8
#include "envoy/network/exception.h"
9
#include "envoy/server/configuration.h"
10
#include "envoy/thread_local/thread_local.h"
11
12
#include "source/common/config/utility.h"
13
#include "source/server/listener_manager_factory.h"
14
15
namespace Envoy {
16
namespace Server {
17
namespace {
18
19
std::unique_ptr<ConnectionHandler> getHandler(Event::Dispatcher& dispatcher, uint32_t index,
20
                                              OverloadManager& overload_manager,
21
4.45k
                                              OverloadManager& null_overload_manager) {
22
23
4.45k
  auto* factory = Config::Utility::getFactoryByName<ConnectionHandlerFactory>(
24
4.45k
      "envoy.connection_handler.default");
25
4.45k
  if (factory) {
26
4.45k
    return factory->createConnectionHandler(dispatcher, index, overload_manager,
27
4.45k
                                            null_overload_manager);
28
4.45k
  }
29
0
  ENVOY_LOG_MISC(debug, "Unable to find envoy.connection_handler.default factory");
30
0
  return nullptr;
31
4.45k
}
32
33
} // namespace
34
35
WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
36
                                          OverloadManager& null_overload_manager,
37
4.45k
                                          const std::string& worker_name) {
38
4.45k
  Event::DispatcherPtr dispatcher(
39
4.45k
      api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
40
4.45k
  auto conn_handler = getHandler(*dispatcher, index, overload_manager, null_overload_manager);
41
4.45k
  return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
42
4.45k
                                      overload_manager, api_, stat_names_);
43
4.45k
}
44
45
WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
46
                       Event::DispatcherPtr&& dispatcher, Network::ConnectionHandlerPtr handler,
47
                       OverloadManager& overload_manager, Api::Api& api,
48
                       WorkerStatNames& stat_names)
49
    : tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)),
50
      api_(api), reset_streams_counter_(
51
4.45k
                     api_.rootScope().counterFromStatName(stat_names.reset_high_memory_stream_)) {
52
4.45k
  tls_.registerThread(*dispatcher_, false);
53
4.45k
  overload_manager.registerForAction(
54
4.45k
      OverloadActionNames::get().StopAcceptingConnections, *dispatcher_,
55
4.45k
      [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); });
56
4.45k
  overload_manager.registerForAction(
57
4.45k
      OverloadActionNames::get().RejectIncomingConnections, *dispatcher_,
58
4.45k
      [this](OverloadActionState state) { rejectIncomingConnectionsCb(state); });
59
4.45k
  overload_manager.registerForAction(
60
4.45k
      OverloadActionNames::get().ResetStreams, *dispatcher_,
61
4.45k
      [this](OverloadActionState state) { resetStreamsUsingExcessiveMemory(state); });
62
4.45k
}
63
64
void WorkerImpl::addListener(absl::optional<uint64_t> overridden_listener,
65
                             Network::ListenerConfig& listener, AddListenerCompletion completion,
66
1.99k
                             Runtime::Loader& runtime, Random::RandomGenerator& random) {
67
1.99k
  dispatcher_->post(
68
1.99k
      [this, overridden_listener, &listener, &runtime, &random, completion]() -> void {
69
1.99k
        handler_->addListener(overridden_listener, listener, runtime, random);
70
1.99k
        hooks_.onWorkerListenerAdded();
71
1.99k
        completion();
72
1.99k
      });
73
1.99k
}
74
75
5.00k
uint64_t WorkerImpl::numConnections() const {
76
5.00k
  uint64_t ret = 0;
77
5.00k
  if (handler_) {
78
3.03k
    ret = handler_->numConnections();
79
3.03k
  }
80
5.00k
  return ret;
81
5.00k
}
82
83
void WorkerImpl::removeListener(Network::ListenerConfig& listener,
84
0
                                std::function<void()> completion) {
85
0
  ASSERT(thread_);
86
0
  const uint64_t listener_tag = listener.listenerTag();
87
0
  dispatcher_->post([this, listener_tag, completion]() -> void {
88
0
    handler_->removeListeners(listener_tag);
89
0
    completion();
90
0
    hooks_.onWorkerListenerRemoved();
91
0
  });
92
0
}
93
94
void WorkerImpl::removeFilterChains(uint64_t listener_tag,
95
                                    const std::list<const Network::FilterChain*>& filter_chains,
96
0
                                    std::function<void()> completion) {
97
0
  ASSERT(thread_);
98
0
  dispatcher_->post(
99
0
      [this, listener_tag, &filter_chains, completion = std::move(completion)]() -> void {
100
0
        handler_->removeFilterChains(listener_tag, filter_chains, completion);
101
0
      });
102
0
}
103
104
1.97k
void WorkerImpl::start(OptRef<GuardDog> guard_dog, const std::function<void()>& cb) {
105
1.97k
  ASSERT(!thread_);
106
107
  // In posix, thread names are limited to 15 characters, so contrive to make
108
  // sure all interesting data fits there. The naming occurs in
109
  // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we
110
  // have 9999 threads. We'd need, so we need 7 bytes for "worker_", 4 bytes
111
  // for the thread index, leaving us 4 bytes left to distinguish between the
112
  // two threads used per dispatcher. We'll call this one "dsp:" and the
113
  // one allocated in guarddog_impl.cc "dog:".
114
  //
115
  // TODO(jmarantz): consider refactoring how this naming works so this naming
116
  // architecture is centralized, resulting in clearer names.
117
1.97k
  Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
118
1.97k
  thread_ = api_.threadFactory().createThread(
119
1.97k
      [this, guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options);
120
1.97k
}
121
122
0
void WorkerImpl::initializeStats(Stats::Scope& scope) { dispatcher_->initializeStats(scope); }
123
124
1.97k
void WorkerImpl::stop() {
125
  // It's possible for the server to cleanly shut down while cluster initialization during startup
126
  // is happening, so we might not yet have a thread.
127
1.97k
  if (thread_) {
128
1.97k
    dispatcher_->exit();
129
1.97k
    thread_->join();
130
1.97k
  }
131
1.97k
}
132
133
void WorkerImpl::stopListener(Network::ListenerConfig& listener,
134
                              const Network::ExtraShutdownListenerOptions& options,
135
4
                              std::function<void()> completion) {
136
4
  const uint64_t listener_tag = listener.listenerTag();
137
4
  dispatcher_->post([this, listener_tag, options, completion]() -> void {
138
4
    handler_->stopListeners(listener_tag, options);
139
4
    if (completion != nullptr) {
140
4
      completion();
141
4
    }
142
4
  });
143
4
}
144
145
1.97k
void WorkerImpl::threadRoutine(OptRef<GuardDog> guard_dog, const std::function<void()>& cb) {
146
1.97k
  ENVOY_LOG(debug, "worker entering dispatch loop");
147
  // The watch dog must be created after the dispatcher starts running and has post events flushed,
148
  // as this is when TLS stat scopes start working.
149
1.97k
  dispatcher_->post([this, &guard_dog, cb]() {
150
1.97k
    cb();
151
1.97k
    if (guard_dog.has_value()) {
152
1.97k
      watch_dog_ = guard_dog->createWatchDog(api_.threadFactory().currentThreadId(),
153
1.97k
                                             dispatcher_->name(), *dispatcher_);
154
1.97k
    }
155
1.97k
  });
156
1.97k
  dispatcher_->run(Event::Dispatcher::RunType::Block);
157
1.97k
  ENVOY_LOG(debug, "worker exited dispatch loop");
158
1.97k
  if (guard_dog.has_value()) {
159
1.97k
    guard_dog->stopWatching(watch_dog_);
160
1.97k
  }
161
1.97k
  dispatcher_->shutdown();
162
163
  // We must close all active connections before we actually exit the thread. This prevents any
164
  // destructors from running on the main thread which might reference thread locals. Destroying
165
  // the handler does this which additionally purges the dispatcher delayed deletion list.
166
1.97k
  handler_.reset();
167
1.97k
  tls_.shutdownThread();
168
1.97k
  watch_dog_.reset();
169
1.97k
}
170
171
0
void WorkerImpl::stopAcceptingConnectionsCb(OverloadActionState state) {
172
0
  if (state.isSaturated()) {
173
0
    handler_->disableListeners();
174
0
  } else {
175
0
    handler_->enableListeners();
176
0
  }
177
0
}
178
179
0
void WorkerImpl::rejectIncomingConnectionsCb(OverloadActionState state) {
180
0
  handler_->setListenerRejectFraction(state.value());
181
0
}
182
183
0
void WorkerImpl::resetStreamsUsingExcessiveMemory(OverloadActionState state) {
184
0
  uint64_t streams_reset_count =
185
0
      dispatcher_->getWatermarkFactory().resetAccountsGivenPressure(state.value().value());
186
0
  reset_streams_counter_.add(streams_reset_count);
187
0
}
188
189
} // namespace Server
190
} // namespace Envoy