Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/server/worker_impl.cc
Line  Count  Source
   1         #include "source/server/worker_impl.h"
   2
   3         #include <functional>
   4         #include <memory>
   5
   6         #include "envoy/event/dispatcher.h"
   7         #include "envoy/event/timer.h"
   8         #include "envoy/network/exception.h"
   9         #include "envoy/server/configuration.h"
  10         #include "envoy/thread_local/thread_local.h"
  11
  12         #include "source/common/config/utility.h"
  13         #include "source/server/listener_manager_factory.h"
  14
  15         namespace Envoy {
  16         namespace Server {
  17         namespace {
  18
  19         std::unique_ptr<ConnectionHandler> getHandler(Event::Dispatcher& dispatcher, uint32_t index,
  20  5.33k                                                OverloadManager& overload_manager) {
  21
  22  5.33k    auto* factory = Config::Utility::getFactoryByName<ConnectionHandlerFactory>(
  23  5.33k        "envoy.connection_handler.default");
  24  5.33k    if (factory) {
  25  5.33k      return factory->createConnectionHandler(dispatcher, index, overload_manager);
  26  5.33k    }
  27      0    ENVOY_LOG_MISC(debug, "Unable to find envoy.connection_handler.default factory");
  28      0    return nullptr;
  29  5.33k  }
  30
  31         } // namespace
  32
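Note: lines 27-28 above are the only uncovered lines in getHandler(). The nullptr fallback can only execute when "envoy.connection_handler.default" is absent from the factory registry, which a standard build never hits since the default factory is always linked in. As a rough illustration of the lookup-by-name pattern behind this, here is a minimal self-contained sketch; the registry, Handler type, and names below are hypothetical stand-ins, not Envoy's API:

    #include <functional>
    #include <iostream>
    #include <map>
    #include <memory>
    #include <string>

    // Hypothetical product type and factory signature.
    struct Handler {};
    using HandlerFactoryFn = std::function<std::unique_ptr<Handler>()>;

    // A name-keyed registry, standing in for factory registration.
    std::map<std::string, HandlerFactoryFn>& registry() {
      static std::map<std::string, HandlerFactoryFn> r;
      return r;
    }

    std::unique_ptr<Handler> getHandlerByName(const std::string& name) {
      auto it = registry().find(name);
      if (it != registry().end()) {
        return it->second(); // covered path: the factory is registered
      }
      std::cerr << "Unable to find " << name << " factory\n"; // rarely reachable
      return nullptr;
    }

    int main() {
      registry()["example.handler.default"] = [] { return std::make_unique<Handler>(); };
      std::cout << (getHandlerByName("example.handler.default") != nullptr) << "\n"; // 1
      std::cout << (getHandlerByName("example.handler.missing") != nullptr) << "\n"; // 0
    }

Covering the fallback would require a test harness in which the default factory is deliberately not registered.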
  33         WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
  34  5.33k                                            const std::string& worker_name) {
  35  5.33k    Event::DispatcherPtr dispatcher(
  36  5.33k        api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory()));
  37  5.33k    auto conn_handler = getHandler(*dispatcher, index, overload_manager);
  38  5.33k    return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(conn_handler),
  39  5.33k                                        overload_manager, api_, stat_names_);
  40  5.33k  }
  41
  42         WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
  43                                Event::DispatcherPtr&& dispatcher, Network::ConnectionHandlerPtr handler,
  44                                OverloadManager& overload_manager, Api::Api& api,
  45                                WorkerStatNames& stat_names)
  46             : tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)),
  47               api_(api), reset_streams_counter_(
  48  5.33k                       api_.rootScope().counterFromStatName(stat_names.reset_high_memory_stream_)) {
  49  5.33k    tls_.registerThread(*dispatcher_, false);
  50  5.33k    overload_manager.registerForAction(
  51  5.33k        OverloadActionNames::get().StopAcceptingConnections, *dispatcher_,
  52  5.33k        [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); });
  53  5.33k    overload_manager.registerForAction(
  54  5.33k        OverloadActionNames::get().RejectIncomingConnections, *dispatcher_,
  55  5.33k        [this](OverloadActionState state) { rejectIncomingConnectionsCb(state); });
  56  5.33k    overload_manager.registerForAction(
  57  5.33k        OverloadActionNames::get().ResetStreams, *dispatcher_,
  58  5.33k        [this](OverloadActionState state) { resetStreamsUsingExcessiveMemory(state); });
  59  5.33k  }
  60
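The constructor wires this worker into the overload manager: one callback per named action, each invoked on this worker's dispatcher when the action's state changes. A minimal self-contained sketch of that registration pattern follows; all types and names here (OverloadHub, ActionState) are illustrative stand-ins, not Envoy's API:

    #include <functional>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    // Hypothetical action state: 0.0 (inactive) through 1.0 (saturated).
    struct ActionState {
      double value;
      bool isSaturated() const { return value >= 1.0; }
    };
    using ActionCb = std::function<void(ActionState)>;

    // Stand-in for an overload manager's per-action callback registry.
    class OverloadHub {
    public:
      void registerForAction(const std::string& action, ActionCb cb) {
        callbacks_[action].push_back(std::move(cb));
      }
      void update(const std::string& action, double value) {
        for (auto& cb : callbacks_[action]) {
          cb(ActionState{value});
        }
      }

    private:
      std::map<std::string, std::vector<ActionCb>> callbacks_;
    };

    int main() {
      OverloadHub hub;
      hub.registerForAction("stop_accepting_connections", [](ActionState s) {
        std::cout << (s.isSaturated() ? "disable listeners" : "enable listeners") << "\n";
      });
      hub.update("stop_accepting_connections", 1.0); // prints "disable listeners"
      hub.update("stop_accepting_connections", 0.0); // prints "enable listeners"
    }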
  61         void WorkerImpl::addListener(absl::optional<uint64_t> overridden_listener,
  62                                      Network::ListenerConfig& listener, AddListenerCompletion completion,
  63  2.66k                               Runtime::Loader& runtime) {
  64  2.66k    dispatcher_->post([this, overridden_listener, &listener, &runtime, completion]() -> void {
  65  2.66k      handler_->addListener(overridden_listener, listener, runtime);
  66  2.66k      hooks_.onWorkerListenerAdded();
  67  2.66k      completion();
  68  2.66k    });
  69  2.66k  }
  70
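addListener() shows the pattern used by all of the worker's mutators: the caller posts a closure to the worker's dispatcher, so the actual work and the completion callback both execute on the worker thread. A minimal self-contained sketch of post-plus-completion on a single-threaded queue; MiniDispatcher is hypothetical, not Envoy's Event::Dispatcher:

    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <thread>

    // A tiny dispatcher: a queue of closures drained by one worker thread.
    class MiniDispatcher {
    public:
      void post(std::function<void()> fn) {
        {
          std::lock_guard<std::mutex> lk(mu_);
          queue_.push(std::move(fn));
        }
        cv_.notify_one();
      }
      void run() { // drains closures until exit() is posted
        for (;;) {
          std::function<void()> fn;
          {
            std::unique_lock<std::mutex> lk(mu_);
            cv_.wait(lk, [this] { return !queue_.empty(); });
            fn = std::move(queue_.front());
            queue_.pop();
          }
          if (!fn) return; // an empty closure acts as the exit signal
          fn();
        }
      }
      void exit() { post(nullptr); }

    private:
      std::mutex mu_;
      std::condition_variable cv_;
      std::queue<std::function<void()>> queue_;
    };

    int main() {
      MiniDispatcher d;
      std::thread worker([&] { d.run(); });
      // "addListener"-style call: work runs on the worker, then completion fires.
      d.post([] { std::cout << "listener added on worker thread\n"; });
      d.post([] { std::cout << "completion callback\n"; });
      d.exit();
      worker.join();
    }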
  71  6.43k  uint64_t WorkerImpl::numConnections() const {
  72  6.43k    uint64_t ret = 0;
  73  6.43k    if (handler_) {
  74  3.78k      ret = handler_->numConnections();
  75  3.78k    }
  76  6.43k    return ret;
  77  6.43k  }
  78
  79         void WorkerImpl::removeListener(Network::ListenerConfig& listener,
  80      0                                  std::function<void()> completion) {
  81      0    ASSERT(thread_);
  82      0    const uint64_t listener_tag = listener.listenerTag();
  83      0    dispatcher_->post([this, listener_tag, completion]() -> void {
  84      0      handler_->removeListeners(listener_tag);
  85      0      completion();
  86      0      hooks_.onWorkerListenerRemoved();
  87      0    });
  88      0  }
  89
  90         void WorkerImpl::removeFilterChains(uint64_t listener_tag,
  91                                             const std::list<const Network::FilterChain*>& filter_chains,
  92      0                                      std::function<void()> completion) {
  93      0    ASSERT(thread_);
  94      0    dispatcher_->post(
  95      0        [this, listener_tag, &filter_chains, completion = std::move(completion)]() -> void {
  96      0          handler_->removeFilterChains(listener_tag, filter_chains, completion);
  97      0        });
  98      0  }
  99
 100  2.64k  void WorkerImpl::start(GuardDog& guard_dog, const std::function<void()>& cb) {
 101  2.64k    ASSERT(!thread_);
 102
 103           // In posix, thread names are limited to 15 characters, so contrive to make
 104           // sure all interesting data fits there. The naming occurs in
 105           // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Say we
 106           // have up to 9999 threads: we need 7 bytes for "worker_" and 4 bytes for
 107           // the thread index, leaving 4 bytes to distinguish between the two
 108           // threads used per dispatcher. We'll call this one "wrk:" and the one
 109           // allocated in guarddog_impl.cc "dog:".
 110           //
 111           // TODO(jmarantz): consider refactoring how this naming works so this naming
 112           // architecture is centralized, resulting in clearer names.
 113  2.64k    Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
 114  2.64k    thread_ = api_.threadFactory().createThread(
 115  2.64k        [this, &guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options);
 116  2.64k  }
 117
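The 15-character limit in the comment is a real posix constraint: on Linux, a thread name may be at most 15 bytes plus the terminating NUL. A quick self-contained check, assuming Linux/glibc, where pthread_setname_np returns ERANGE for over-long names:

    #include <cerrno>
    #include <iostream>
    #include <pthread.h>
    #include <thread>

    int main() {
      std::thread t([] {
        // 12 characters: fits within the 15-character limit.
        int rc_short = pthread_setname_np(pthread_self(), "wrk:worker_0");
        // 19 characters: rejected with ERANGE on Linux/glibc.
        int rc_long = pthread_setname_np(pthread_self(), "dispatcher:worker_0");
        std::cout << "short rc=" << rc_short << ", long rc=" << rc_long
                  << " (ERANGE=" << ERANGE << ")\n";
      });
      t.join();
    }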
 118      0  void WorkerImpl::initializeStats(Stats::Scope& scope) { dispatcher_->initializeStats(scope); }
 119
 120  2.64k  void WorkerImpl::stop() {
 121           // It's possible for the server to cleanly shut down while cluster initialization
 122           // is still in progress during startup, so we might not yet have a thread.
 123  2.64k    if (thread_) {
 124  2.64k      dispatcher_->exit();
 125  2.64k      thread_->join();
 126  2.64k    }
 127  2.64k  }
 128
 129         void WorkerImpl::stopListener(Network::ListenerConfig& listener,
 130                                       const Network::ExtraShutdownListenerOptions& options,
 131      4                                std::function<void()> completion) {
 132      4    const uint64_t listener_tag = listener.listenerTag();
 133      4    dispatcher_->post([this, listener_tag, options, completion]() -> void {
 134      4      handler_->stopListeners(listener_tag, options);
 135      4      if (completion != nullptr) {
 136      4        completion();
 137      4      }
 138      4    });
 139      4  }
 140
 141  2.64k  void WorkerImpl::threadRoutine(GuardDog& guard_dog, const std::function<void()>& cb) {
 142  2.64k    ENVOY_LOG(debug, "worker entering dispatch loop");
 143           // The watch dog must be created after the dispatcher starts running and has post events
 144           // flushed, as this is when TLS stat scopes start working.
 145  2.64k    dispatcher_->post([this, &guard_dog, cb]() {
 146  2.64k      cb();
 147  2.64k      watch_dog_ = guard_dog.createWatchDog(api_.threadFactory().currentThreadId(),
 148  2.64k                                            dispatcher_->name(), *dispatcher_);
 149  2.64k    });
 150  2.64k    dispatcher_->run(Event::Dispatcher::RunType::Block);
 151  2.64k    ENVOY_LOG(debug, "worker exited dispatch loop");
 152  2.64k    guard_dog.stopWatching(watch_dog_);
 153  2.64k    dispatcher_->shutdown();
 154
 155           // We must close all active connections before we actually exit the thread. This prevents
 156           // any destructors from running on the main thread which might reference thread locals.
 157           // Destroying the handler does this, and additionally purges the dispatcher's delayed deletion list.
 158  2.64k    handler_.reset();
 159  2.64k    tls_.shutdownThread();
 160  2.64k    watch_dog_.reset();
 161  2.64k  }
 162
 163      0  void WorkerImpl::stopAcceptingConnectionsCb(OverloadActionState state) {
 164      0    if (state.isSaturated()) {
 165      0      handler_->disableListeners();
 166      0    } else {
 167      0      handler_->enableListeners();
 168      0    }
 169      0  }
 170
 171      0  void WorkerImpl::rejectIncomingConnectionsCb(OverloadActionState state) {
 172      0    handler_->setListenerRejectFraction(state.value());
 173      0  }
 174
 175      0  void WorkerImpl::resetStreamsUsingExcessiveMemory(OverloadActionState state) {
 176      0    uint64_t streams_reset_count =
 177      0        dispatcher_->getWatermarkFactory().resetAccountsGivenPressure(state.value().value());
 178      0    reset_streams_counter_.add(streams_reset_count);
 179      0  }
 180
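All three overload callbacks (lines 163-179) are uncovered in this run; they execute only when the overload manager activates the corresponding action. For rejectIncomingConnectionsCb(), the action state is applied as a reject fraction, i.e. roughly, each incoming connection is refused with that probability. A self-contained sketch of the idea; shouldReject() is a hypothetical stand-in, not Envoy's implementation:

    #include <iostream>
    #include <random>

    // Reject each incoming connection with probability `fraction` in [0, 1].
    bool shouldReject(double fraction, std::mt19937& rng) {
      std::bernoulli_distribution reject(fraction);
      return reject(rng);
    }

    int main() {
      std::mt19937 rng{42};
      const double fraction = 0.3; // e.g. an action state value of 0.3
      int rejected = 0;
      const int total = 10000;
      for (int i = 0; i < total; ++i) {
        if (shouldReject(fraction, rng)) ++rejected;
      }
      std::cout << "rejected " << rejected << "/" << total << " (~30% expected)\n";
    }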
 181         } // namespace Server
 182         } // namespace Envoy