#include "source/server/worker_impl.h"

#include <functional>
#include <memory>
#include <utility>

#include "envoy/event/dispatcher.h"
#include "envoy/event/timer.h"
#include "envoy/network/exception.h"
#include "envoy/server/configuration.h"
#include "envoy/thread_local/thread_local.h"

#include "source/common/config/utility.h"
#include "source/server/listener_manager_factory.h"

namespace Envoy {
16
namespace Server {
17
namespace {
18

            
19
std::unique_ptr<ConnectionHandler> getHandler(Event::Dispatcher& dispatcher, uint32_t index,
20
                                              OverloadManager& overload_manager,
21
10859
                                              OverloadManager& null_overload_manager) {
22

            
23
10859
  auto* factory = Config::Utility::getFactoryByName<ConnectionHandlerFactory>(
24
10859
      "envoy.connection_handler.default");
25
10859
  if (factory) {
26
10859
    return factory->createConnectionHandler(dispatcher, index, overload_manager,
27
10859
                                            null_overload_manager);
28
10859
  }
29
  ENVOY_LOG_MISC(debug, "Unable to find envoy.connection_handler.default factory");
30
  return nullptr;
31
10859
}
32

            
33
} // namespace
34

            
35
// Builds a fully-wired worker: a named dispatcher (using the overload
// manager's scaled timer factory), a connection handler bound to that
// dispatcher, and the WorkerImpl that owns both.
WorkerPtr ProdWorkerFactory::createWorker(uint32_t index, OverloadManager& overload_manager,
                                          OverloadManager& null_overload_manager,
                                          const std::string& worker_name) {
  auto dispatcher =
      api_.allocateDispatcher(worker_name, overload_manager.scaledTimerFactory());
  auto handler = getHandler(*dispatcher, index, overload_manager, null_overload_manager);
  return std::make_unique<WorkerImpl>(tls_, hooks_, std::move(dispatcher), std::move(handler),
                                      overload_manager, api_, stat_names_);
}
44

            
45
// Registers the worker's thread with TLS and subscribes the dispatcher to the
// overload actions this worker reacts to (stop accepting, reject incoming,
// reset high-memory streams). Registration order is preserved.
WorkerImpl::WorkerImpl(ThreadLocal::Instance& tls, ListenerHooks& hooks,
                       Event::DispatcherPtr&& dispatcher, Network::ConnectionHandlerPtr handler,
                       OverloadManager& overload_manager, Api::Api& api,
                       WorkerStatNames& stat_names)
    : tls_(tls), hooks_(hooks), dispatcher_(std::move(dispatcher)), handler_(std::move(handler)),
      api_(api), reset_streams_counter_(
                     api_.rootScope().counterFromStatName(stat_names.reset_high_memory_stream_)) {
  tls_.registerThread(*dispatcher_, false);
  const auto& action_names = OverloadActionNames::get();
  overload_manager.registerForAction(
      action_names.StopAcceptingConnections, *dispatcher_,
      [this](OverloadActionState state) { stopAcceptingConnectionsCb(state); });
  overload_manager.registerForAction(
      action_names.RejectIncomingConnections, *dispatcher_,
      [this](OverloadActionState state) { rejectIncomingConnectionsCb(state); });
  overload_manager.registerForAction(
      action_names.ResetStreams, *dispatcher_,
      [this](OverloadActionState state) { resetStreamsUsingExcessiveMemory(state); });
}
63

            
64
void WorkerImpl::addListener(absl::optional<uint64_t> overridden_listener,
65
                             Network::ListenerConfig& listener, AddListenerCompletion completion,
66
11489
                             Runtime::Loader& runtime, Random::RandomGenerator& random) {
67
11489
  dispatcher_->post(
68
11489
      [this, overridden_listener, &listener, &runtime, &random, completion]() -> void {
69
11489
        handler_->addListener(overridden_listener, listener, runtime, random);
70
11489
        hooks_.onWorkerListenerAdded();
71
11489
        completion();
72
11489
      });
73
11489
}
74

            
75
22752
uint64_t WorkerImpl::numConnections() const {
76
22752
  uint64_t ret = 0;
77
22752
  if (handler_) {
78
12128
    ret = handler_->numConnections();
79
12128
  }
80
22752
  return ret;
81
22752
}
82

            
83
void WorkerImpl::removeListener(Network::ListenerConfig& listener,
84
83
                                std::function<void()> completion) {
85
83
  ASSERT(thread_);
86
83
  const uint64_t listener_tag = listener.listenerTag();
87
83
  dispatcher_->post([this, listener_tag, completion]() -> void {
88
83
    handler_->removeListeners(listener_tag);
89
83
    completion();
90
83
    hooks_.onWorkerListenerRemoved();
91
83
  });
92
83
}
93

            
94
void WorkerImpl::removeFilterChains(uint64_t listener_tag,
95
                                    const std::list<const Network::FilterChain*>& filter_chains,
96
25
                                    std::function<void()> completion) {
97
25
  ASSERT(thread_);
98
25
  dispatcher_->post(
99
25
      [this, listener_tag, &filter_chains, completion = std::move(completion)]() -> void {
100
25
        handler_->removeFilterChains(listener_tag, filter_chains, completion);
101
25
      });
102
25
}
103

            
104
10638
// Spawns the worker's OS thread, which runs threadRoutine (the dispatch loop)
// until stop() is called. Must only be called once (`thread_` must be unset).
//
// Fix: the naming comment claimed the dispatcher thread is tagged "dsp:" but
// the code below tags it "wrk:"; the comment now matches the code.
void WorkerImpl::start(OptRef<GuardDog> guard_dog, const std::function<void()>& cb) {
  ASSERT(!thread_);

  // In posix, thread names are limited to 15 characters, so contrive to make
  // sure all interesting data fits there. The naming occurs in
  // ListenerManagerImpl's constructor: absl::StrCat("worker_", i). Let's say we
  // have 9999 threads. We'd need, so we need 7 bytes for "worker_", 4 bytes
  // for the thread index, leaving us 4 bytes left to distinguish between the
  // two threads used per dispatcher. We'll call this one "wrk:" and the
  // one allocated in guarddog_impl.cc "dog:".
  //
  // TODO(jmarantz): consider refactoring how this naming works so this naming
  // architecture is centralized, resulting in clearer names.
  Thread::Options options{absl::StrCat("wrk:", dispatcher_->name())};
  thread_ = api_.threadFactory().createThread(
      [this, guard_dog, cb]() -> void { threadRoutine(guard_dog, cb); }, options);
}
121

            
122
6
// Delegates worker stats initialization to the owned dispatcher, which
// creates its stats in the given scope.
void WorkerImpl::initializeStats(Stats::Scope& scope) {
  dispatcher_->initializeStats(scope);
}
123

            
124
10638
void WorkerImpl::stop() {
125
  // It's possible for the server to cleanly shut down while cluster initialization during startup
126
  // is happening, so we might not yet have a thread.
127
10638
  if (thread_) {
128
10638
    dispatcher_->exit();
129
10638
    thread_->join();
130
10638
  }
131
10638
}
132

            
133
void WorkerImpl::stopListener(Network::ListenerConfig& listener,
134
                              const Network::ExtraShutdownListenerOptions& options,
135
161
                              std::function<void()> completion) {
136
161
  const uint64_t listener_tag = listener.listenerTag();
137
161
  dispatcher_->post([this, listener_tag, options, completion]() -> void {
138
161
    handler_->stopListeners(listener_tag, options);
139
161
    if (completion != nullptr) {
140
161
      completion();
141
161
    }
142
161
  });
143
161
}
144

            
145
10638
// Body of the worker thread: runs the dispatch loop until exit is requested,
// then tears worker state down in a deliberate order.
void WorkerImpl::threadRoutine(OptRef<GuardDog> guard_dog, const std::function<void()>& cb) {
  ENVOY_LOG(debug, "worker entering dispatch loop");
  // The watch dog is created from a callback posted into the running loop
  // rather than up front: it must be created after the dispatcher starts
  // running and has flushed post events, which is when TLS stat scopes start
  // working.
  dispatcher_->post([this, &guard_dog, cb]() {
    cb();
    if (guard_dog.has_value()) {
      watch_dog_ = guard_dog->createWatchDog(api_.threadFactory().currentThreadId(),
                                             dispatcher_->name(), *dispatcher_);
    }
  });
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  ENVOY_LOG(debug, "worker exited dispatch loop");
  if (guard_dog.has_value()) {
    guard_dog->stopWatching(watch_dog_);
  }
  dispatcher_->shutdown();

  // All active connections must be closed before this thread exits, so that
  // no destructor referencing thread locals ever runs on the main thread.
  // Destroying the handler does this, and additionally purges the
  // dispatcher's delayed deletion list.
  handler_.reset();
  tls_.shutdownThread();
  watch_dog_.reset();
}
170

            
171
20
// Overload-action callback: when the action saturates, stop accepting new
// connections by disabling all listeners; re-enable them once pressure drops.
void WorkerImpl::stopAcceptingConnectionsCb(OverloadActionState state) {
  if (!state.isSaturated()) {
    handler_->enableListeners();
  } else {
    handler_->disableListeners();
  }
}
178

            
179
// Overload-action callback: propagate the current overload pressure to the
// listeners as the fraction of incoming connections to reject.
void WorkerImpl::rejectIncomingConnectionsCb(OverloadActionState state) {
  const auto reject_fraction = state.value();
  handler_->setListenerRejectFraction(reject_fraction);
}
182

            
183
20
// Overload-action callback: ask the dispatcher's watermark buffer factory to
// reset the streams holding the most memory, scaled by the current pressure,
// and record how many were reset in the worker's counter.
void WorkerImpl::resetStreamsUsingExcessiveMemory(OverloadActionState state) {
  const uint64_t num_streams_reset =
      dispatcher_->getWatermarkFactory().resetAccountsGivenPressure(state.value().value());
  reset_streams_counter_.add(num_streams_reset);
}
188

            
189
} // namespace Server
190
} // namespace Envoy