Line data Source code
1 : #include "source/server/server.h"
2 :
3 : #include <csignal>
4 : #include <cstdint>
5 : #include <ctime>
6 : #include <functional>
7 : #include <memory>
8 : #include <string>
9 :
10 : #include "envoy/admin/v3/config_dump.pb.h"
11 : #include "envoy/common/exception.h"
12 : #include "envoy/common/time.h"
13 : #include "envoy/config/bootstrap/v3/bootstrap.pb.h"
14 : #include "envoy/config/bootstrap/v3/bootstrap.pb.validate.h"
15 : #include "envoy/event/dispatcher.h"
16 : #include "envoy/event/signal.h"
17 : #include "envoy/event/timer.h"
18 : #include "envoy/network/dns.h"
19 : #include "envoy/registry/registry.h"
20 : #include "envoy/server/bootstrap_extension_config.h"
21 : #include "envoy/server/instance.h"
22 : #include "envoy/server/options.h"
23 : #include "envoy/stats/histogram.h"
24 : #include "envoy/stats/stats.h"
25 : #include "envoy/upstream/cluster_manager.h"
26 :
27 : #include "source/common/api/api_impl.h"
28 : #include "source/common/api/os_sys_calls_impl.h"
29 : #include "source/common/common/enum_to_int.h"
30 : #include "source/common/common/mutex_tracer_impl.h"
31 : #include "source/common/common/utility.h"
32 : #include "source/common/config/utility.h"
33 : #include "source/common/config/well_known_names.h"
34 : #include "source/common/config/xds_resource.h"
35 : #include "source/common/http/codes.h"
36 : #include "source/common/http/headers.h"
37 : #include "source/common/local_info/local_info_impl.h"
38 : #include "source/common/memory/stats.h"
39 : #include "source/common/network/address_impl.h"
40 : #include "source/common/network/dns_resolver/dns_factory_util.h"
41 : #include "source/common/network/socket_interface.h"
42 : #include "source/common/network/socket_interface_impl.h"
43 : #include "source/common/protobuf/utility.h"
44 : #include "source/common/router/rds_impl.h"
45 : #include "source/common/runtime/runtime_impl.h"
46 : #include "source/common/runtime/runtime_keys.h"
47 : #include "source/common/signal/fatal_error_handler.h"
48 : #include "source/common/singleton/manager_impl.h"
49 : #include "source/common/stats/thread_local_store.h"
50 : #include "source/common/stats/timespan_impl.h"
51 : #include "source/common/upstream/cluster_manager_impl.h"
52 : #include "source/common/version/version.h"
53 : #include "source/server/configuration_impl.h"
54 : #include "source/server/listener_hooks.h"
55 : #include "source/server/listener_manager_factory.h"
56 : #include "source/server/regex_engine.h"
57 : #include "source/server/ssl_context_manager.h"
58 : #include "source/server/utils.h"
59 :
60 : namespace Envoy {
61 : namespace Server {
62 :
63 : namespace {
64 135 : std::unique_ptr<ConnectionHandler> getHandler(Event::Dispatcher& dispatcher) {
65 :
66 135 : auto* factory = Config::Utility::getFactoryByName<ConnectionHandlerFactory>(
67 135 : "envoy.connection_handler.default");
68 135 : if (factory) {
69 135 : return factory->createConnectionHandler(dispatcher, absl::nullopt);
70 135 : }
71 0 : ENVOY_LOG_MISC(debug, "Unable to find envoy.connection_handler.default factory");
72 0 : return nullptr;
73 135 : }
74 :
75 : } // namespace
76 :
77 : InstanceBase::InstanceBase(Init::Manager& init_manager, const Options& options,
78 : Event::TimeSystem& time_system, ListenerHooks& hooks,
79 : HotRestart& restarter, Stats::StoreRoot& store,
80 : Thread::BasicLockable& access_log_lock,
81 : Random::RandomGeneratorPtr&& random_generator,
82 : ThreadLocal::Instance& tls, Thread::ThreadFactory& thread_factory,
83 : Filesystem::Instance& file_system,
84 : std::unique_ptr<ProcessContext> process_context,
85 : Buffer::WatermarkFactorySharedPtr watermark_factory)
86 : : init_manager_(init_manager), live_(false), options_(options),
87 : validation_context_(options_.allowUnknownStaticFields(),
88 : !options.rejectUnknownDynamicFields(),
89 : options.ignoreUnknownDynamicFields()),
90 : time_source_(time_system), restarter_(restarter), start_time_(time(nullptr)),
91 : original_start_time_(start_time_), stats_store_(store), thread_local_(tls),
92 : random_generator_(std::move(random_generator)),
93 : api_(new Api::Impl(
94 : thread_factory, store, time_system, file_system, *random_generator_, bootstrap_,
95 : process_context ? ProcessContextOptRef(std::ref(*process_context)) : absl::nullopt,
96 : watermark_factory)),
97 : dispatcher_(api_->allocateDispatcher("main_thread")),
98 : access_log_manager_(options.fileFlushIntervalMsec(), *api_, *dispatcher_, access_log_lock,
99 : store),
100 : singleton_manager_(new Singleton::ManagerImpl(api_->threadFactory())),
101 : handler_(getHandler(*dispatcher_)), worker_factory_(thread_local_, *api_, hooks),
102 : mutex_tracer_(options.mutexTracingEnabled() ? &Envoy::MutexTracerImpl::getOrCreateTracer()
103 : : nullptr),
104 : grpc_context_(store.symbolTable()), http_context_(store.symbolTable()),
105 : router_context_(store.symbolTable()), process_context_(std::move(process_context)),
106 : hooks_(hooks), quic_stat_names_(store.symbolTable()), server_contexts_(*this),
107 135 : enable_reuse_port_default_(true), stats_flush_in_progress_(false) {}
108 :
109 135 : InstanceBase::~InstanceBase() {
110 135 : terminate();
111 :
112 : // Stop logging to file before all the AccessLogManager and its dependencies are
113 : // destructed to avoid crashing at shutdown.
114 135 : file_logger_.reset();
115 :
116 : // Destruct the ListenerManager explicitly, before InstanceBase's local init_manager_ is
117 : // destructed.
118 : //
119 : // The ListenerManager's DestinationPortsMap contains FilterChainSharedPtrs. There is a rare race
120 : // condition where one of these FilterChains contains an HttpConnectionManager, which contains an
121 : // RdsRouteConfigProvider, which contains an RdsRouteConfigSubscriptionSharedPtr. Since
122 : // RdsRouteConfigSubscription is an Init::Target, ~RdsRouteConfigSubscription triggers a callback
123 : // set at initialization, which goes to unregister it from the top-level InitManager, which has
124 : // already been destructed (use-after-free) causing a segfault.
125 135 : ENVOY_LOG(debug, "destroying listener manager");
126 135 : listener_manager_.reset();
127 135 : ENVOY_LOG(debug, "destroyed listener manager");
128 135 : dispatcher_->shutdown();
129 :
130 : #ifdef ENVOY_PERFETTO
131 : if (tracing_session_ != nullptr) {
132 : // Flush the trace data.
133 : perfetto::TrackEvent::Flush();
134 : // Disable tracing and block until tracing has stopped.
135 : tracing_session_->StopBlocking();
136 : close(tracing_fd_);
137 : }
138 : #endif
139 135 : }
140 :
141 2223 : Upstream::ClusterManager& InstanceBase::clusterManager() {
142 2223 : ASSERT(config_.clusterManager() != nullptr);
143 2223 : return *config_.clusterManager();
144 2223 : }
145 :
146 0 : const Upstream::ClusterManager& InstanceBase::clusterManager() const {
147 0 : ASSERT(config_.clusterManager() != nullptr);
148 0 : return *config_.clusterManager();
149 0 : }
150 :
151 0 : void InstanceBase::drainListeners(OptRef<const Network::ExtraShutdownListenerOptions> options) {
152 0 : ENVOY_LOG(info, "closing and draining listeners");
153 0 : listener_manager_->stopListeners(ListenerManager::StopListenersType::All,
154 0 : options.has_value() ? *options
155 0 : : Network::ExtraShutdownListenerOptions{});
156 0 : drain_manager_->startDrainSequence([] {});
157 0 : }
158 :
159 134 : void InstanceBase::failHealthcheck(bool fail) {
160 134 : live_.store(!fail);
161 134 : server_stats_->live_.set(live_.load());
162 134 : }
163 :
164 : MetricSnapshotImpl::MetricSnapshotImpl(Stats::Store& store,
165 : Upstream::ClusterManager& cluster_manager,
166 111 : TimeSource& time_source) {
167 111 : store.forEachSinkedCounter(
168 111 : [this](std::size_t size) {
169 111 : snapped_counters_.reserve(size);
170 111 : counters_.reserve(size);
171 111 : },
172 36262 : [this](Stats::Counter& counter) {
173 36262 : snapped_counters_.push_back(Stats::CounterSharedPtr(&counter));
174 36262 : counters_.push_back({counter.latch(), counter});
175 36262 : });
176 :
177 111 : store.forEachSinkedGauge(
178 111 : [this](std::size_t size) {
179 111 : snapped_gauges_.reserve(size);
180 111 : gauges_.reserve(size);
181 111 : },
182 10143 : [this](Stats::Gauge& gauge) {
183 10143 : snapped_gauges_.push_back(Stats::GaugeSharedPtr(&gauge));
184 10143 : gauges_.push_back(gauge);
185 10143 : });
186 :
187 111 : store.forEachSinkedHistogram(
188 111 : [this](std::size_t size) {
189 98 : snapped_histograms_.reserve(size);
190 98 : histograms_.reserve(size);
191 98 : },
192 111 : [this](Stats::ParentHistogram& histogram) {
193 0 : snapped_histograms_.push_back(Stats::ParentHistogramSharedPtr(&histogram));
194 0 : histograms_.push_back(histogram);
195 0 : });
196 :
197 111 : store.forEachSinkedTextReadout(
198 111 : [this](std::size_t size) {
199 111 : snapped_text_readouts_.reserve(size);
200 111 : text_readouts_.reserve(size);
201 111 : },
202 241 : [this](Stats::TextReadout& text_readout) {
203 230 : snapped_text_readouts_.push_back(Stats::TextReadoutSharedPtr(&text_readout));
204 230 : text_readouts_.push_back(text_readout);
205 230 : });
206 :
207 111 : Upstream::HostUtility::forEachHostMetric(
208 111 : cluster_manager,
209 111 : [this](Stats::PrimitiveCounterSnapshot&& metric) {
210 0 : host_counters_.emplace_back(std::move(metric));
211 0 : },
212 111 : [this](Stats::PrimitiveGaugeSnapshot&& metric) {
213 0 : host_gauges_.emplace_back(std::move(metric));
214 0 : });
215 :
216 111 : snapshot_time_ = time_source.systemTime();
217 111 : }
218 :
219 : void InstanceUtil::flushMetricsToSinks(const std::list<Stats::SinkPtr>& sinks, Stats::Store& store,
220 111 : Upstream::ClusterManager& cm, TimeSource& time_source) {
221 : // Create a snapshot and flush to all sinks.
222 : // NOTE: Even if there are no sinks, creating the snapshot has the important property that it
223 : // latches all counters on a periodic basis. The hot restart code assumes this is being
224 : // done so this should not be removed.
225 111 : MetricSnapshotImpl snapshot(store, cm, time_source);
226 111 : for (const auto& sink : sinks) {
227 0 : sink->flush(snapshot);
228 0 : }
229 111 : }
230 :
231 111 : void InstanceBase::flushStats() {
232 111 : if (stats_flush_in_progress_) {
233 0 : ENVOY_LOG(debug, "skipping stats flush as flush is already in progress");
234 0 : server_stats_->dropped_stat_flushes_.inc();
235 0 : return;
236 0 : }
237 :
238 111 : stats_flush_in_progress_ = true;
239 111 : ENVOY_LOG(debug, "flushing stats");
240 : // If Envoy is not fully initialized, workers will not be started and mergeHistograms
241 : // completion callback is not called immediately. As a result of this server stats will
242 : // not be updated and flushed to stat sinks. So skip mergeHistograms call if workers are
243 : // not started yet.
244 111 : if (initManager().state() == Init::Manager::State::Initialized) {
245 : // A shutdown initiated before this callback may prevent this from being called as per
246 : // the semantics documented in ThreadLocal's runOnAllThreads method.
247 94 : stats_store_.mergeHistograms([this]() -> void { flushStatsInternal(); });
248 107 : } else {
249 17 : ENVOY_LOG(debug, "Envoy is not fully initialized, skipping histogram merge and flushing stats");
250 17 : flushStatsInternal();
251 17 : }
252 111 : }
253 :
254 205 : void InstanceBase::updateServerStats() {
255 : // mergeParentStatsIfAny() does nothing and returns a struct of 0s if there is no parent.
256 205 : HotRestart::ServerStatsFromParent parent_stats = restarter_.mergeParentStatsIfAny(stats_store_);
257 :
258 205 : server_stats_->uptime_.set(time(nullptr) - original_start_time_);
259 205 : server_stats_->memory_allocated_.set(Memory::Stats::totalCurrentlyAllocated() +
260 205 : parent_stats.parent_memory_allocated_);
261 205 : server_stats_->memory_heap_size_.set(Memory::Stats::totalCurrentlyReserved());
262 205 : server_stats_->memory_physical_size_.set(Memory::Stats::totalPhysicalBytes());
263 205 : server_stats_->parent_connections_.set(parent_stats.parent_connections_);
264 205 : server_stats_->total_connections_.set(listener_manager_->numConnections() +
265 205 : parent_stats.parent_connections_);
266 205 : server_stats_->days_until_first_cert_expiring_.set(
267 205 : sslContextManager().daysUntilFirstCertExpires().value_or(0));
268 :
269 205 : auto secs_until_ocsp_response_expires =
270 205 : sslContextManager().secondsUntilFirstOcspResponseExpires();
271 205 : if (secs_until_ocsp_response_expires) {
272 0 : server_stats_->seconds_until_first_ocsp_response_expiring_.set(
273 0 : secs_until_ocsp_response_expires.value());
274 0 : }
275 205 : server_stats_->state_.set(
276 205 : enumToInt(Utility::serverState(initManager().state(), healthCheckFailed())));
277 205 : server_stats_->stats_recent_lookups_.set(
278 205 : stats_store_.symbolTable().getRecentLookups([](absl::string_view, uint64_t) {}));
279 205 : }
280 :
281 111 : void InstanceBase::flushStatsInternal() {
282 111 : updateServerStats();
283 111 : auto& stats_config = config_.statsConfig();
284 111 : InstanceUtil::flushMetricsToSinks(stats_config.sinks(), stats_store_, clusterManager(),
285 111 : timeSource());
286 : // TODO(ramaraochavali): consider adding different flush interval for histograms.
287 111 : if (stat_flush_timer_ != nullptr) {
288 111 : stat_flush_timer_->enableTimer(stats_config.flushInterval());
289 111 : }
290 :
291 111 : stats_flush_in_progress_ = false;
292 111 : }
293 :
294 624 : bool InstanceBase::healthCheckFailed() { return !live_.load(); }
295 :
296 0 : ProcessContextOptRef InstanceBase::processContext() {
297 0 : if (process_context_ == nullptr) {
298 0 : return absl::nullopt;
299 0 : }
300 :
301 0 : return *process_context_;
302 0 : }
303 :
304 : namespace {
305 :
306 0 : bool canBeRegisteredAsInlineHeader(const Http::LowerCaseString& header_name) {
307 : // 'set-cookie' cannot currently be registered as an inline header.
308 0 : if (header_name == Http::Headers::get().SetCookie) {
309 0 : return false;
310 0 : }
311 0 : return true;
312 0 : }
313 :
314 : void registerCustomInlineHeadersFromBootstrap(
315 134 : const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
316 134 : for (const auto& inline_header : bootstrap.inline_headers()) {
317 0 : const Http::LowerCaseString lower_case_name(inline_header.inline_header_name());
318 0 : if (!canBeRegisteredAsInlineHeader(lower_case_name)) {
319 0 : throwEnvoyExceptionOrPanic(fmt::format("Header {} cannot be registered as an inline header.",
320 0 : inline_header.inline_header_name()));
321 0 : }
322 0 : switch (inline_header.inline_header_type()) {
323 0 : case envoy::config::bootstrap::v3::CustomInlineHeader::REQUEST_HEADER:
324 0 : Http::CustomInlineHeaderRegistry::registerInlineHeader<
325 0 : Http::RequestHeaderMap::header_map_type>(lower_case_name);
326 0 : break;
327 0 : case envoy::config::bootstrap::v3::CustomInlineHeader::REQUEST_TRAILER:
328 0 : Http::CustomInlineHeaderRegistry::registerInlineHeader<
329 0 : Http::RequestTrailerMap::header_map_type>(lower_case_name);
330 0 : break;
331 0 : case envoy::config::bootstrap::v3::CustomInlineHeader::RESPONSE_HEADER:
332 0 : Http::CustomInlineHeaderRegistry::registerInlineHeader<
333 0 : Http::ResponseHeaderMap::header_map_type>(lower_case_name);
334 0 : break;
335 0 : case envoy::config::bootstrap::v3::CustomInlineHeader::RESPONSE_TRAILER:
336 0 : Http::CustomInlineHeaderRegistry::registerInlineHeader<
337 0 : Http::ResponseTrailerMap::header_map_type>(lower_case_name);
338 0 : break;
339 0 : default:
340 0 : PANIC("not implemented");
341 0 : }
342 0 : }
343 134 : }
344 :
345 : } // namespace
346 :
347 : void InstanceUtil::loadBootstrapConfig(envoy::config::bootstrap::v3::Bootstrap& bootstrap,
348 : const Options& options,
349 : ProtobufMessage::ValidationVisitor& validation_visitor,
350 455 : Api::Api& api) {
351 455 : const std::string& config_path = options.configPath();
352 455 : const std::string& config_yaml = options.configYaml();
353 455 : const envoy::config::bootstrap::v3::Bootstrap& config_proto = options.configProto();
354 :
355 : // One of config_path and config_yaml or bootstrap should be specified.
356 455 : if (config_path.empty() && config_yaml.empty() && config_proto.ByteSizeLong() == 0) {
357 0 : throwEnvoyExceptionOrPanic(
358 0 : "At least one of --config-path or --config-yaml or Options::configProto() "
359 0 : "should be non-empty");
360 0 : }
361 :
362 455 : if (!config_path.empty()) {
363 455 : #ifdef ENVOY_ENABLE_YAML
364 455 : MessageUtil::loadFromFile(config_path, bootstrap, validation_visitor, api);
365 : #else
366 : if (!config_path.empty()) {
367 : throwEnvoyExceptionOrPanic("Cannot load from file with YAML disabled\n");
368 : }
369 : UNREFERENCED_PARAMETER(api);
370 : #endif
371 455 : }
372 455 : if (!config_yaml.empty()) {
373 0 : #ifdef ENVOY_ENABLE_YAML
374 0 : envoy::config::bootstrap::v3::Bootstrap bootstrap_override;
375 0 : MessageUtil::loadFromYaml(config_yaml, bootstrap_override, validation_visitor);
376 : // TODO(snowp): The fact that we do a merge here doesn't seem to be covered under test.
377 0 : bootstrap.MergeFrom(bootstrap_override);
378 : #else
379 : throwEnvoyExceptionOrPanic("Cannot load from YAML with YAML disabled\n");
380 : #endif
381 0 : }
382 455 : if (config_proto.ByteSizeLong() != 0) {
383 0 : bootstrap.MergeFrom(config_proto);
384 0 : }
385 455 : MessageUtil::validate(bootstrap, validation_visitor);
386 455 : }
387 :
388 : void InstanceBase::initialize(Network::Address::InstanceConstSharedPtr local_address,
389 135 : ComponentFactory& component_factory) {
390 135 : std::function set_up_logger = [&] {
391 0 : TRY_ASSERT_MAIN_THREAD {
392 0 : file_logger_ = std::make_unique<Logger::FileSinkDelegate>(
393 0 : options_.logPath(), access_log_manager_, Logger::Registry::getSink());
394 0 : }
395 0 : END_TRY
396 0 : CATCH(const EnvoyException& e, {
397 0 : throw EnvoyException(
398 0 : fmt::format("Failed to open log-file '{}'. e.what(): {}", options_.logPath(), e.what()));
399 0 : });
400 0 : };
401 :
402 135 : TRY_ASSERT_MAIN_THREAD {
403 135 : if (!options_.logPath().empty()) {
404 0 : set_up_logger();
405 0 : }
406 135 : restarter_.initialize(*dispatcher_, *this);
407 135 : drain_manager_ = component_factory.createDrainManager(*this);
408 135 : initializeOrThrow(std::move(local_address), component_factory);
409 135 : }
410 135 : END_TRY
411 135 : MULTI_CATCH(
412 135 : const EnvoyException& e,
413 135 : {
414 135 : ENVOY_LOG(critical, "error initializing config '{} {} {}': {}",
415 135 : options_.configProto().DebugString(), options_.configYaml(),
416 135 : options_.configPath(), e.what());
417 135 : terminate();
418 135 : throw;
419 135 : },
420 135 : {
421 135 : ENVOY_LOG(critical, "error initializing due to unknown exception");
422 135 : terminate();
423 135 : throw;
424 135 : });
425 111 : }
426 :
427 : void InstanceBase::initializeOrThrow(Network::Address::InstanceConstSharedPtr local_address,
428 135 : ComponentFactory& component_factory) {
429 135 : ENVOY_LOG(info, "initializing epoch {} (base id={}, hot restart version={})",
430 135 : options_.restartEpoch(), restarter_.baseId(), restarter_.version());
431 :
432 135 : ENVOY_LOG(info, "statically linked extensions:");
433 6610 : for (const auto& ext : Envoy::Registry::FactoryCategoryRegistry::registeredFactories()) {
434 6610 : ENVOY_LOG(info, " {}: {}", ext.first, absl::StrJoin(ext.second->registeredNames(), ", "));
435 6610 : }
436 :
437 : // Handle configuration that needs to take place prior to the main configuration load.
438 135 : InstanceUtil::loadBootstrapConfig(bootstrap_, options_,
439 135 : messageValidationContext().staticValidationVisitor(), *api_);
440 135 : bootstrap_config_update_time_ = time_source_.systemTime();
441 :
442 135 : if (bootstrap_.has_application_log_config()) {
443 2 : THROW_IF_NOT_OK(
444 2 : Utility::assertExclusiveLogFormatMethod(options_, bootstrap_.application_log_config()));
445 2 : THROW_IF_NOT_OK(Utility::maybeSetApplicationLogFormat(bootstrap_.application_log_config()));
446 2 : }
447 :
448 : #ifdef ENVOY_PERFETTO
449 : perfetto::TracingInitArgs args;
450 : // Include in-process events only.
451 : args.backends = perfetto::kInProcessBackend;
452 : perfetto::Tracing::Initialize(args);
453 : perfetto::TrackEvent::Register();
454 :
455 : // Prepare a configuration for a new "Perfetto" tracing session.
456 : perfetto::TraceConfig cfg;
457 : // TODO(rojkov): make the tracer configurable with either "Perfetto"'s native
458 : // message or custom one embedded into Bootstrap.
459 : cfg.add_buffers()->set_size_kb(1024);
460 : auto* ds_cfg = cfg.add_data_sources()->mutable_config();
461 : ds_cfg->set_name("track_event");
462 :
463 : const std::string pftrace_path =
464 : PROTOBUF_GET_STRING_OR_DEFAULT(bootstrap_, perf_tracing_file_path, "envoy.pftrace");
465 : // Instantiate a new tracing session.
466 : tracing_session_ = perfetto::Tracing::NewTrace();
467 : tracing_fd_ = open(pftrace_path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0600);
468 : if (tracing_fd_ == -1) {
469 : throwEnvoyExceptionOrPanic(
470 : fmt::format("unable to open tracing file {}: {}", pftrace_path, errorDetails(errno)));
471 : }
472 : // Configure the tracing session.
473 : tracing_session_->Setup(cfg, tracing_fd_);
474 : // Enable tracing and block until tracing has started.
475 : tracing_session_->StartBlocking();
476 : #endif
477 :
478 : // Immediate after the bootstrap has been loaded, override the header prefix, if configured to
479 : // do so. This must be set before any other code block references the HeaderValues ConstSingleton.
480 135 : if (!bootstrap_.header_prefix().empty()) {
481 : // setPrefix has a release assert verifying that setPrefix() is not called after prefix()
482 0 : ThreadSafeSingleton<Http::PrefixValue>::get().setPrefix(bootstrap_.header_prefix().c_str());
483 0 : }
484 :
485 : // Register Custom O(1) headers from bootstrap.
486 135 : registerCustomInlineHeadersFromBootstrap(bootstrap_);
487 :
488 135 : ENVOY_LOG(info, "HTTP header map info:");
489 536 : for (const auto& info : Http::HeaderMapImplUtility::getAllHeaderMapImplInfo()) {
490 536 : ENVOY_LOG(info, " {}: {} bytes: {}", info.name_, info.size_,
491 536 : absl::StrJoin(info.registered_headers_, ","));
492 536 : }
493 :
494 : // Initialize the regex engine and inject to singleton.
495 : // Needs to happen before stats store initialization because the stats
496 : // matcher config can include regexes.
497 135 : regex_engine_ = createRegexEngine(
498 135 : bootstrap_, messageValidationContext().staticValidationVisitor(), serverFactoryContext());
499 :
500 : // Needs to happen as early as possible in the instantiation to preempt the objects that require
501 : // stats.
502 135 : stats_store_.setTagProducer(Config::Utility::createTagProducer(bootstrap_, options_.statsTags()));
503 135 : stats_store_.setStatsMatcher(
504 135 : Config::Utility::createStatsMatcher(bootstrap_, stats_store_.symbolTable()));
505 135 : stats_store_.setHistogramSettings(Config::Utility::createHistogramSettings(bootstrap_));
506 :
507 135 : const std::string server_stats_prefix = "server.";
508 135 : const std::string server_compilation_settings_stats_prefix = "server.compilation_settings";
509 135 : server_stats_ = std::make_unique<ServerStats>(
510 135 : ServerStats{ALL_SERVER_STATS(POOL_COUNTER_PREFIX(stats_store_, server_stats_prefix),
511 135 : POOL_GAUGE_PREFIX(stats_store_, server_stats_prefix),
512 135 : POOL_HISTOGRAM_PREFIX(stats_store_, server_stats_prefix))});
513 135 : server_compilation_settings_stats_ =
514 135 : std::make_unique<CompilationSettings::ServerCompilationSettingsStats>(
515 135 : CompilationSettings::ServerCompilationSettingsStats{ALL_SERVER_COMPILATION_SETTINGS_STATS(
516 135 : POOL_COUNTER_PREFIX(stats_store_, server_compilation_settings_stats_prefix),
517 135 : POOL_GAUGE_PREFIX(stats_store_, server_compilation_settings_stats_prefix),
518 135 : POOL_HISTOGRAM_PREFIX(stats_store_, server_compilation_settings_stats_prefix))});
519 135 : validation_context_.setCounters(server_stats_->static_unknown_fields_,
520 135 : server_stats_->dynamic_unknown_fields_,
521 135 : server_stats_->wip_protos_);
522 :
523 135 : initialization_timer_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
524 135 : server_stats_->initialization_time_ms_, timeSource());
525 135 : server_stats_->concurrency_.set(options_.concurrency());
526 135 : server_stats_->hot_restart_epoch_.set(options_.restartEpoch());
527 135 : InstanceBase::failHealthcheck(false);
528 :
529 : // Check if bootstrap has server version override set, if yes, we should use that as
530 : // 'server.version' stat.
531 135 : uint64_t version_int;
532 135 : if (bootstrap_.stats_server_version_override().value() > 0) {
533 0 : version_int = bootstrap_.stats_server_version_override().value();
534 135 : } else {
535 135 : if (!StringUtil::atoull(VersionInfo::revision().substr(0, 6).c_str(), version_int, 16)) {
536 0 : throwEnvoyExceptionOrPanic("compiled GIT SHA is invalid. Invalid build.");
537 0 : }
538 135 : }
539 135 : server_stats_->version_.set(version_int);
540 135 : if (VersionInfo::sslFipsCompliant()) {
541 0 : server_compilation_settings_stats_->fips_mode_.set(1);
542 135 : } else {
543 : // Set this explicitly so that "used" flag is set so that it can be pushed to stats sinks.
544 135 : server_compilation_settings_stats_->fips_mode_.set(0);
545 135 : }
546 :
547 : // If user has set user_agent_name in the bootstrap config, use it.
548 : // Default to "envoy" if unset.
549 135 : if (bootstrap_.node().user_agent_name().empty()) {
550 134 : bootstrap_.mutable_node()->set_user_agent_name("envoy");
551 134 : }
552 :
553 : // If user has set user_agent_build_version in the bootstrap config, use it.
554 : // Default to the internal server version.
555 135 : if (!bootstrap_.node().user_agent_build_version().has_version()) {
556 134 : *bootstrap_.mutable_node()->mutable_user_agent_build_version() = VersionInfo::buildVersion();
557 134 : }
558 :
559 6532 : for (const auto& ext : Envoy::Registry::FactoryCategoryRegistry::registeredFactories()) {
560 6532 : auto registered_types = ext.second->registeredTypes();
561 22410 : for (const auto& name : ext.second->allRegisteredNames()) {
562 22410 : auto* extension = bootstrap_.mutable_node()->add_extensions();
563 22410 : extension->set_name(std::string(name));
564 22410 : extension->set_category(ext.first);
565 22410 : auto const version = ext.second->getFactoryVersion(name);
566 22410 : if (version) {
567 0 : *extension->mutable_version() = version.value();
568 0 : }
569 22410 : extension->set_disabled(ext.second->isFactoryDisabled(name));
570 22410 : auto it = registered_types.find(name);
571 22410 : if (it != registered_types.end()) {
572 13849 : std::sort(it->second.begin(), it->second.end());
573 14672 : for (const auto& type_url : it->second) {
574 14672 : extension->add_type_urls(type_url);
575 14672 : }
576 13849 : }
577 22410 : }
578 6532 : }
579 :
580 135 : local_info_ = std::make_unique<LocalInfo::LocalInfoImpl>(
581 135 : stats().symbolTable(), bootstrap_.node(), bootstrap_.node_context_params(), local_address,
582 135 : options_.serviceZone(), options_.serviceClusterName(), options_.serviceNodeName());
583 :
584 135 : Configuration::InitialImpl initial_config(bootstrap_);
585 :
586 : // Learn original_start_time_ if our parent is still around to inform us of it.
587 135 : const auto parent_admin_shutdown_response = restarter_.sendParentAdminShutdownRequest();
588 135 : if (parent_admin_shutdown_response.has_value()) {
589 0 : original_start_time_ = parent_admin_shutdown_response.value().original_start_time_;
590 : // TODO(soulxu): This is added for switching the reuse port default value as true (#17259).
591 : // It ensures the same default value during the hot restart. This can be removed when
592 : // everyone switches to the new default value.
593 0 : enable_reuse_port_default_ =
594 0 : parent_admin_shutdown_response.value().enable_reuse_port_default_ ? true : false;
595 0 : }
596 :
597 135 : OptRef<Server::ConfigTracker> config_tracker;
598 135 : #ifdef ENVOY_ADMIN_FUNCTIONALITY
599 135 : admin_ = std::make_unique<AdminImpl>(initial_config.admin().profilePath(), *this,
600 135 : initial_config.admin().ignoreGlobalConnLimit());
601 :
602 135 : config_tracker = admin_->getConfigTracker();
603 135 : #endif
604 135 : secret_manager_ = std::make_unique<Secret::SecretManagerImpl>(config_tracker);
605 :
606 135 : loadServerFlags(initial_config.flagsPath());
607 :
608 : // Initialize the overload manager early so other modules can register for actions.
609 135 : overload_manager_ = createOverloadManager();
610 :
611 135 : maybeCreateHeapShrinker();
612 :
613 135 : for (const auto& bootstrap_extension : bootstrap_.bootstrap_extensions()) {
614 0 : auto& factory = Config::Utility::getAndCheckFactory<Configuration::BootstrapExtensionFactory>(
615 0 : bootstrap_extension);
616 0 : auto config = Config::Utility::translateAnyToFactoryConfig(
617 0 : bootstrap_extension.typed_config(), messageValidationContext().staticValidationVisitor(),
618 0 : factory);
619 0 : bootstrap_extensions_.push_back(
620 0 : factory.createBootstrapExtension(*config, serverFactoryContext()));
621 0 : }
622 :
623 : // Register the fatal actions.
624 135 : {
625 135 : FatalAction::FatalActionPtrList safe_actions;
626 135 : FatalAction::FatalActionPtrList unsafe_actions;
627 135 : for (const auto& action_config : bootstrap_.fatal_actions()) {
628 1 : auto& factory =
629 1 : Config::Utility::getAndCheckFactory<Server::Configuration::FatalActionFactory>(
630 1 : action_config.config());
631 1 : auto action = factory.createFatalActionFromProto(action_config, this);
632 :
633 1 : if (action->isAsyncSignalSafe()) {
634 0 : safe_actions.push_back(std::move(action));
635 1 : } else {
636 1 : unsafe_actions.push_back(std::move(action));
637 1 : }
638 1 : }
639 135 : Envoy::FatalErrorHandler::registerFatalActions(
640 135 : std::move(safe_actions), std::move(unsafe_actions), api_->threadFactory());
641 135 : }
642 :
643 135 : if (!bootstrap_.default_socket_interface().empty()) {
644 0 : auto& sock_name = bootstrap_.default_socket_interface();
645 0 : auto sock = const_cast<Network::SocketInterface*>(Network::socketInterface(sock_name));
646 0 : if (sock != nullptr) {
647 0 : Network::SocketInterfaceSingleton::clear();
648 0 : Network::SocketInterfaceSingleton::initialize(sock);
649 0 : }
650 0 : }
651 :
652 135 : ListenerManagerFactory* listener_manager_factory = nullptr;
653 135 : if (bootstrap_.has_listener_manager()) {
654 0 : listener_manager_factory = Config::Utility::getAndCheckFactory<ListenerManagerFactory>(
655 0 : bootstrap_.listener_manager(), false);
656 135 : } else {
657 135 : listener_manager_factory = &Config::Utility::getAndCheckFactoryByName<ListenerManagerFactory>(
658 135 : Config::ServerExtensionValues::get().DEFAULT_LISTENER);
659 135 : }
660 :
661 : // Workers get created first so they register for thread local updates.
662 135 : listener_manager_ = listener_manager_factory->createListenerManager(
663 135 : *this, nullptr, worker_factory_, bootstrap_.enable_dispatcher_stats(), quic_stat_names_);
664 :
665 : // The main thread is also registered for thread local updates so that code that does not care
666 : // whether it runs on the main thread or on workers can still use TLS.
667 135 : thread_local_.registerThread(*dispatcher_, true);
668 :
669 : // We can now initialize stats for threading.
670 135 : stats_store_.initializeThreading(*dispatcher_, thread_local_);
671 :
672 : // It's now safe to start writing stats from the main thread's dispatcher.
673 135 : if (bootstrap_.enable_dispatcher_stats()) {
674 2 : dispatcher_->initializeStats(*stats_store_.rootScope(), "server.");
675 2 : }
676 :
677 : // The broad order of initialization from this point on is the following:
678 : // 1. Statically provisioned configuration (bootstrap) are loaded.
679 : // 2. Cluster manager is created and all primary clusters (i.e. with endpoint assignments
680 : // provisioned statically in bootstrap, discovered through DNS or file based CDS) are
681 : // initialized.
682 : // 3. Various services are initialized and configured using the bootstrap config.
683 : // 4. RTDS is initialized using primary clusters. This allows runtime overrides to be fully
684 : // configured before the rest of xDS configuration is provisioned.
685 : // 5. Secondary clusters (with endpoint assignments provisioned by xDS servers) are initialized.
686 : // 6. The rest of the dynamic configuration is provisioned.
687 : //
688 : // Please note: this order requires that RTDS is provisioned using a primary cluster. If RTDS is
689 : // provisioned through ADS then ADS must use primary cluster as well. This invariant is enforced
690 : // during RTDS initialization and invalid configuration will be rejected.
691 :
692 : // Runtime gets initialized before the main configuration since during main configuration
693 : // load things may grab a reference to the loader for later use.
694 135 : runtime_ = component_factory.createRuntime(*this, initial_config);
695 135 : validation_context_.setRuntime(runtime());
696 :
697 135 : if (!runtime().snapshot().getBoolean("envoy.disallow_global_stats", false)) {
698 131 : assert_action_registration_ = Assert::addDebugAssertionFailureRecordAction(
699 131 : [this](const char*) { server_stats_->debug_assertion_failures_.inc(); });
700 131 : envoy_bug_action_registration_ = Assert::addEnvoyBugFailureRecordAction(
701 131 : [this](const char*) { server_stats_->envoy_bug_failures_.inc(); });
702 131 : }
703 :
704 135 : if (initial_config.admin().address()) {
705 98 : #ifdef ENVOY_ADMIN_FUNCTIONALITY
706 : // Admin instance is always created if admin support is not compiled out.
707 98 : RELEASE_ASSERT(admin_ != nullptr, "Admin instance should be created but actually not.");
708 98 : auto typed_admin = dynamic_cast<AdminImpl*>(admin_.get());
709 98 : RELEASE_ASSERT(typed_admin != nullptr, "Admin implementation is not an AdminImpl.");
710 98 : initial_config.initAdminAccessLog(bootstrap_, typed_admin->factoryContext());
711 98 : admin_->startHttpListener(initial_config.admin().accessLogs(), initial_config.admin().address(),
712 98 : initial_config.admin().socketOptions());
713 : #else
714 : throwEnvoyExceptionOrPanic("Admin address configured but admin support compiled out");
715 : #endif
716 135 : } else {
717 37 : ENVOY_LOG(info, "No admin address given, so no admin HTTP server started.");
718 37 : }
719 135 : if (admin_) {
720 131 : config_tracker_entry_ = admin_->getConfigTracker().add(
721 131 : "bootstrap", [this](const Matchers::StringMatcher&) { return dumpBootstrapConfig(); });
722 131 : }
723 135 : if (initial_config.admin().address()) {
724 98 : admin_->addListenerToHandler(handler_.get());
725 98 : }
726 :
727 : // Once we have runtime we can initialize the SSL context manager.
728 135 : ssl_context_manager_ = createContextManager("ssl_context_manager", time_source_);
729 :
730 135 : cluster_manager_factory_ = std::make_unique<Upstream::ProdClusterManagerFactory>(
731 135 : serverFactoryContext(), stats_store_, thread_local_, http_context_,
732 135 : [this]() -> Network::DnsResolverSharedPtr { return this->getOrCreateDnsResolver(); },
733 135 : *ssl_context_manager_, *secret_manager_, quic_stat_names_, *this);
734 :
735 : // Now the configuration gets parsed. The configuration may start setting
736 : // thread local data per above. See MainImpl::initialize() for why ConfigImpl
737 : // is constructed as part of the InstanceBase and then populated once
738 : // cluster_manager_factory_ is available.
739 135 : config_.initialize(bootstrap_, *this, *cluster_manager_factory_);
740 :
741 : // Instruct the listener manager to create the LDS provider if needed. This must be done later
742 : // because various items do not yet exist when the listener manager is created.
743 135 : if (bootstrap_.dynamic_resources().has_lds_config() ||
744 135 : !bootstrap_.dynamic_resources().lds_resources_locator().empty()) {
745 98 : std::unique_ptr<xds::core::v3::ResourceLocator> lds_resources_locator;
746 98 : if (!bootstrap_.dynamic_resources().lds_resources_locator().empty()) {
747 0 : lds_resources_locator =
748 0 : std::make_unique<xds::core::v3::ResourceLocator>(Config::XdsResourceIdentifier::decodeUrl(
749 0 : bootstrap_.dynamic_resources().lds_resources_locator()));
750 0 : }
751 98 : listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config(),
752 98 : lds_resources_locator.get());
753 98 : }
754 :
755 : // We have to defer RTDS initialization until after the cluster manager is
756 : // instantiated (which in turn relies on runtime...).
757 135 : runtime().initialize(clusterManager());
758 :
759 135 : clusterManager().setPrimaryClustersInitializedCb(
760 135 : [this]() { onClusterManagerPrimaryInitializationComplete(); });
761 :
762 135 : auto& stats_config = config_.statsConfig();
763 135 : for (const Stats::SinkPtr& sink : stats_config.sinks()) {
764 0 : stats_store_.addSink(*sink);
765 0 : }
766 135 : if (!stats_config.flushOnAdmin()) {
767 : // Some of the stat sinks may need dispatcher support so don't flush until the main loop starts.
768 : // Just setup the timer.
769 111 : stat_flush_timer_ = dispatcher_->createTimer([this]() -> void { flushStats(); });
770 111 : stat_flush_timer_->enableTimer(stats_config.flushInterval());
771 111 : }
772 :
773 : // Now that we are initialized, notify the bootstrap extensions.
774 135 : for (auto&& bootstrap_extension : bootstrap_extensions_) {
775 0 : bootstrap_extension->onServerInitialized();
776 0 : }
777 :
778 : // GuardDog (deadlock detection) object and thread setup before workers are
779 : // started and before our own run() loop runs.
780 135 : main_thread_guard_dog_ = maybeCreateGuardDog("main_thread");
781 135 : worker_guard_dog_ = maybeCreateGuardDog("workers");
782 135 : }
783 :
784 111 : void InstanceBase::onClusterManagerPrimaryInitializationComplete() {
785 : // If RTDS was not configured the `onRuntimeReady` callback is immediately invoked.
786 111 : runtime().startRtdsSubscriptions([this]() { onRuntimeReady(); });
787 111 : }
788 :
789 110 : void InstanceBase::onRuntimeReady() {
790 : // Begin initializing secondary clusters after RTDS configuration has been applied.
791 : // Initializing can throw exceptions, so catch these.
792 110 : TRY_ASSERT_MAIN_THREAD {
793 110 : THROW_IF_NOT_OK(clusterManager().initializeSecondaryClusters(bootstrap_));
794 110 : }
795 110 : END_TRY
796 110 : CATCH(const EnvoyException& e, {
797 110 : ENVOY_LOG(warn, "Skipping initialization of secondary cluster: {}", e.what());
798 110 : shutdown();
799 110 : });
800 :
801 110 : if (bootstrap_.has_hds_config()) {
802 1 : const auto& hds_config = bootstrap_.hds_config();
803 1 : async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
804 1 : *config_.clusterManager(), thread_local_, time_source_, *api_, grpc_context_.statNames(),
805 1 : bootstrap_.grpc_async_client_manager_config());
806 1 : TRY_ASSERT_MAIN_THREAD {
807 1 : THROW_IF_NOT_OK(Config::Utility::checkTransportVersion(hds_config));
808 1 : hds_delegate_ = std::make_unique<Upstream::HdsDelegate>(
809 1 : serverFactoryContext(), *stats_store_.rootScope(),
810 1 : Config::Utility::factoryForGrpcApiConfigSource(*async_client_manager_, hds_config,
811 1 : *stats_store_.rootScope(), false)
812 1 : ->createUncachedRawAsyncClient(),
813 1 : stats_store_, *ssl_context_manager_, info_factory_);
814 1 : }
815 1 : END_TRY
816 1 : CATCH(const EnvoyException& e, {
817 1 : ENVOY_LOG(warn, "Skipping initialization of HDS cluster: {}", e.what());
818 1 : shutdown();
819 1 : });
820 1 : }
821 110 : }
822 :
823 94 : void InstanceBase::startWorkers() {
824 : // The callback will be called after workers are started.
825 94 : listener_manager_->startWorkers(makeOptRefFromPtr(worker_guard_dog_.get()), [this]() {
826 94 : if (isShutdown()) {
827 0 : return;
828 0 : }
829 :
830 94 : initialization_timer_->complete();
831 : // Update server stats as soon as initialization is done.
832 94 : updateServerStats();
833 94 : workers_started_ = true;
834 94 : hooks_.onWorkersStarted();
835 : // At this point we are ready to take traffic and all listening ports are up. Notify our
836 : // parent if applicable that they can stop listening and drain.
837 94 : restarter_.drainParentListeners();
838 94 : drain_manager_->startParentShutdownSequence();
839 94 : });
840 94 : }
841 :
842 : Runtime::LoaderPtr InstanceUtil::createRuntime(Instance& server,
843 131 : Server::Configuration::Initial& config) {
844 131 : #ifdef ENVOY_ENABLE_YAML
845 131 : ENVOY_LOG(info, "runtime: {}", MessageUtil::getYamlStringFromMessage(config.runtime()));
846 131 : #endif
847 131 : return std::make_unique<Runtime::LoaderImpl>(
848 131 : server.dispatcher(), server.threadLocal(), config.runtime(), server.localInfo(),
849 131 : server.stats(), server.api().randomGenerator(),
850 131 : server.messageValidationContext().dynamicValidationVisitor(), server.api());
851 131 : }
852 :
853 134 : void InstanceBase::loadServerFlags(const absl::optional<std::string>& flags_path) {
854 134 : if (!flags_path) {
855 131 : return;
856 131 : }
857 :
858 3 : ENVOY_LOG(info, "server flags path: {}", flags_path.value());
859 3 : if (api_->fileSystem().fileExists(flags_path.value() + "/drain")) {
860 0 : ENVOY_LOG(info, "starting server in drain mode");
861 0 : InstanceBase::failHealthcheck(true);
862 0 : }
863 3 : }
864 :
865 : RunHelper::RunHelper(Instance& instance, const Options& options, Event::Dispatcher& dispatcher,
866 : Upstream::ClusterManager& cm, AccessLog::AccessLogManager& access_log_manager,
867 : Init::Manager& init_manager, OverloadManager& overload_manager,
868 : std::function<void()> post_init_cb)
869 94 : : init_watcher_("RunHelper", [&instance, post_init_cb]() {
870 94 : if (!instance.isShutdown()) {
871 94 : post_init_cb();
872 94 : }
873 98 : }) {
874 : // Setup signals.
875 : // Since signals are not supported on Windows we have an internal definition for `SIGTERM`
876 : // On POSIX it resolves as expected to SIGTERM
877 : // On Windows we use it internally for all the console events that indicate that we should
878 : // terminate the process.
879 98 : if (options.signalHandlingEnabled()) {
880 98 : sigterm_ = dispatcher.listenForSignal(ENVOY_SIGTERM, [&instance]() {
881 0 : ENVOY_LOG(warn, "caught ENVOY_SIGTERM");
882 0 : instance.shutdown();
883 0 : });
884 98 : #ifndef WIN32
885 98 : sigint_ = dispatcher.listenForSignal(SIGINT, [&instance]() {
886 0 : ENVOY_LOG(warn, "caught SIGINT");
887 0 : instance.shutdown();
888 0 : });
889 :
890 98 : sig_usr_1_ = dispatcher.listenForSignal(SIGUSR1, [&access_log_manager]() {
891 0 : ENVOY_LOG(info, "caught SIGUSR1. Reopening access logs.");
892 0 : access_log_manager.reopen();
893 0 : });
894 :
895 98 : sig_hup_ = dispatcher.listenForSignal(SIGHUP, []() {
896 0 : ENVOY_LOG(warn, "caught and eating SIGHUP. See documentation for how to hot restart.");
897 0 : });
898 98 : #endif
899 98 : }
900 :
901 : // Start overload manager before workers.
902 98 : overload_manager.start();
903 :
904 : // If there is no global limit to the number of active connections, warn on startup.
905 98 : if (!overload_manager.getThreadLocalOverloadState().isResourceMonitorEnabled(
906 98 : Server::OverloadProactiveResourceName::GlobalDownstreamMaxConnections) &&
907 98 : !instance.runtime().snapshot().get(Runtime::Keys::GlobalMaxCxRuntimeKey)) {
908 98 : ENVOY_LOG(warn, "There is no configured limit to the number of allowed active downstream "
909 98 : "connections. Configure a "
910 98 : "limit in `envoy.resource_monitors.downstream_connections` resource monitor.");
911 98 : }
912 :
913 : // Register for cluster manager init notification. We don't start serving worker traffic until
914 : // upstream clusters are initialized which may involve running the event loop. Note however that
915 : // this can fire immediately if all clusters have already initialized. Also note that we need
916 : // to guard against shutdown at two different levels since SIGTERM can come in once the run loop
917 : // starts.
918 98 : cm.setInitializedCb([&instance, &init_manager, &cm, this]() {
919 98 : if (instance.isShutdown()) {
920 0 : return;
921 0 : }
922 :
923 98 : const auto type_url = Config::getTypeUrl<envoy::config::route::v3::RouteConfiguration>();
924 : // Pause RDS to ensure that we don't send any requests until we've
925 : // subscribed to all the RDS resources. The subscriptions happen in the init callbacks,
926 : // so we pause RDS until we've completed all the callbacks.
927 98 : Config::ScopedResume maybe_resume_rds;
928 98 : if (cm.adsMux()) {
929 98 : maybe_resume_rds = cm.adsMux()->pause(type_url);
930 98 : }
931 :
932 98 : ENVOY_LOG(info, "all clusters initialized. initializing init manager");
933 98 : init_manager.initialize(init_watcher_);
934 :
935 : // Now that we're execute all the init callbacks we can resume RDS
936 : // as we've subscribed to all the statically defined RDS resources.
937 : // This is done by tearing down the maybe_resume_rds Cleanup object.
938 98 : });
939 98 : }
940 :
941 98 : void InstanceBase::run() {
942 : // RunHelper exists primarily to facilitate testing of how we respond to early shutdown during
943 : // startup (see RunHelperTest in server_test.cc).
944 98 : const auto run_helper = RunHelper(*this, options_, *dispatcher_, clusterManager(),
945 98 : access_log_manager_, init_manager_, overloadManager(), [this] {
946 94 : notifyCallbacksForStage(Stage::PostInit);
947 94 : startWorkers();
948 94 : });
949 :
950 : // Run the main dispatch loop waiting to exit.
951 98 : ENVOY_LOG(info, "starting main dispatch loop");
952 98 : WatchDogSharedPtr watchdog;
953 98 : if (main_thread_guard_dog_) {
954 98 : watchdog = main_thread_guard_dog_->createWatchDog(api_->threadFactory().currentThreadId(),
955 98 : "main_thread", *dispatcher_);
956 98 : }
957 98 : dispatcher_->post([this] { notifyCallbacksForStage(Stage::Startup); });
958 98 : dispatcher_->run(Event::Dispatcher::RunType::Block);
959 98 : ENVOY_LOG(info, "main dispatch loop exited");
960 98 : if (main_thread_guard_dog_) {
961 98 : main_thread_guard_dog_->stopWatching(watchdog);
962 98 : }
963 98 : watchdog.reset();
964 :
965 98 : terminate();
966 98 : }
967 :
968 257 : void InstanceBase::terminate() {
969 257 : if (terminated_) {
970 122 : return;
971 122 : }
972 135 : terminated_ = true;
973 :
974 : // Before starting to shutdown anything else, stop slot destruction updates.
975 135 : thread_local_.shutdownGlobalThreading();
976 :
977 : // Before the workers start exiting we should disable stat threading.
978 135 : stats_store_.shutdownThreading();
979 :
980 : // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
981 135 : std::vector<std::string> muxes = {
982 135 : "envoy.config_mux.new_grpc_mux_factory", "envoy.config_mux.grpc_mux_factory",
983 135 : "envoy.config_mux.delta_grpc_mux_factory", "envoy.config_mux.sotw_grpc_mux_factory"};
984 540 : for (const auto& name : muxes) {
985 540 : auto* factory = Config::Utility::getFactoryByName<Config::MuxFactory>(name);
986 540 : if (factory) {
987 540 : factory->shutdownAll();
988 540 : }
989 540 : }
990 :
991 135 : if (overload_manager_) {
992 132 : overload_manager_->stop();
993 132 : }
994 :
995 : // Shutdown all the workers now that the main dispatch loop is done.
996 135 : if (listener_manager_ != nullptr) {
997 131 : listener_manager_->stopWorkers();
998 131 : }
999 :
1000 : // Only flush if we have not been hot restarted.
1001 135 : if (stat_flush_timer_) {
1002 111 : flushStats();
1003 111 : }
1004 :
1005 135 : if (config_.clusterManager() != nullptr) {
1006 129 : config_.clusterManager()->shutdown();
1007 129 : }
1008 135 : handler_.reset();
1009 135 : thread_local_.shutdownThread();
1010 135 : restarter_.shutdown();
1011 135 : ENVOY_LOG(info, "exiting");
1012 135 : ENVOY_FLUSH_LOG();
1013 135 : FatalErrorHandler::clearFatalActionsOnTerminate();
1014 135 : }
1015 :
1016 3490 : Runtime::Loader& InstanceBase::runtime() { return *runtime_; }
1017 :
1018 100 : void InstanceBase::shutdown() {
1019 100 : ENVOY_LOG(info, "shutting down server instance");
1020 100 : shutdown_ = true;
1021 100 : restarter_.sendParentTerminateRequest();
1022 100 : notifyCallbacksForStage(Stage::ShutdownExit, [this] { dispatcher_->exit(); });
1023 100 : }
1024 :
1025 0 : void InstanceBase::shutdownAdmin() {
1026 0 : ENVOY_LOG(warn, "shutting down admin due to child startup");
1027 0 : stat_flush_timer_.reset();
1028 0 : handler_->stopListeners();
1029 0 : if (admin_) {
1030 0 : admin_->closeSocket();
1031 0 : }
1032 :
1033 : // If we still have a parent, it should be terminated now that we have a child.
1034 0 : ENVOY_LOG(warn, "terminating parent process");
1035 0 : restarter_.sendParentTerminateRequest();
1036 0 : }
1037 :
1038 : ServerLifecycleNotifier::HandlePtr InstanceBase::registerCallback(Stage stage,
1039 0 : StageCallback callback) {
1040 0 : auto& callbacks = stage_callbacks_[stage];
1041 0 : return std::make_unique<LifecycleCallbackHandle<StageCallback>>(callbacks, callback);
1042 0 : }
1043 :
1044 : ServerLifecycleNotifier::HandlePtr
1045 0 : InstanceBase::registerCallback(Stage stage, StageCallbackWithCompletion callback) {
1046 0 : ASSERT(stage == Stage::ShutdownExit);
1047 0 : auto& callbacks = stage_completable_callbacks_[stage];
1048 0 : return std::make_unique<LifecycleCallbackHandle<StageCallbackWithCompletion>>(callbacks,
1049 0 : callback);
1050 0 : }
1051 :
1052 292 : void InstanceBase::notifyCallbacksForStage(Stage stage, std::function<void()> completion_cb) {
1053 292 : ASSERT_IS_MAIN_OR_TEST_THREAD();
1054 292 : const auto it = stage_callbacks_.find(stage);
1055 292 : if (it != stage_callbacks_.end()) {
1056 0 : for (const StageCallback& callback : it->second) {
1057 0 : callback();
1058 0 : }
1059 0 : }
1060 :
1061 : // Wrap completion_cb so that it only gets invoked when all callbacks for this stage
1062 : // have finished their work.
1063 292 : std::shared_ptr<void> cb_guard(
1064 292 : new Cleanup([this, completion_cb]() { dispatcher_->post(completion_cb); }));
1065 :
1066 : // Registrations which take a completion callback are typically implemented by executing a
1067 : // callback on all worker threads using Slot::runOnAllThreads which will hang indefinitely if
1068 : // worker threads have not been started so we need to skip notifications if envoy is shutdown
1069 : // early before workers have started.
1070 292 : if (workers_started_) {
1071 164 : const auto it2 = stage_completable_callbacks_.find(stage);
1072 164 : if (it2 != stage_completable_callbacks_.end()) {
1073 0 : ENVOY_LOG(info, "Notifying {} callback(s) with completion.", it2->second.size());
1074 0 : for (const StageCallbackWithCompletion& callback : it2->second) {
1075 0 : callback([cb_guard] {});
1076 0 : }
1077 0 : }
1078 164 : }
1079 292 : }
1080 :
1081 98 : ProtobufTypes::MessagePtr InstanceBase::dumpBootstrapConfig() {
1082 98 : auto config_dump = std::make_unique<envoy::admin::v3::BootstrapConfigDump>();
1083 98 : config_dump->mutable_bootstrap()->MergeFrom(bootstrap_);
1084 98 : TimestampUtil::systemClockToTimestamp(bootstrap_config_update_time_,
1085 98 : *(config_dump->mutable_last_updated()));
1086 98 : return config_dump;
1087 98 : }
1088 :
1089 0 : Network::DnsResolverSharedPtr InstanceBase::getOrCreateDnsResolver() {
1090 0 : if (!dns_resolver_) {
1091 0 : envoy::config::core::v3::TypedExtensionConfig typed_dns_resolver_config;
1092 0 : Network::DnsResolverFactory& dns_resolver_factory =
1093 0 : Network::createDnsResolverFactoryFromProto(bootstrap_, typed_dns_resolver_config);
1094 0 : dns_resolver_ =
1095 0 : dns_resolver_factory.createDnsResolver(dispatcher(), api(), typed_dns_resolver_config);
1096 0 : }
1097 0 : return dns_resolver_;
1098 0 : }
1099 :
1100 150 : bool InstanceBase::enableReusePortDefault() { return enable_reuse_port_default_; }
1101 :
1102 : } // namespace Server
1103 : } // namespace Envoy
|