Line data Source code
1 : #include "source/common/upstream/cluster_manager_impl.h"
2 :
3 : #include <chrono>
4 : #include <cstdint>
5 : #include <functional>
6 : #include <memory>
7 : #include <string>
8 : #include <vector>
9 :
10 : #include "envoy/admin/v3/config_dump.pb.h"
11 : #include "envoy/config/bootstrap/v3/bootstrap.pb.h"
12 : #include "envoy/config/cluster/v3/cluster.pb.h"
13 : #include "envoy/config/core/v3/config_source.pb.h"
14 : #include "envoy/config/core/v3/protocol.pb.h"
15 : #include "envoy/config/xds_resources_delegate.h"
16 : #include "envoy/event/dispatcher.h"
17 : #include "envoy/network/dns.h"
18 : #include "envoy/runtime/runtime.h"
19 : #include "envoy/stats/scope.h"
20 : #include "envoy/tcp/async_tcp_client.h"
21 : #include "envoy/upstream/load_balancer.h"
22 : #include "envoy/upstream/upstream.h"
23 :
24 : #include "source/common/common/assert.h"
25 : #include "source/common/common/enum_to_int.h"
26 : #include "source/common/common/fmt.h"
27 : #include "source/common/common/utility.h"
28 : #include "source/common/config/custom_config_validators_impl.h"
29 : #include "source/common/config/null_grpc_mux_impl.h"
30 : #include "source/common/config/utility.h"
31 : #include "source/common/config/xds_resource.h"
32 : #include "source/common/grpc/async_client_manager_impl.h"
33 : #include "source/common/http/async_client_impl.h"
34 : #include "source/common/http/http1/conn_pool.h"
35 : #include "source/common/http/http2/conn_pool.h"
36 : #include "source/common/http/mixed_conn_pool.h"
37 : #include "source/common/network/resolver_impl.h"
38 : #include "source/common/network/utility.h"
39 : #include "source/common/protobuf/utility.h"
40 : #include "source/common/router/shadow_writer_impl.h"
41 : #include "source/common/runtime/runtime_features.h"
42 : #include "source/common/tcp/conn_pool.h"
43 : #include "source/common/upstream/cds_api_impl.h"
44 : #include "source/common/upstream/cluster_factory_impl.h"
45 : #include "source/common/upstream/load_balancer_impl.h"
46 : #include "source/common/upstream/priority_conn_pool_map_impl.h"
47 :
48 : #ifdef ENVOY_ENABLE_QUIC
49 : #include "source/common/http/conn_pool_grid.h"
50 : #include "source/common/http/http3/conn_pool.h"
51 : #include "source/common/quic/client_connection_factory_impl.h"
52 : #endif
53 :
54 : namespace Envoy {
55 : namespace Upstream {
56 : namespace {
57 :
58 : void addOptionsIfNotNull(Network::Socket::OptionsSharedPtr& options,
59 434 : const Network::Socket::OptionsSharedPtr& to_add) {
60 434 : if (to_add != nullptr) {
61 183 : Network::Socket::appendOptions(options, to_add);
62 183 : }
63 434 : }
64 :
65 : // Helper function to make sure each protocol in expected_protocols is present
66 : // in protocols (only used for an ASSERT in debug builds)
67 : bool contains(const std::vector<Http::Protocol>& protocols,
68 0 : const std::vector<Http::Protocol>& expected_protocols) {
69 0 : for (auto protocol : expected_protocols) {
70 0 : if (std::find(protocols.begin(), protocols.end(), protocol) == protocols.end()) {
71 0 : return false;
72 0 : }
73 0 : }
74 0 : return true;
75 0 : }
76 :
77 : absl::optional<Http::HttpServerPropertiesCache::Origin>
78 173 : getOrigin(const Network::TransportSocketOptionsConstSharedPtr& options, HostConstSharedPtr host) {
79 173 : std::string sni = std::string(host->transportSocketFactory().defaultServerNameIndication());
80 173 : if (options && options->serverNameOverride().has_value()) {
81 0 : sni = options->serverNameOverride().value();
82 0 : }
83 173 : if (sni.empty() || !host->address() || !host->address()->ip()) {
84 173 : return absl::nullopt;
85 173 : }
86 0 : return {{"https", sni, host->address()->ip()->port()}};
87 173 : }
88 :
89 : bool isBlockingAdsCluster(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
90 131 : absl::string_view cluster_name) {
91 131 : if (bootstrap.dynamic_resources().has_ads_config()) {
92 56 : const auto& ads_config_source = bootstrap.dynamic_resources().ads_config();
93 : // We only care about EnvoyGrpc, not GoogleGrpc, because we only need to delay ADS mux
94 : // initialization if it uses an Envoy cluster that needs to be initialized first. We don't
95 : // depend on the same cluster initialization when opening a gRPC stream for GoogleGrpc.
96 56 : return (ads_config_source.grpc_services_size() > 0 &&
97 56 : ads_config_source.grpc_services(0).has_envoy_grpc() &&
98 56 : ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
99 56 : }
100 75 : return false;
101 131 : }
102 :
103 : } // namespace
104 :
105 159 : void ClusterManagerInitHelper::addCluster(ClusterManagerCluster& cm_cluster) {
106 : // See comments in ClusterManagerImpl::addOrUpdateCluster() for why this is only called during
107 : // server initialization.
108 159 : ASSERT(state_ != State::AllClustersInitialized);
109 :
110 159 : const auto initialize_cb = [&cm_cluster, this] {
111 131 : onClusterInit(cm_cluster);
112 131 : cm_cluster.cluster().info()->configUpdateStats().warming_state_.set(0);
113 131 : };
114 159 : Cluster& cluster = cm_cluster.cluster();
115 :
116 159 : cluster.info()->configUpdateStats().warming_state_.set(1);
117 159 : if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
118 : // Remove the previous cluster before the cluster object is destroyed.
119 131 : primary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
120 131 : cluster.initialize(initialize_cb);
121 131 : } else {
122 28 : ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
123 : // Remove the previous cluster before the cluster object is destroyed.
124 28 : secondary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
125 28 : if (started_secondary_initialize_) {
126 : // This can happen if we get a second CDS update that adds new clusters after we have
127 : // already started secondary init. In this case, just immediately initialize.
128 0 : cluster.initialize(initialize_cb);
129 0 : }
130 28 : }
131 :
132 159 : ENVOY_LOG(debug, "cm init: adding: cluster={} primary={} secondary={}", cluster.info()->name(),
133 159 : primary_init_clusters_.size(), secondary_init_clusters_.size());
134 159 : }
135 :
136 159 : void ClusterManagerInitHelper::onClusterInit(ClusterManagerCluster& cluster) {
137 159 : ASSERT(state_ != State::AllClustersInitialized);
138 159 : per_cluster_init_callback_(cluster);
139 159 : removeCluster(cluster);
140 159 : }
141 :
142 159 : void ClusterManagerInitHelper::removeCluster(ClusterManagerCluster& cluster) {
143 159 : if (state_ == State::AllClustersInitialized) {
144 0 : return;
145 0 : }
146 :
147 : // There is a remote edge case where we can remove a cluster via CDS that has not yet been
148 : // initialized. When called via the remove cluster API this code catches that case.
149 159 : absl::flat_hash_map<std::string, ClusterManagerCluster*>* cluster_map;
150 159 : if (cluster.cluster().initializePhase() == Cluster::InitializePhase::Primary) {
151 131 : cluster_map = &primary_init_clusters_;
152 131 : } else {
153 28 : ASSERT(cluster.cluster().initializePhase() == Cluster::InitializePhase::Secondary);
154 28 : cluster_map = &secondary_init_clusters_;
155 28 : }
156 :
157 : // It is possible that the cluster we are removing has already been initialized, and is not
158 : // present in the initializer map. If so, this is fine as a CDS update may happen for a
159 : // cluster with the same name. See the case "UpdateAlreadyInitialized" of the
160 : // target //test/common/upstream:cluster_manager_impl_test.
161 159 : auto iter = cluster_map->find(cluster.cluster().info()->name());
162 159 : if (iter != cluster_map->end() && iter->second == &cluster) {
163 159 : cluster_map->erase(iter);
164 159 : }
165 159 : ENVOY_LOG(debug, "cm init: init complete: cluster={} primary={} secondary={}",
166 159 : cluster.cluster().info()->name(), primary_init_clusters_.size(),
167 159 : secondary_init_clusters_.size());
168 159 : maybeFinishInitialize();
169 159 : }
170 :
171 28 : void ClusterManagerInitHelper::initializeSecondaryClusters() {
172 28 : started_secondary_initialize_ = true;
173 : // Cluster::initialize() method can modify the map of secondary_init_clusters_ to remove
174 : // the item currently being initialized, so we eschew range-based-for and do this complicated
175 : // dance to increment the iterator before calling initialize.
176 56 : for (auto iter = secondary_init_clusters_.begin(); iter != secondary_init_clusters_.end();) {
177 28 : ClusterManagerCluster* cluster = iter->second;
178 28 : ENVOY_LOG(debug, "initializing secondary cluster {}", iter->first);
179 28 : ++iter;
180 28 : cluster->cluster().initialize([cluster, this] { onClusterInit(*cluster); });
181 28 : }
182 28 : }
183 :
// Advances the cluster-manager init state machine as far as the current state allows. It is
// called after every event that may unblock initialization: static load completion, a cluster
// finishing init or being removed, the request to start secondary initialization, and CDS
// reporting that it is initialized.
//
// In order, the function:
//   1. returns immediately while static loading or CDS initialization is still pending;
//   2. waits for all primary clusters, then signals "primary clusters ready" and returns;
//   3. starts (once) and then waits for the secondary clusters;
//   4. starts CDS initialization if a CDS API is configured and has not yet been started, or
//      otherwise declares all clusters initialized and fires the completion callback.
184 426 : void ClusterManagerInitHelper::maybeFinishInitialize() {
185 : // Do not do anything if we are still doing the initial static load or if we are waiting for
186 : // CDS initialize.
187 426 : ENVOY_LOG(debug, "maybe finish initialize state: {}", enumToInt(state_));
188 426 : if (state_ == State::Loading || state_ == State::WaitingToStartCdsInitialization) {
189 131 : return;
190 131 : }
191 :
192 295 : ASSERT(state_ == State::WaitingToStartSecondaryInitialization ||
193 295 : state_ == State::CdsInitialized ||
194 295 : state_ == State::WaitingForPrimaryInitializationToComplete);
195 295 : ENVOY_LOG(debug, "maybe finish initialize primary init clusters empty: {}",
196 295 : primary_init_clusters_.empty());
197 : // If we are still waiting for primary clusters to initialize, do nothing.
198 295 : if (!primary_init_clusters_.empty()) {
199 0 : return;
200 295 : } else if (state_ == State::WaitingForPrimaryInitializationToComplete) {
// All primary clusters are done. Record that, notify the primary-clusters-ready callback if one
// was registered, and stop here: secondary initialization only proceeds on a later call (see
// startInitializingSecondaryClusters()).
201 129 : state_ = State::WaitingToStartSecondaryInitialization;
202 129 : if (primary_clusters_initialized_callback_) {
203 0 : primary_clusters_initialized_callback_();
204 0 : }
205 129 : return;
206 129 : }
207 :
208 : // If we are still waiting for secondary clusters to initialize, see if we need to first call
209 : // initialize on them. This is only done once.
210 166 : ENVOY_LOG(debug, "maybe finish initialize secondary init clusters empty: {}",
211 166 : secondary_init_clusters_.empty());
212 166 : if (!secondary_init_clusters_.empty()) {
213 28 : if (!started_secondary_initialize_) {
214 28 : ENVOY_LOG(info, "cm init: initializing secondary clusters");
215 : // If the first CDS response doesn't have any primary cluster, ClusterLoadAssignment
216 : // should be already paused by CdsApiImpl::onConfigUpdate(). Need to check that to
217 : // avoid double pause ClusterLoadAssignment.
// maybe_resume_eds_leds_sds lives until the end of this inner block, so whatever pause() returned
// is held across the initializeSecondaryClusters() call below. NOTE(review): presumably the
// paused types are resumed when it is destroyed -- confirm against Config::ScopedResume.
218 28 : Config::ScopedResume maybe_resume_eds_leds_sds;
219 28 : if (cm_.adsMux()) {
220 28 : const std::vector<std::string> paused_xds_types{
221 28 : Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(),
222 28 : Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>(),
223 28 : Config::getTypeUrl<envoy::extensions::transport_sockets::tls::v3::Secret>()};
224 28 : maybe_resume_eds_leds_sds = cm_.adsMux()->pause(paused_xds_types);
225 28 : }
226 28 : initializeSecondaryClusters();
227 28 : }
// Secondary clusters are still pending (whether just started or started earlier): wait for them.
228 28 : return;
229 28 : }
230 :
231 : // At this point, if we are doing static init, and we have CDS, start CDS init. Otherwise, move
232 : // directly to initialized.
// Both pending maps are empty here. Clearing the flag lets a later batch of secondary clusters
// trigger initializeSecondaryClusters() again.
233 138 : started_secondary_initialize_ = false;
234 138 : ENVOY_LOG(debug, "maybe finish initialize cds api ready: {}", cds_ != nullptr);
235 138 : if (state_ == State::WaitingToStartSecondaryInitialization && cds_) {
236 28 : ENVOY_LOG(info, "cm init: initializing cds");
237 28 : state_ = State::WaitingToStartCdsInitialization;
238 28 : cds_->initialize();
239 110 : } else {
240 110 : ENVOY_LOG(info, "cm init: all clusters initialized");
241 110 : state_ = State::AllClustersInitialized;
242 110 : if (initialized_callback_) {
243 28 : initialized_callback_();
244 28 : }
245 110 : }
246 138 : }
247 :
248 129 : void ClusterManagerInitHelper::onStaticLoadComplete() {
249 129 : ASSERT(state_ == State::Loading);
250 : // After initialization of primary clusters has completed, transition to
251 : // waiting for signal to initialize secondary clusters and then CDS.
252 129 : state_ = State::WaitingForPrimaryInitializationToComplete;
253 129 : maybeFinishInitialize();
254 129 : }
255 :
256 110 : void ClusterManagerInitHelper::startInitializingSecondaryClusters() {
257 110 : ASSERT(state_ == State::WaitingToStartSecondaryInitialization);
258 110 : ENVOY_LOG(debug, "continue initializing secondary clusters");
259 110 : maybeFinishInitialize();
260 110 : }
261 :
262 129 : void ClusterManagerInitHelper::setCds(CdsApi* cds) {
263 129 : ASSERT(state_ == State::Loading);
264 129 : cds_ = cds;
265 129 : if (cds_) {
266 28 : cds_->setInitializedCb([this]() -> void {
267 28 : ASSERT(state_ == State::WaitingToStartCdsInitialization);
268 28 : state_ = State::CdsInitialized;
269 28 : maybeFinishInitialize();
270 28 : });
271 28 : }
272 129 : }
273 :
274 : void ClusterManagerInitHelper::setInitializedCb(
275 98 : ClusterManager::InitializationCompleteCallback callback) {
276 98 : if (state_ == State::AllClustersInitialized) {
277 70 : callback();
278 98 : } else {
279 28 : initialized_callback_ = callback;
280 28 : }
281 98 : }
282 :
283 : void ClusterManagerInitHelper::setPrimaryClustersInitializedCb(
284 111 : ClusterManager::PrimaryClustersReadyCallback callback) {
285 : // The callback must be set before or at the `WaitingToStartSecondaryInitialization` state.
286 111 : ASSERT(state_ == State::WaitingToStartSecondaryInitialization ||
287 111 : state_ == State::WaitingForPrimaryInitializationToComplete || state_ == State::Loading);
288 111 : if (state_ == State::WaitingToStartSecondaryInitialization) {
289 : // This is the case where all clusters are STATIC and without health checking.
290 111 : callback();
291 111 : } else {
292 0 : primary_clusters_initialized_callback_ = callback;
293 0 : }
294 111 : }
295 :
// Constructs the cluster manager. The initializer list wires up references, stats, stat-name
// pools and the init helper (whose per-cluster callback forwards to onClusterInit()). The body
// then sets up, in order: the admin "clusters" config-dump handler, the gRPC async client
// manager, the optional outlier event logger, the optional local cluster name, the optional xDS
// delegate and xDS config tracker extensions, and finally the subscription factory.
// No clusters are loaded here; that is done by init().
296 : ClusterManagerImpl::ClusterManagerImpl(
297 : const envoy::config::bootstrap::v3::Bootstrap& bootstrap, ClusterManagerFactory& factory,
298 : Stats::Store& stats, ThreadLocal::Instance& tls, Runtime::Loader& runtime,
299 : const LocalInfo::LocalInfo& local_info, AccessLog::AccessLogManager& log_manager,
300 : Event::Dispatcher& main_thread_dispatcher, OptRef<Server::Admin> admin,
301 : ProtobufMessage::ValidationContext& validation_context, Api::Api& api,
302 : Http::Context& http_context, Grpc::Context& grpc_context, Router::Context& router_context,
303 : const Server::Instance& server)
304 : : server_(server), factory_(factory), runtime_(runtime), stats_(stats), tls_(tls),
305 : random_(api.randomGenerator()),
306 : deferred_cluster_creation_(bootstrap.cluster_manager().enable_deferred_cluster_creation()),
307 : bind_config_(bootstrap.cluster_manager().has_upstream_bind_config()
308 : ? absl::make_optional(bootstrap.cluster_manager().upstream_bind_config())
309 : : absl::nullopt),
310 : local_info_(local_info), cm_stats_(generateStats(*stats.rootScope())),
311 159 : init_helper_(*this, [this](ClusterManagerCluster& cluster) { onClusterInit(cluster); }),
312 : time_source_(main_thread_dispatcher.timeSource()), dispatcher_(main_thread_dispatcher),
313 : http_context_(http_context), validation_context_(validation_context),
314 : router_context_(router_context), cluster_stat_names_(stats.symbolTable()),
315 : cluster_config_update_stat_names_(stats.symbolTable()),
316 : cluster_lb_stat_names_(stats.symbolTable()),
317 : cluster_endpoint_stat_names_(stats.symbolTable()),
318 : cluster_load_report_stat_names_(stats.symbolTable()),
319 : cluster_circuit_breakers_stat_names_(stats.symbolTable()),
320 : cluster_request_response_size_stat_names_(stats.symbolTable()),
321 : cluster_timeout_budget_stat_names_(stats.symbolTable()),
322 : common_lb_config_pool_(
323 : std::make_shared<SharedPool::ObjectSharedPool<
324 : const envoy::config::cluster::v3::Cluster::CommonLbConfig, MessageUtil, MessageUtil>>(
325 : main_thread_dispatcher)),
326 131 : shutdown_(false) {
// Register the "clusters" config-dump handler only when an admin interface was supplied.
327 131 : if (admin.has_value()) {
328 131 : config_tracker_entry_ = admin->getConfigTracker().add(
329 131 : "clusters", [this](const Matchers::StringMatcher& name_matcher) {
330 98 : return dumpClusterConfigs(name_matcher);
331 98 : });
332 131 : }
333 131 : async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
334 131 : *this, tls, time_source_, api, grpc_context.statNames(),
335 131 : bootstrap.grpc_async_client_manager_config());
336 131 : const auto& cm_config = bootstrap.cluster_manager();
// An outlier event logger is created only when outlier detection is configured with a non-empty
// event log path; otherwise outlier_event_logger_ is left as default-initialized.
337 131 : if (cm_config.has_outlier_detection()) {
338 0 : const std::string event_log_file_path = cm_config.outlier_detection().event_log_path();
339 0 : if (!event_log_file_path.empty()) {
340 0 : outlier_event_logger_ = std::make_shared<Outlier::EventLoggerImpl>(
341 0 : log_manager, event_log_file_path, time_source_);
342 0 : }
343 0 : }
344 :
345 : // We need to know whether we're zone aware early on, so make sure we do this lookup
346 : // before we load any clusters.
347 131 : if (!cm_config.local_cluster_name().empty()) {
348 0 : local_cluster_name_ = cm_config.local_cluster_name();
349 0 : }
350 :
351 : // Initialize the XdsResourceDelegate extension, if set on the bootstrap config.
352 131 : if (bootstrap.has_xds_delegate_extension()) {
353 0 : auto& factory = Config::Utility::getAndCheckFactory<Config::XdsResourcesDelegateFactory>(
354 0 : bootstrap.xds_delegate_extension());
355 0 : xds_resources_delegate_ = factory.createXdsResourcesDelegate(
356 0 : bootstrap.xds_delegate_extension().typed_config(),
357 0 : validation_context.dynamicValidationVisitor(), api, main_thread_dispatcher);
358 0 : }
359 :
// Likewise, initialize the XdsConfigTracker extension, if set on the bootstrap config.
360 131 : if (bootstrap.has_xds_config_tracker_extension()) {
361 0 : auto& tracer_factory = Config::Utility::getAndCheckFactory<Config::XdsConfigTrackerFactory>(
362 0 : bootstrap.xds_config_tracker_extension());
363 0 : xds_config_tracker_ = tracer_factory.createXdsConfigTracker(
364 0 : bootstrap.xds_config_tracker_extension().typed_config(),
365 0 : validation_context.dynamicValidationVisitor(), api, main_thread_dispatcher);
366 0 : }
367 :
// The subscription factory is created last because it takes optional references to the delegate
// and config tracker set up above (either may be empty).
368 131 : subscription_factory_ = std::make_unique<Config::SubscriptionFactoryImpl>(
369 131 : local_info, main_thread_dispatcher, *this, validation_context.dynamicValidationVisitor(), api,
370 131 : server, makeOptRefFromPtr(xds_resources_delegate_.get()),
371 131 : makeOptRefFromPtr(xds_config_tracker_.get()));
372 131 : }
373 :
// Loads the static bootstrap configuration into the cluster manager. Must be called at most once
// (guarded by an ASSERT on initialized_). Steps, in order:
//   1. load all primary static clusters;
//   2. create the ADS mux (delta or state-of-the-world), or a null mux when ADS is not configured;
//   3. load the remaining static clusters (EDS with a non path-based config source);
//   4. resolve the local cluster, if one is configured, and install the thread local cluster
//      manager;
//   5. create the CDS API if configured and register it with the init helper;
//   6. hand every loaded cluster to the init helper and signal that static loading is complete;
//   7. start the ADS mux here unless one of the static clusters is the ADS cluster.
//
// Returns OkStatus on success. Returns a non-OK status when loading a cluster fails, the ADS
// transport version check fails, the selected mux factory is not registered, or the configured
// local cluster is not among the loaded clusters.
374 131 : absl::Status ClusterManagerImpl::init(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
375 131 : ASSERT(!initialized_);
376 131 : initialized_ = true;
377 :
378 : // Cluster loading happens in two phases: first all the primary clusters are loaded, and then all
379 : // the secondary clusters are loaded. As it currently stands all non-EDS clusters and EDS which
380 : // load endpoint definition from file are primary and
381 : // (REST,GRPC,DELTA_GRPC) EDS clusters are secondary. This two phase
382 : // loading is done because in v2 configuration each EDS cluster individually sets up a
383 : // subscription. When this subscription is an API source the cluster will depend on a non-EDS
384 : // cluster, so the non-EDS clusters must be loaded first.
// NOTE(review): the `type() == EDS` test in the second operand is implied by the first operand of
// `||` being false, so it is redundant (but harmless).
385 295 : auto is_primary_cluster = [](const envoy::config::cluster::v3::Cluster& cluster) -> bool {
386 262 : return cluster.type() != envoy::config::cluster::v3::Cluster::EDS ||
387 262 : (cluster.type() == envoy::config::cluster::v3::Cluster::EDS &&
388 0 : Config::SubscriptionFactory::isPathBasedConfigSource(
389 0 : cluster.eds_cluster_config().eds_config().config_source_specifier_case()));
390 262 : };
391 : // Build book-keeping for which clusters are primary. This is useful when we
392 : // invoke loadCluster() below and it needs the complete set of primaries.
393 164 : for (const auto& cluster : bootstrap.static_resources().clusters()) {
394 131 : if (is_primary_cluster(cluster)) {
395 131 : primary_clusters_.insert(cluster.name());
396 131 : }
397 131 : }
398 :
// Set to true if any static cluster (primary or secondary) is the cluster used by ADS; checked at
// the end of this function to decide whether the ADS mux is started here.
399 131 : bool has_ads_cluster = false;
400 : // Load all the primary clusters.
401 164 : for (const auto& cluster : bootstrap.static_resources().clusters()) {
402 131 : if (is_primary_cluster(cluster)) {
403 131 : const bool required_for_ads = isBlockingAdsCluster(bootstrap, cluster.name());
404 131 : has_ads_cluster |= required_for_ads;
405 : // TODO(abeyad): Consider passing a lambda for a "post-cluster-init" callback, which would
406 : // include a conditional ads_mux_->start() call, if other uses cases for "post-cluster-init"
407 : // functionality pops up.
408 131 : auto status_or_cluster =
409 131 : loadCluster(cluster, MessageUtil::hash(cluster), "", /*added_via_api=*/false,
410 131 : required_for_ads, active_clusters_);
411 131 : RETURN_IF_STATUS_NOT_OK(status_or_cluster);
412 131 : }
413 131 : }
414 :
415 131 : const auto& dyn_resources = bootstrap.dynamic_resources();
416 :
417 : // Now setup ADS if needed, this might rely on a primary cluster.
418 : // This is the only point where distinction between delta ADS and state-of-the-world ADS is made.
419 : // After here, we just have a GrpcMux interface held in ads_mux_, which hides
420 : // whether the backing implementation is delta or SotW.
421 131 : if (dyn_resources.has_ads_config()) {
422 31 : Config::CustomConfigValidatorsPtr custom_config_validators =
423 31 : std::make_unique<Config::CustomConfigValidatorsImpl>(
424 31 : validation_context_.dynamicValidationVisitor(), server_,
425 31 : dyn_resources.ads_config().config_validators());
426 :
427 31 : JitteredExponentialBackOffStrategyPtr backoff_strategy =
428 31 : Config::Utility::prepareJitteredExponentialBackOffStrategy(
429 31 : dyn_resources.ads_config(), random_,
430 31 : Envoy::Config::SubscriptionFactory::RetryInitialDelayMs,
431 31 : Envoy::Config::SubscriptionFactory::RetryMaxDelayMs);
432 :
433 31 : const bool use_eds_cache =
434 31 : Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads");
// Delta ADS: the mux factory is looked up by name, and the name depends on the unified_mux
// runtime feature. No xDS resources delegate is passed on this path (the `{}` argument).
435 31 : if (dyn_resources.ads_config().api_type() ==
436 31 : envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
437 8 : absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
438 8 : RETURN_IF_NOT_OK(status);
439 8 : std::string name;
440 8 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
441 4 : name = "envoy.config_mux.delta_grpc_mux_factory";
442 4 : } else {
443 4 : name = "envoy.config_mux.new_grpc_mux_factory";
444 4 : }
445 8 : auto* factory = Config::Utility::getFactoryByName<Config::MuxFactory>(name);
446 8 : if (!factory) {
447 0 : return absl::InvalidArgumentError(fmt::format("{} not found", name));
448 0 : }
449 8 : ads_mux_ = factory->create(
450 8 : Config::Utility::factoryForGrpcApiConfigSource(
451 8 : *async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false)
452 8 : ->createUncachedRawAsyncClient(),
453 8 : dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
454 8 : std::move(custom_config_validators), std::move(backoff_strategy),
455 8 : makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache);
456 23 : } else {
// Every other api_type takes the state-of-the-world path: same structure as above, but the xDS
// resources delegate (if any) is passed through to the mux.
457 23 : absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
458 23 : RETURN_IF_NOT_OK(status);
459 23 : auto xds_delegate_opt_ref = makeOptRefFromPtr(xds_resources_delegate_.get());
460 23 : std::string name;
461 23 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
462 10 : name = "envoy.config_mux.sotw_grpc_mux_factory";
463 13 : } else {
464 13 : name = "envoy.config_mux.grpc_mux_factory";
465 13 : }
466 :
467 23 : auto* factory = Config::Utility::getFactoryByName<Config::MuxFactory>(name);
468 23 : if (!factory) {
469 0 : return absl::InvalidArgumentError(fmt::format("{} not found", name));
470 0 : }
471 23 : ads_mux_ = factory->create(
472 23 : Config::Utility::factoryForGrpcApiConfigSource(
473 23 : *async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false)
474 23 : ->createUncachedRawAsyncClient(),
475 23 : dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
476 23 : std::move(custom_config_validators), std::move(backoff_strategy),
477 23 : makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, use_eds_cache);
478 23 : }
479 128 : } else {
// No ADS configured: install a null mux so that ads_mux_ is always non-null from here on.
480 100 : ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>();
481 100 : }
482 :
483 : // After ADS is initialized, load EDS static clusters as EDS config may potentially need ADS.
484 164 : for (const auto& cluster : bootstrap.static_resources().clusters()) {
485 : // Now load all the secondary clusters.
// This condition is the exact negation of is_primary_cluster() above.
486 131 : if (cluster.type() == envoy::config::cluster::v3::Cluster::EDS &&
487 131 : !Config::SubscriptionFactory::isPathBasedConfigSource(
488 0 : cluster.eds_cluster_config().eds_config().config_source_specifier_case())) {
489 0 : const bool required_for_ads = isBlockingAdsCluster(bootstrap, cluster.name());
490 0 : has_ads_cluster |= required_for_ads;
491 0 : auto status_or_cluster =
492 0 : loadCluster(cluster, MessageUtil::hash(cluster), "", /*added_via_api=*/false,
493 0 : required_for_ads, active_clusters_);
494 0 : if (!status_or_cluster.status().ok()) {
495 0 : return status_or_cluster.status();
496 0 : }
497 0 : }
498 131 : }
499 :
// Account for every static cluster (primary and secondary) in the cluster_added counter.
500 131 : cm_stats_.cluster_added_.add(bootstrap.static_resources().clusters().size());
501 131 : updateClusterCounts();
502 :
// When a local cluster name is configured it must match one of the clusters loaded above; its
// info and load balancer factory are captured for the thread local cluster managers.
503 131 : absl::optional<ThreadLocalClusterManagerImpl::LocalClusterParams> local_cluster_params;
504 131 : if (local_cluster_name_) {
505 0 : auto local_cluster = active_clusters_.find(local_cluster_name_.value());
506 0 : if (local_cluster == active_clusters_.end()) {
507 0 : return absl::InvalidArgumentError(
508 0 : fmt::format("local cluster '{}' must be defined", local_cluster_name_.value()));
509 0 : }
510 0 : local_cluster_params.emplace();
511 0 : local_cluster_params->info_ = local_cluster->second->cluster().info();
512 0 : local_cluster_params->load_balancer_factory_ = local_cluster->second->loadBalancerFactory();
513 0 : local_cluster->second->setAddedOrUpdated();
514 0 : }
515 :
516 : // Once the initial set of static bootstrap clusters are created (including the local cluster),
517 : // we can instantiate the thread local cluster manager.
518 225 : tls_.set([this, local_cluster_params](Event::Dispatcher& dispatcher) {
519 223 : return std::make_shared<ThreadLocalClusterManagerImpl>(*this, dispatcher, local_cluster_params);
520 223 : });
521 :
522 : // We can now potentially create the CDS API once the backing cluster exists.
523 131 : if (dyn_resources.has_cds_config() || !dyn_resources.cds_resources_locator().empty()) {
524 28 : std::unique_ptr<xds::core::v3::ResourceLocator> cds_resources_locator;
525 28 : if (!dyn_resources.cds_resources_locator().empty()) {
526 0 : cds_resources_locator = std::make_unique<xds::core::v3::ResourceLocator>(
527 0 : Config::XdsResourceIdentifier::decodeUrl(dyn_resources.cds_resources_locator()));
528 0 : }
529 28 : cds_api_ = factory_.createCds(dyn_resources.cds_config(), cds_resources_locator.get(), *this);
530 28 : init_helper_.setCds(cds_api_.get());
531 131 : } else {
532 103 : init_helper_.setCds(nullptr);
533 103 : }
534 :
535 : // Proceed to add all static bootstrap clusters to the init manager. This will immediately
536 : // initialize any primary clusters. Post-init processing further initializes any thread
537 : // aware load balancer and sets up the per-worker host set updates.
538 164 : for (auto& cluster : active_clusters_) {
539 131 : init_helper_.addCluster(*cluster.second);
540 131 : }
541 :
542 : // Potentially move to secondary initialization on the static bootstrap clusters if all primary
543 : // clusters have already initialized. (E.g., if all static).
544 131 : init_helper_.onStaticLoadComplete();
545 :
546 131 : if (!has_ads_cluster) {
547 : // There is no ADS cluster, so we won't be starting the ADS mux after a cluster has finished
548 : // initializing, so we must start ADS here.
549 101 : ads_mux_->start();
550 101 : }
551 131 : return absl::OkStatus();
552 131 : }
553 :
554 : absl::Status ClusterManagerImpl::initializeSecondaryClusters(
555 110 : const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
556 110 : init_helper_.startInitializingSecondaryClusters();
557 :
558 110 : const auto& cm_config = bootstrap.cluster_manager();
559 110 : if (cm_config.has_load_stats_config()) {
560 2 : const auto& load_stats_config = cm_config.load_stats_config();
561 :
562 2 : absl::Status status = Config::Utility::checkTransportVersion(load_stats_config);
563 2 : RETURN_IF_NOT_OK(status);
564 2 : load_stats_reporter_ = std::make_unique<LoadStatsReporter>(
565 2 : local_info_, *this, *stats_.rootScope(),
566 2 : Config::Utility::factoryForGrpcApiConfigSource(*async_client_manager_, load_stats_config,
567 2 : *stats_.rootScope(), false)
568 2 : ->createUncachedRawAsyncClient(),
569 2 : dispatcher_);
570 2 : }
571 110 : return absl::OkStatus();
572 110 : }
573 :
574 131 : ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) {
575 131 : const std::string final_prefix = "cluster_manager.";
576 131 : return {ALL_CLUSTER_MANAGER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix),
577 131 : POOL_GAUGE_PREFIX(scope, final_prefix))};
578 131 : }
579 :
580 : ThreadLocalClusterManagerStats
581 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::generateStats(Stats::Scope& scope,
582 223 : const std::string& thread_name) {
583 223 : const std::string final_prefix = absl::StrCat("thread_local_cluster_manager.", thread_name);
584 223 : return {ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(POOL_GAUGE_PREFIX(scope, final_prefix))};
585 223 : }
586 :
587 159 : void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) {
588 : // This routine is called when a cluster has finished initializing. The cluster has not yet
589 : // been setup for cross-thread updates to avoid needless updates during initialization. The order
590 : // of operations here is important. We start by initializing the thread aware load balancer if
591 : // needed. This must happen first so cluster updates are heard first by the load balancer.
592 : // Also, it assures that all of clusters which this function is called should be always active.
593 159 : auto& cluster = cm_cluster.cluster();
594 159 : auto cluster_data = warming_clusters_.find(cluster.info()->name());
595 : // We have a situation that clusters will be immediately active, such as static and primary
596 : // cluster. So we must have this prevention logic here.
597 159 : if (cluster_data != warming_clusters_.end()) {
598 28 : clusterWarmingToActive(cluster.info()->name());
599 28 : updateClusterCounts();
600 28 : }
601 159 : cluster_data = active_clusters_.find(cluster.info()->name());
602 :
603 159 : if (cluster_data->second->thread_aware_lb_ != nullptr) {
604 159 : cluster_data->second->thread_aware_lb_->initialize();
605 159 : }
606 :
607 : // Now setup for cross-thread updates.
608 : // This is used by cluster types such as EDS clusters to drain the connection pools of removed
609 : // hosts.
610 159 : cluster_data->second->member_update_cb_ = cluster.prioritySet().addMemberUpdateCb(
611 159 : [&cluster, this](const HostVector&, const HostVector& hosts_removed) -> void {
612 28 : if (cluster.info()->lbConfig().close_connections_on_host_set_change()) {
613 0 : for (const auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
614 : // This will drain all tcp and http connection pools.
615 0 : postThreadLocalRemoveHosts(cluster, host_set->hosts());
616 0 : }
617 28 : } else {
618 : // TODO(snowp): Should this be subject to merge windows?
619 :
620 : // Whenever hosts are removed from the cluster, we make each TLS cluster drain it's
621 : // connection pools for the removed hosts. If `close_connections_on_host_set_change` is
622 : // enabled, this case will be covered by first `if` statement, where all
623 : // connection pools are drained.
624 28 : if (!hosts_removed.empty()) {
625 0 : postThreadLocalRemoveHosts(cluster, hosts_removed);
626 0 : }
627 28 : }
628 28 : });
629 :
630 : // This is used by cluster types such as EDS clusters to update the cluster
631 : // without draining the cluster.
632 159 : cluster_data->second->priority_update_cb_ = cluster.prioritySet().addPriorityUpdateCb(
633 159 : [&cm_cluster, this](uint32_t priority, const HostVector& hosts_added,
634 159 : const HostVector& hosts_removed) {
635 : // This fires when a cluster is about to have an updated member set. We need to send this
636 : // out to all of the thread local configurations.
637 :
638 : // Should we save this update and merge it with other updates?
639 : //
640 : // Note that we can only _safely_ merge updates that have no added/removed hosts. That is,
641 : // only those updates that signal a change in host healthcheck state, weight or metadata.
642 : //
643 : // We've discussed merging updates related to hosts being added/removed, but it's really
644 : // tricky to merge those given that downstream consumers of these updates expect to see the
645 : // full list of updates, not a condensed one. This is because they use the broadcasted
646 : // HostSharedPtrs within internal maps to track hosts. If we fail to broadcast the entire
647 : // list of removals, these maps will leak those HostSharedPtrs.
648 : //
649 : // See https://github.com/envoyproxy/envoy/pull/3941 for more context.
650 0 : bool scheduled = false;
651 0 : const auto merge_timeout = PROTOBUF_GET_MS_OR_DEFAULT(
652 0 : cm_cluster.cluster().info()->lbConfig(), update_merge_window, 1000);
653 : // Remember: we only merge updates with no adds/removes — just hc/weight/metadata changes.
654 0 : const bool is_mergeable = hosts_added.empty() && hosts_removed.empty();
655 :
656 0 : if (merge_timeout > 0) {
657 : // If this is not mergeable, we should cancel any scheduled updates since
658 : // we'll deliver it immediately.
659 0 : scheduled = scheduleUpdate(cm_cluster, priority, is_mergeable, merge_timeout);
660 0 : }
661 :
662 : // If an update was not scheduled for later, deliver it immediately.
663 0 : if (!scheduled) {
664 0 : cm_stats_.cluster_updated_.inc();
665 0 : postThreadLocalClusterUpdate(
666 0 : cm_cluster, ThreadLocalClusterUpdateParams(priority, hosts_added, hosts_removed));
667 0 : }
668 0 : });
669 :
670 : // Finally, post updates cross-thread so the per-thread load balancers are ready. First we
671 : // populate any update information that may be available after cluster init.
672 159 : ThreadLocalClusterUpdateParams params;
673 159 : for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
674 159 : if (host_set->hosts().empty()) {
675 0 : continue;
676 0 : }
677 159 : params.per_priority_update_params_.emplace_back(host_set->priority(), host_set->hosts(),
678 159 : HostVector{});
679 159 : }
680 : // NOTE: In all cases *other* than the local cluster, this is when a cluster is added/updated
681 : // The local cluster must currently be statically defined and must exist prior to other
682 : // clusters being added/updated. We could gate the below update on hosts being available on
683 : // the cluster or the cluster not already existing, but the special logic is not worth it.
684 159 : postThreadLocalClusterUpdate(cm_cluster, std::move(params));
685 159 : }
686 :
687 : bool ClusterManagerImpl::scheduleUpdate(ClusterManagerCluster& cluster, uint32_t priority,
688 0 : bool mergeable, const uint64_t timeout) {
689 : // Find pending updates for this cluster.
690 0 : auto& updates_by_prio = updates_map_[cluster.cluster().info()->name()];
691 0 : if (!updates_by_prio) {
692 0 : updates_by_prio = std::make_unique<PendingUpdatesByPriorityMap>();
693 0 : }
694 :
695 : // Find pending updates for this priority.
696 0 : auto& updates = (*updates_by_prio)[priority];
697 0 : if (!updates) {
698 0 : updates = std::make_unique<PendingUpdates>();
699 0 : }
700 :
701 : // Has an update_merge_window gone by since the last update? If so, don't schedule
702 : // the update so it can be applied immediately. Ditto if this is not a mergeable update.
703 0 : const auto delta = time_source_.monotonicTime() - updates->last_updated_;
704 0 : const uint64_t delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count();
705 0 : const bool out_of_merge_window = delta_ms > timeout;
706 0 : if (out_of_merge_window || !mergeable) {
707 : // If there was a pending update, we cancel the pending merged update.
708 : //
709 : // Note: it's possible that even though we are outside of a merge window (delta_ms > timeout),
710 : // a timer is enabled. This race condition is fine, since we'll disable the timer here and
711 : // deliver the update immediately.
712 :
713 : // Why wasn't the update scheduled for later delivery? We keep some stats that are helpful
714 : // to understand why merging did not happen. There's 2 things we are tracking here:
715 :
716 : // 1) Was this update out of a merge window?
717 0 : if (mergeable && out_of_merge_window) {
718 0 : cm_stats_.update_out_of_merge_window_.inc();
719 0 : }
720 :
721 : // 2) Were there previous updates that we are cancelling (and delivering immediately)?
722 0 : if (updates->disableTimer()) {
723 0 : cm_stats_.update_merge_cancelled_.inc();
724 0 : }
725 :
726 0 : updates->last_updated_ = time_source_.monotonicTime();
727 0 : return false;
728 0 : }
729 :
730 : // If there's no timer, create one.
731 0 : if (updates->timer_ == nullptr) {
732 0 : updates->timer_ = dispatcher_.createTimer([this, &cluster, priority, &updates]() -> void {
733 0 : applyUpdates(cluster, priority, *updates);
734 0 : });
735 0 : }
736 :
737 : // Ensure there's a timer set to deliver these updates.
738 0 : if (!updates->timer_->enabled()) {
739 0 : updates->enableTimer(timeout);
740 0 : }
741 :
742 0 : return true;
743 0 : }
744 :
745 : void ClusterManagerImpl::applyUpdates(ClusterManagerCluster& cluster, uint32_t priority,
746 0 : PendingUpdates& updates) {
747 : // Deliver pending updates.
748 :
749 : // Remember that these merged updates are _only_ for updates related to
750 : // HC/weight/metadata changes. That's why added/removed are empty. All
751 : // adds/removals were already immediately broadcasted.
752 0 : static const HostVector hosts_added;
753 0 : static const HostVector hosts_removed;
754 :
755 0 : postThreadLocalClusterUpdate(
756 0 : cluster, ThreadLocalClusterUpdateParams(priority, hosts_added, hosts_removed));
757 :
758 0 : cm_stats_.cluster_updated_via_merge_.inc();
759 0 : updates.last_updated_ = time_source_.monotonicTime();
760 0 : }
761 :
762 : bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cluster& cluster,
763 28 : const std::string& version_info) {
764 : // First we need to see if this new config is new or an update to an existing dynamic cluster.
765 : // We don't allow updates to statically configured clusters in the main configuration. We check
766 : // both the warming clusters and the active clusters to see if we need an update or the update
767 : // should be blocked.
768 28 : const std::string& cluster_name = cluster.name();
769 28 : const auto existing_active_cluster = active_clusters_.find(cluster_name);
770 28 : const auto existing_warming_cluster = warming_clusters_.find(cluster_name);
771 28 : const uint64_t new_hash = MessageUtil::hash(cluster);
772 28 : if (existing_warming_cluster != warming_clusters_.end()) {
773 : // If the cluster is the same as the warming cluster of the same name, block the update.
774 0 : if (existing_warming_cluster->second->blockUpdate(new_hash)) {
775 0 : return false;
776 0 : }
777 : // NB: https://github.com/envoyproxy/envoy/issues/14598
778 : // Always proceed if the cluster is different from the existing warming cluster.
779 28 : } else if (existing_active_cluster != active_clusters_.end() &&
780 28 : existing_active_cluster->second->blockUpdate(new_hash)) {
781 : // If there's no warming cluster of the same name, and if the cluster is the same as the active
782 : // cluster of the same name, block the update.
783 0 : return false;
784 0 : }
785 :
786 28 : if (existing_active_cluster != active_clusters_.end() ||
787 28 : existing_warming_cluster != warming_clusters_.end()) {
788 0 : if (existing_active_cluster != active_clusters_.end()) {
789 : // The following init manager remove call is a NOP in the case we are already initialized.
790 : // It's just kept here to avoid additional logic.
791 0 : init_helper_.removeCluster(*existing_active_cluster->second);
792 0 : }
793 0 : cm_stats_.cluster_modified_.inc();
794 28 : } else {
795 28 : cm_stats_.cluster_added_.inc();
796 28 : }
797 :
798 : // There are two discrete paths here depending on when we are adding/updating a cluster.
799 : // 1) During initial server load we use the init manager which handles complex logic related to
800 : // primary/secondary init, static/CDS init, warming all clusters, etc.
801 : // 2) After initial server load, we handle warming independently for each cluster in the warming
802 : // map.
803 : // Note: It's likely possible that all warming logic could be centralized in the init manager, but
804 : // a decision was made to split the logic given how complex the init manager already is. In
805 : // the future we may decide to undergo a refactor to unify the logic but the effort/risk to
806 : // do that right now does not seem worth it given that the logic is generally pretty clean
807 : // and easy to understand.
808 28 : const bool all_clusters_initialized =
809 28 : init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
810 : // Preserve the previous cluster data to avoid early destroy. The same cluster should be added
811 : // before destroy to avoid early initialization complete.
812 28 : auto status_or_cluster = loadCluster(cluster, new_hash, version_info, /*added_via_api=*/true,
813 28 : /*required_for_ads=*/false, warming_clusters_);
814 28 : THROW_IF_STATUS_NOT_OK(status_or_cluster, throw);
815 28 : const ClusterDataPtr previous_cluster = std::move(status_or_cluster.value());
816 28 : auto& cluster_entry = warming_clusters_.at(cluster_name);
817 28 : cluster_entry->cluster_->info()->configUpdateStats().warming_state_.set(1);
818 28 : if (!all_clusters_initialized) {
819 28 : ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
820 28 : init_helper_.addCluster(*cluster_entry);
821 28 : } else {
822 0 : ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name);
823 0 : cluster_entry->cluster_->initialize([this, cluster_name] {
824 0 : ENVOY_LOG(debug, "warming cluster {} complete", cluster_name);
825 0 : auto state_changed_cluster_entry = warming_clusters_.find(cluster_name);
826 0 : state_changed_cluster_entry->second->cluster_->info()->configUpdateStats().warming_state_.set(
827 0 : 0);
828 0 : onClusterInit(*state_changed_cluster_entry->second);
829 0 : });
830 0 : }
831 :
832 28 : return true;
833 28 : }
834 :
835 28 : void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) {
836 28 : auto warming_it = warming_clusters_.find(cluster_name);
837 28 : ASSERT(warming_it != warming_clusters_.end());
838 :
839 : // If the cluster is being updated, we need to cancel any pending merged updates.
840 : // Otherwise, applyUpdates() will fire with a dangling cluster reference.
841 28 : updates_map_.erase(cluster_name);
842 :
843 28 : active_clusters_[cluster_name] = std::move(warming_it->second);
844 28 : warming_clusters_.erase(warming_it);
845 28 : }
846 :
847 40 : bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
848 40 : bool removed = false;
849 40 : auto existing_active_cluster = active_clusters_.find(cluster_name);
850 40 : if (existing_active_cluster != active_clusters_.end() &&
851 40 : existing_active_cluster->second->added_via_api_) {
852 0 : removed = true;
853 0 : init_helper_.removeCluster(*existing_active_cluster->second);
854 0 : active_clusters_.erase(existing_active_cluster);
855 :
856 0 : ENVOY_LOG(debug, "removing cluster {}", cluster_name);
857 0 : tls_.runOnAllThreads([cluster_name](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
858 0 : ASSERT(cluster_manager->thread_local_clusters_.contains(cluster_name) ||
859 0 : cluster_manager->thread_local_deferred_clusters_.contains(cluster_name));
860 0 : ENVOY_LOG(debug, "removing TLS cluster {}", cluster_name);
861 0 : for (auto cb_it = cluster_manager->update_callbacks_.begin();
862 0 : cb_it != cluster_manager->update_callbacks_.end();) {
863 : // The current callback may remove itself from the list, so a handle for
864 : // the next item is fetched before calling the callback.
865 0 : auto curr_cb_it = cb_it;
866 0 : ++cb_it;
867 0 : (*curr_cb_it)->onClusterRemoval(cluster_name);
868 0 : }
869 0 : cluster_manager->thread_local_clusters_.erase(cluster_name);
870 0 : cluster_manager->thread_local_deferred_clusters_.erase(cluster_name);
871 0 : cluster_manager->local_stats_.clusters_inflated_.set(
872 0 : cluster_manager->thread_local_clusters_.size());
873 0 : });
874 0 : cluster_initialization_map_.erase(cluster_name);
875 0 : }
876 :
877 40 : auto existing_warming_cluster = warming_clusters_.find(cluster_name);
878 40 : if (existing_warming_cluster != warming_clusters_.end() &&
879 40 : existing_warming_cluster->second->added_via_api_) {
880 0 : removed = true;
881 0 : init_helper_.removeCluster(*existing_warming_cluster->second);
882 0 : warming_clusters_.erase(existing_warming_cluster);
883 0 : ENVOY_LOG(info, "removing warming cluster {}", cluster_name);
884 0 : }
885 :
886 40 : if (removed) {
887 0 : cm_stats_.cluster_removed_.inc();
888 0 : updateClusterCounts();
889 : // Cancel any pending merged updates.
890 0 : updates_map_.erase(cluster_name);
891 0 : }
892 :
893 40 : return removed;
894 40 : }
895 :
896 : absl::StatusOr<ClusterManagerImpl::ClusterDataPtr>
897 : ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
898 : const uint64_t cluster_hash, const std::string& version_info,
899 : bool added_via_api, const bool required_for_ads,
900 159 : ClusterMap& cluster_map) {
901 159 : absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
902 159 : new_cluster_pair_or_error =
903 159 : factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api);
904 :
905 159 : if (!new_cluster_pair_or_error.ok()) {
906 0 : return absl::InvalidArgumentError(std::string(new_cluster_pair_or_error.status().message()));
907 0 : }
908 159 : auto& new_cluster = new_cluster_pair_or_error->first;
909 159 : auto& lb = new_cluster_pair_or_error->second;
910 159 : Cluster& cluster_reference = *new_cluster;
911 :
912 159 : const auto cluster_info = cluster_reference.info();
913 :
914 159 : if (!added_via_api) {
915 131 : if (cluster_map.find(cluster_info->name()) != cluster_map.end()) {
916 0 : return absl::InvalidArgumentError(
917 0 : fmt::format("cluster manager: duplicate cluster '{}'", cluster_info->name()));
918 0 : }
919 131 : }
920 :
921 : // Check if the cluster provided load balancing policy is used. We need handle it as special
922 : // case.
923 159 : bool cluster_provided_lb = cluster_info->lbType() == LoadBalancerType::ClusterProvided;
924 159 : if (cluster_info->lbType() == LoadBalancerType::LoadBalancingPolicyConfig) {
925 159 : TypedLoadBalancerFactory* typed_lb_factory = cluster_info->loadBalancerFactory();
926 159 : RELEASE_ASSERT(typed_lb_factory != nullptr, "ClusterInfo should contain a valid factory");
927 159 : cluster_provided_lb =
928 159 : typed_lb_factory->name() == "envoy.load_balancing_policies.cluster_provided";
929 159 : }
930 :
931 159 : if (cluster_provided_lb && lb == nullptr) {
932 0 : return absl::InvalidArgumentError(
933 0 : fmt::format("cluster manager: cluster provided LB specified but cluster "
934 0 : "'{}' did not provide one. Check cluster documentation.",
935 0 : cluster_info->name()));
936 0 : }
937 159 : if (!cluster_provided_lb && lb != nullptr) {
938 0 : return absl::InvalidArgumentError(
939 0 : fmt::format("cluster manager: cluster provided LB not specified but cluster "
940 0 : "'{}' provided one. Check cluster documentation.",
941 0 : cluster_info->name()));
942 0 : }
943 :
944 159 : if (new_cluster->healthChecker() != nullptr) {
945 0 : new_cluster->healthChecker()->addHostCheckCompleteCb(
946 0 : [this](HostSharedPtr host, HealthTransition changed_state) {
947 0 : if (changed_state == HealthTransition::Changed &&
948 0 : host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
949 0 : postThreadLocalHealthFailure(host);
950 0 : }
951 0 : });
952 0 : }
953 :
954 159 : if (new_cluster->outlierDetector() != nullptr) {
955 0 : new_cluster->outlierDetector()->addChangedStateCb([this](HostSharedPtr host) {
956 0 : if (host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
957 0 : ENVOY_LOG_EVENT(debug, "outlier_detection_ejection",
958 0 : "host {} in cluster {} was ejected by the outlier detector",
959 0 : host->address()->asStringView(), host->cluster().name());
960 0 : postThreadLocalHealthFailure(host);
961 0 : }
962 0 : });
963 0 : }
964 159 : ClusterDataPtr result;
965 159 : auto cluster_entry_it = cluster_map.find(cluster_info->name());
966 159 : if (cluster_entry_it != cluster_map.end()) {
967 0 : result = std::exchange(cluster_entry_it->second,
968 0 : std::make_unique<ClusterData>(cluster, cluster_hash, version_info,
969 0 : added_via_api, required_for_ads,
970 0 : std::move(new_cluster), time_source_));
971 159 : } else {
972 159 : bool inserted = false;
973 159 : std::tie(cluster_entry_it, inserted) = cluster_map.emplace(
974 159 : cluster_info->name(),
975 159 : std::make_unique<ClusterData>(cluster, cluster_hash, version_info, added_via_api,
976 159 : required_for_ads, std::move(new_cluster), time_source_));
977 159 : ASSERT(inserted);
978 159 : }
979 : // If an LB is thread aware, create it here. The LB is not initialized until cluster pre-init
980 : // finishes. For RingHash/Maglev don't create the LB here if subset balancing is enabled,
981 : // because the thread_aware_lb_ field takes precedence over the subset lb).
982 159 : if (cluster_info->lbType() == LoadBalancerType::RingHash) {
983 0 : if (!cluster_info->lbSubsetInfo().isEnabled()) {
984 0 : auto& factory = Config::Utility::getAndCheckFactoryByName<TypedLoadBalancerFactory>(
985 0 : "envoy.load_balancing_policies.ring_hash");
986 0 : cluster_entry_it->second->thread_aware_lb_ = factory.create(
987 0 : {}, *cluster_info, cluster_reference.prioritySet(), runtime_, random_, time_source_);
988 0 : }
989 159 : } else if (cluster_info->lbType() == LoadBalancerType::Maglev) {
990 0 : if (!cluster_info->lbSubsetInfo().isEnabled()) {
991 0 : auto& factory = Config::Utility::getAndCheckFactoryByName<TypedLoadBalancerFactory>(
992 0 : "envoy.load_balancing_policies.maglev");
993 0 : cluster_entry_it->second->thread_aware_lb_ = factory.create(
994 0 : {}, *cluster_info, cluster_reference.prioritySet(), runtime_, random_, time_source_);
995 0 : }
996 159 : } else if (cluster_provided_lb) {
997 0 : cluster_entry_it->second->thread_aware_lb_ = std::move(lb);
998 159 : } else if (cluster_info->lbType() == LoadBalancerType::LoadBalancingPolicyConfig) {
999 159 : TypedLoadBalancerFactory* typed_lb_factory = cluster_info->loadBalancerFactory();
1000 159 : RELEASE_ASSERT(typed_lb_factory != nullptr, "ClusterInfo should contain a valid factory");
1001 159 : cluster_entry_it->second->thread_aware_lb_ =
1002 159 : typed_lb_factory->create(cluster_info->loadBalancerConfig(), *cluster_info,
1003 159 : cluster_reference.prioritySet(), runtime_, random_, time_source_);
1004 159 : }
1005 :
1006 159 : updateClusterCounts();
1007 159 : return result;
1008 159 : }
1009 :
1010 445 : void ClusterManagerImpl::updateClusterCounts() {
1011 : // This if/else block implements a control flow mechanism that can be used by an ADS
1012 : // implementation to properly sequence CDS and RDS updates. It is not enforcing on ADS. ADS can
1013 : // use it to detect when a previously sent cluster becomes warm before sending routes that depend
1014 : // on it. This can improve incidence of HTTP 503 responses from Envoy when a route is used before
1015 : // it's supporting cluster is ready.
1016 : //
1017 : // We achieve that by leaving CDS in the paused state as long as there is at least
1018 : // one cluster in the warming state. This prevents CDS ACK from being sent to ADS.
1019 : // Once cluster is warmed up, CDS is resumed, and ACK is sent to ADS, providing a
1020 : // signal to ADS to proceed with RDS updates.
1021 : // If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant.
1022 445 : const bool all_clusters_initialized =
1023 445 : init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
1024 445 : if (all_clusters_initialized && ads_mux_) {
1025 0 : const auto type_url = Config::getTypeUrl<envoy::config::cluster::v3::Cluster>();
1026 0 : if (last_recorded_warming_clusters_count_ == 0 && !warming_clusters_.empty()) {
1027 0 : resume_cds_ = ads_mux_->pause(type_url);
1028 0 : } else if (last_recorded_warming_clusters_count_ > 0 && warming_clusters_.empty()) {
1029 0 : ASSERT(resume_cds_ != nullptr);
1030 0 : resume_cds_.reset();
1031 0 : }
1032 0 : }
1033 445 : cm_stats_.active_clusters_.set(active_clusters_.size());
1034 445 : cm_stats_.warming_clusters_.set(warming_clusters_.size());
1035 445 : last_recorded_warming_clusters_count_ = warming_clusters_.size();
1036 445 : }
1037 :
1038 504 : ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view cluster) {
1039 504 : ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
1040 :
1041 504 : auto entry = cluster_manager.thread_local_clusters_.find(cluster);
1042 504 : if (entry != cluster_manager.thread_local_clusters_.end()) {
1043 504 : return entry->second.get();
1044 504 : } else {
1045 0 : return cluster_manager.initializeClusterInlineIfExists(cluster);
1046 0 : }
1047 504 : }
1048 :
1049 : void ClusterManagerImpl::maybePreconnect(
1050 : ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry,
1051 : const ClusterConnectivityState& state,
1052 251 : std::function<ConnectionPool::Instance*()> pick_preconnect_pool) {
1053 251 : auto peekahead_ratio = cluster_entry.info()->peekaheadRatio();
1054 251 : if (peekahead_ratio <= 1.0) {
1055 251 : return;
1056 251 : }
1057 :
1058 : // 3 here is arbitrary. Just as in ConnPoolImplBase::tryCreateNewConnections
1059 : // we want to limit the work which can be done on any given preconnect attempt.
1060 0 : for (int i = 0; i < 3; ++i) {
1061 : // See if adding this one new connection
1062 : // would put the cluster over desired capacity. If so, stop preconnecting.
1063 : //
1064 : // We anticipate the incoming stream here, because maybePreconnect is called
1065 : // before a new stream is established.
1066 0 : if (!ConnectionPool::ConnPoolImplBase::shouldConnect(
1067 0 : state.pending_streams_, state.active_streams_,
1068 0 : state.connecting_and_connected_stream_capacity_, peekahead_ratio, true)) {
1069 0 : return;
1070 0 : }
1071 0 : ConnectionPool::Instance* preconnect_pool = pick_preconnect_pool();
1072 0 : if (!preconnect_pool || !preconnect_pool->maybePreconnect(peekahead_ratio)) {
1073 : // Given that the next preconnect pick may be entirely different, we could
1074 : // opt to try again even if the first preconnect fails. Err on the side of
1075 : // caution and wait for the next attempt.
1076 0 : return;
1077 0 : }
1078 0 : }
1079 0 : }
1080 :
1081 : absl::optional<HttpPoolData>
1082 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool(
1083 : ResourcePriority priority, absl::optional<Http::Protocol> protocol,
1084 251 : LoadBalancerContext* context) {
1085 : // Select a host and create a connection pool for it if it does not already exist.
1086 251 : auto pool = httpConnPoolImpl(priority, protocol, context, false);
1087 251 : if (pool == nullptr) {
1088 0 : return absl::nullopt;
1089 0 : }
1090 :
1091 251 : HttpPoolData data(
1092 251 : [this, priority, protocol, context]() -> void {
1093 : // Now that a new stream is being established, attempt to preconnect.
1094 251 : maybePreconnect(*this, parent_.cluster_manager_state_,
1095 251 : [this, &priority, &protocol, &context]() {
1096 0 : return httpConnPoolImpl(priority, protocol, context, true);
1097 0 : });
1098 251 : },
1099 251 : pool);
1100 251 : return data;
1101 251 : }
1102 :
1103 : absl::optional<TcpPoolData>
1104 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
1105 0 : ResourcePriority priority, LoadBalancerContext* context) {
1106 : // Select a host and create a connection pool for it if it does not already exist.
1107 0 : auto pool = tcpConnPoolImpl(priority, context, false);
1108 0 : if (pool == nullptr) {
1109 0 : return absl::nullopt;
1110 0 : }
1111 :
1112 0 : TcpPoolData data(
1113 0 : [this, priority, context]() -> void {
1114 0 : maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &context]() {
1115 0 : return tcpConnPoolImpl(priority, context, true);
1116 0 : });
1117 0 : },
1118 0 : pool);
1119 0 : return data;
1120 0 : }
1121 :
1122 : void ClusterManagerImpl::drainConnections(const std::string& cluster,
1123 0 : DrainConnectionsHostPredicate predicate) {
1124 0 : ENVOY_LOG_EVENT(debug, "drain_connections_call", "drainConnections called for cluster {}",
1125 0 : cluster);
1126 0 : tls_.runOnAllThreads([cluster, predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
1127 0 : auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster);
1128 0 : if (cluster_entry != cluster_manager->thread_local_clusters_.end()) {
1129 0 : cluster_entry->second->drainConnPools(
1130 0 : predicate, ConnectionPool::DrainBehavior::DrainExistingConnections);
1131 0 : }
1132 0 : });
1133 0 : }
1134 :
1135 0 : void ClusterManagerImpl::drainConnections(DrainConnectionsHostPredicate predicate) {
1136 0 : ENVOY_LOG_EVENT(debug, "drain_connections_call_for_all_clusters",
1137 0 : "drainConnections called for all clusters");
1138 0 : tls_.runOnAllThreads([predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
1139 0 : for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) {
1140 0 : cluster_entry.second->drainConnPools(predicate,
1141 0 : ConnectionPool::DrainBehavior::DrainExistingConnections);
1142 0 : }
1143 0 : });
1144 0 : }
1145 :
1146 28 : absl::Status ClusterManagerImpl::checkActiveStaticCluster(const std::string& cluster) {
1147 28 : const auto& it = active_clusters_.find(cluster);
1148 28 : if (it == active_clusters_.end()) {
1149 0 : return absl::InvalidArgumentError(fmt::format("Unknown gRPC client cluster '{}'", cluster));
1150 0 : }
1151 28 : if (it->second->added_via_api_) {
1152 0 : return absl::InvalidArgumentError(
1153 0 : fmt::format("gRPC client cluster '{}' is not static", cluster));
1154 0 : }
1155 28 : return absl::OkStatus();
1156 28 : }
1157 :
1158 : void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster,
1159 0 : const HostVector& hosts_removed) {
1160 : // Drain the connection pools for the given hosts. For deferred clusters have
1161 : // been created.
1162 0 : tls_.runOnAllThreads([name = cluster.info()->name(),
1163 0 : hosts_removed](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
1164 0 : cluster_manager->removeHosts(name, hosts_removed);
1165 0 : });
1166 0 : }
1167 :
1168 : bool ClusterManagerImpl::deferralIsSupportedForCluster(
1169 159 : const ClusterInfoConstSharedPtr& info) const {
1170 159 : if (!deferred_cluster_creation_) {
1171 159 : return false;
1172 159 : }
1173 :
1174 : // Certain cluster types are unsupported for deferred initialization.
1175 : // We need to check both the `clusterType()` (preferred) falling back to
1176 : // the `type()` due to how custom clusters were added leveraging an any
1177 : // config.
1178 0 : if (auto custom_cluster_type = info->clusterType(); custom_cluster_type.has_value()) {
1179 : // TODO(kbaichoo): make it configurable what custom types are supported?
1180 0 : static const std::array<std::string, 4> supported_well_known_cluster_types = {
1181 0 : "envoy.clusters.aggregate", "envoy.cluster.eds", "envoy.clusters.redis",
1182 0 : "envoy.cluster.static"};
1183 0 : if (std::find(supported_well_known_cluster_types.begin(),
1184 0 : supported_well_known_cluster_types.end(),
1185 0 : custom_cluster_type->name()) == supported_well_known_cluster_types.end()) {
1186 0 : return false;
1187 0 : }
1188 :
1189 0 : } else {
1190 : // Check DiscoveryType instead.
1191 0 : static constexpr std::array<envoy::config::cluster::v3::Cluster::DiscoveryType, 2>
1192 0 : supported_cluster_types = {envoy::config::cluster::v3::Cluster::EDS,
1193 0 : envoy::config::cluster::v3::Cluster::STATIC};
1194 0 : if (std::find(supported_cluster_types.begin(), supported_cluster_types.end(), info->type()) ==
1195 0 : supported_cluster_types.end()) {
1196 0 : return false;
1197 0 : }
1198 0 : }
1199 :
1200 0 : return true;
1201 0 : }
1202 :
1203 : void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
1204 159 : ThreadLocalClusterUpdateParams&& params) {
1205 159 : bool add_or_update_cluster = false;
1206 159 : if (!cm_cluster.addedOrUpdated()) {
1207 159 : add_or_update_cluster = true;
1208 159 : cm_cluster.setAddedOrUpdated();
1209 159 : }
1210 :
1211 159 : LoadBalancerFactorySharedPtr load_balancer_factory;
1212 159 : if (add_or_update_cluster) {
1213 159 : load_balancer_factory = cm_cluster.loadBalancerFactory();
1214 159 : }
1215 :
1216 159 : for (auto& per_priority : params.per_priority_update_params_) {
1217 159 : const auto& host_set =
1218 159 : cm_cluster.cluster().prioritySet().hostSetsPerPriority()[per_priority.priority_];
1219 159 : per_priority.update_hosts_params_ = HostSetImpl::updateHostsParams(*host_set);
1220 159 : per_priority.locality_weights_ = host_set->localityWeights();
1221 159 : per_priority.weighted_priority_health_ = host_set->weightedPriorityHealth();
1222 159 : per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
1223 159 : }
1224 :
1225 159 : HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap();
1226 :
1227 159 : pending_cluster_creations_.erase(cm_cluster.cluster().info()->name());
1228 :
1229 159 : const UnitFloat drop_overload = cm_cluster.cluster().dropOverload();
1230 : // Populate the cluster initialization object based on this update.
1231 159 : ClusterInitializationObjectConstSharedPtr cluster_initialization_object =
1232 159 : addOrUpdateClusterInitializationObjectIfSupported(
1233 159 : params, cm_cluster.cluster().info(), load_balancer_factory, host_map, drop_overload);
1234 :
1235 159 : tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
1236 159 : add_or_update_cluster, load_balancer_factory, map = std::move(host_map),
1237 159 : cluster_initialization_object = std::move(cluster_initialization_object),
1238 306 : drop_overload](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
1239 306 : ASSERT(cluster_manager.has_value(),
1240 306 : "Expected the ThreadLocalClusterManager to be set during ClusterManagerImpl creation.");
1241 :
1242 : // Cluster Manager here provided by the particular thread, it will provide
1243 : // this allowing to make the relevant change.
1244 306 : if (const bool defer_unused_clusters =
1245 306 : cluster_initialization_object != nullptr &&
1246 306 : !cluster_manager->thread_local_clusters_.contains(info->name()) &&
1247 306 : !Envoy::Thread::MainThread::isMainThread();
1248 306 : defer_unused_clusters) {
1249 : // Save the cluster initialization object.
1250 0 : ENVOY_LOG(debug, "Deferring add or update for TLS cluster {}", info->name());
1251 0 : cluster_manager->thread_local_deferred_clusters_[info->name()] =
1252 0 : cluster_initialization_object;
1253 :
1254 : // Invoke similar logic of onClusterAddOrUpdate.
1255 0 : ThreadLocalClusterCommand command = [&cluster_manager,
1256 0 : cluster_name = info->name()]() -> ThreadLocalCluster& {
1257 : // If we have multiple callbacks only the first one needs to use the
1258 : // command to initialize the cluster.
1259 0 : auto existing_cluster_entry = cluster_manager->thread_local_clusters_.find(cluster_name);
1260 0 : if (existing_cluster_entry != cluster_manager->thread_local_clusters_.end()) {
1261 0 : return *existing_cluster_entry->second;
1262 0 : }
1263 :
1264 0 : auto* cluster_entry = cluster_manager->initializeClusterInlineIfExists(cluster_name);
1265 0 : ASSERT(cluster_entry != nullptr, "Deferred clusters initiailization should not fail.");
1266 0 : return *cluster_entry;
1267 0 : };
1268 0 : for (auto cb_it = cluster_manager->update_callbacks_.begin();
1269 0 : cb_it != cluster_manager->update_callbacks_.end();) {
1270 : // The current callback may remove itself from the list, so a handle for
1271 : // the next item is fetched before calling the callback.
1272 0 : auto curr_cb_it = cb_it;
1273 0 : ++cb_it;
1274 0 : (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
1275 0 : }
1276 :
1277 306 : } else {
1278 : // Broadcast
1279 306 : ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
1280 306 : if (add_or_update_cluster) {
1281 306 : if (cluster_manager->thread_local_clusters_.contains(info->name())) {
1282 0 : ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
1283 306 : } else {
1284 306 : ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
1285 306 : }
1286 :
1287 306 : new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
1288 306 : load_balancer_factory);
1289 306 : cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
1290 306 : cluster_manager->local_stats_.clusters_inflated_.set(
1291 306 : cluster_manager->thread_local_clusters_.size());
1292 306 : }
1293 :
1294 306 : if (cluster_manager->thread_local_clusters_[info->name()]) {
1295 306 : cluster_manager->thread_local_clusters_[info->name()]->setDropOverload(drop_overload);
1296 306 : }
1297 306 : for (const auto& per_priority : params.per_priority_update_params_) {
1298 306 : cluster_manager->updateClusterMembership(
1299 306 : info->name(), per_priority.priority_, per_priority.update_hosts_params_,
1300 306 : per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_,
1301 306 : per_priority.weighted_priority_health_, per_priority.overprovisioning_factor_, map);
1302 306 : }
1303 :
1304 306 : if (new_cluster != nullptr) {
1305 306 : ThreadLocalClusterCommand command = [&new_cluster]() -> ThreadLocalCluster& {
1306 0 : return *new_cluster;
1307 0 : };
1308 306 : for (auto cb_it = cluster_manager->update_callbacks_.begin();
1309 612 : cb_it != cluster_manager->update_callbacks_.end();) {
1310 : // The current callback may remove itself from the list, so a handle for
1311 : // the next item is fetched before calling the callback.
1312 306 : auto curr_cb_it = cb_it;
1313 306 : ++cb_it;
1314 306 : (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
1315 306 : }
1316 306 : }
1317 306 : }
1318 306 : });
1319 :
1320 : // By this time, the main thread has received the cluster initialization update, so we can start
1321 : // the ADS mux if the ADS mux is dependent on this cluster's initialization.
1322 159 : if (cm_cluster.requiredForAds() && !ads_mux_initialized_) {
1323 28 : ads_mux_->start();
1324 28 : ads_mux_initialized_ = true;
1325 28 : }
1326 159 : }
1327 :
1328 : ClusterManagerImpl::ClusterInitializationObjectConstSharedPtr
1329 : ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported(
1330 : const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
1331 : LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
1332 159 : UnitFloat drop_overload) {
1333 159 : if (!deferralIsSupportedForCluster(cluster_info)) {
1334 159 : return nullptr;
1335 159 : }
1336 :
1337 0 : const std::string& cluster_name = cluster_info->name();
1338 0 : auto entry = cluster_initialization_map_.find(cluster_name);
1339 : // TODO(kbaichoo): if EDS can be configured via cluster_type() then modify the
1340 : // merging logic below.
1341 : //
1342 : // This method may be called multiple times to create multiple ClusterInitializationObject
1343 : // instances for the same cluster. And before the thread local clusters are actually initialized,
1344 : // the new instances will override the old instances in the work threads. But part of data is be
1345 : // created only once, such as load balancer factory. So we should always to merge the new instance
1346 : // with the old one to keep the latest instance have all necessary data.
1347 : //
1348 : // More specifically, this will happen in the following scenarios for now:
1349 : // 1. EDS clusters: the ClusterLoadAssignment of EDS cluster may be updated multiples before
1350 : // the thread local cluster is initialized.
1351 : // 2. Clusters in the unit tests: the cluster in the unit test may be updated multiples before
1352 : // the thread local cluster is initialized by calling 'updateHosts' manually.
1353 0 : const bool should_merge_with_prior_cluster =
1354 0 : entry != cluster_initialization_map_.end() && entry->second->cluster_info_ == cluster_info;
1355 :
1356 0 : if (should_merge_with_prior_cluster) {
1357 : // We need to copy from an existing Cluster Initialization Object. In
1358 : // particular, only update the params with changed priority.
1359 0 : auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
1360 0 : entry->second->per_priority_state_, params, std::move(cluster_info),
1361 0 : load_balancer_factory == nullptr ? entry->second->load_balancer_factory_
1362 0 : : load_balancer_factory,
1363 0 : map, drop_overload);
1364 0 : cluster_initialization_map_[cluster_name] = new_initialization_object;
1365 0 : return new_initialization_object;
1366 0 : } else {
1367 : // We need to create a fresh Cluster Initialization Object.
1368 0 : auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
1369 0 : params, std::move(cluster_info), load_balancer_factory, map, drop_overload);
1370 0 : cluster_initialization_map_[cluster_name] = new_initialization_object;
1371 0 : return new_initialization_object;
1372 0 : }
1373 0 : }
1374 :
1375 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry*
1376 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExists(
1377 0 : absl::string_view cluster) {
1378 0 : auto entry = thread_local_deferred_clusters_.find(cluster);
1379 0 : if (entry == thread_local_deferred_clusters_.end()) {
1380 : // Unknown cluster.
1381 0 : return nullptr;
1382 0 : }
1383 :
1384 : // Create the cluster inline.
1385 0 : const ClusterInitializationObjectConstSharedPtr& initialization_object = entry->second;
1386 0 : ENVOY_LOG(debug, "initializing TLS cluster {} inline", cluster);
1387 0 : auto cluster_entry = std::make_unique<ClusterEntry>(
1388 0 : *this, initialization_object->cluster_info_, initialization_object->load_balancer_factory_);
1389 0 : ClusterEntry* cluster_entry_ptr = cluster_entry.get();
1390 :
1391 0 : thread_local_clusters_[cluster] = std::move(cluster_entry);
1392 0 : local_stats_.clusters_inflated_.set(thread_local_clusters_.size());
1393 :
1394 0 : for (const auto& [_, per_priority] : initialization_object->per_priority_state_) {
1395 0 : updateClusterMembership(initialization_object->cluster_info_->name(), per_priority.priority_,
1396 0 : per_priority.update_hosts_params_, per_priority.locality_weights_,
1397 0 : per_priority.hosts_added_, per_priority.hosts_removed_,
1398 0 : per_priority.weighted_priority_health_,
1399 0 : per_priority.overprovisioning_factor_,
1400 0 : initialization_object->cross_priority_host_map_);
1401 0 : }
1402 0 : thread_local_clusters_[cluster]->setDropOverload(initialization_object->drop_overload_);
1403 :
1404 : // Remove the CIO as we've initialized the cluster.
1405 0 : thread_local_deferred_clusters_.erase(entry);
1406 :
1407 0 : return cluster_entry_ptr;
1408 0 : }
1409 :
1410 : ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
1411 : const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
1412 : LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
1413 : UnitFloat drop_overload)
1414 : : cluster_info_(std::move(cluster_info)), load_balancer_factory_(load_balancer_factory),
1415 0 : cross_priority_host_map_(map), drop_overload_(drop_overload) {
1416 : // Copy the update since the map is empty.
1417 0 : for (const auto& update : params.per_priority_update_params_) {
1418 0 : per_priority_state_.emplace(update.priority_, update);
1419 0 : }
1420 0 : }
1421 :
1422 : ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
1423 : const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>& per_priority_state,
1424 : const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
1425 : LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
1426 : UnitFloat drop_overload)
1427 : : per_priority_state_(per_priority_state), cluster_info_(std::move(cluster_info)),
1428 : load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map),
1429 0 : drop_overload_(drop_overload) {
1430 :
1431 : // Because EDS Clusters receive the entire ClusterLoadAssignment but only
1432 : // provides the delta we must process the hosts_added and hosts_removed and
1433 : // not simply overwrite with hosts added.
1434 0 : for (const auto& update : update_params.per_priority_update_params_) {
1435 0 : auto it = per_priority_state_.find(update.priority_);
1436 0 : if (it != per_priority_state_.end()) {
1437 0 : auto& priority_state = it->second;
1438 : // Merge the two per_priorities.
1439 0 : priority_state.update_hosts_params_ = update.update_hosts_params_;
1440 0 : priority_state.locality_weights_ = update.locality_weights_;
1441 0 : priority_state.weighted_priority_health_ = update.weighted_priority_health_;
1442 0 : priority_state.overprovisioning_factor_ = update.overprovisioning_factor_;
1443 :
1444 : // Merge the hosts vectors to just have hosts added.
1445 : // Assumes that the old host_added_ is exclusive to new hosts_added_ and
1446 : // new hosts_removed_ only refers to the old hosts_added_.
1447 0 : ASSERT(priority_state.hosts_removed_.empty(),
1448 0 : "Cluster Initialization Object should apply hosts "
1449 0 : "removed updates to hosts_added vector!");
1450 :
1451 : // TODO(kbaichoo): replace with a more efficient algorithm. For example
1452 : // if the EDS cluster exposed the LoadAssignment we could just merge by
1453 : // overwriting hosts_added.
1454 0 : if (!update.hosts_removed_.empty()) {
1455 : // Remove all hosts to be removed from the old host_added.
1456 0 : auto& host_added = priority_state.hosts_added_;
1457 0 : auto removed_section = std::remove_if(
1458 0 : host_added.begin(), host_added.end(),
1459 0 : [hosts_removed = std::cref(update.hosts_removed_)](const HostSharedPtr& ptr) {
1460 0 : return std::find(hosts_removed.get().begin(), hosts_removed.get().end(), ptr) !=
1461 0 : hosts_removed.get().end();
1462 0 : });
1463 0 : priority_state.hosts_added_.erase(removed_section, priority_state.hosts_added_.end());
1464 0 : }
1465 :
1466 : // Add updated host_added.
1467 0 : priority_state.hosts_added_.reserve(priority_state.hosts_added_.size() +
1468 0 : update.hosts_added_.size());
1469 0 : std::copy(update.hosts_added_.begin(), update.hosts_added_.end(),
1470 0 : std::back_inserter(priority_state.hosts_added_));
1471 :
1472 0 : } else {
1473 : // Just copy the new priority.
1474 0 : per_priority_state_.emplace(update.priority_, update);
1475 0 : }
1476 0 : }
1477 0 : }
1478 :
1479 0 : void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host) {
1480 0 : tls_.runOnAllThreads([host](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
1481 0 : cluster_manager->onHostHealthFailure(host);
1482 0 : });
1483 0 : }
1484 :
1485 : Host::CreateConnectionData ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConn(
1486 0 : LoadBalancerContext* context) {
1487 0 : HostConstSharedPtr logical_host = chooseHost(context);
1488 0 : if (logical_host) {
1489 0 : auto conn_info = logical_host->createConnection(
1490 0 : parent_.thread_local_dispatcher_, nullptr,
1491 0 : context == nullptr ? nullptr : context->upstreamTransportSocketOptions());
1492 0 : if ((cluster_info_->features() &
1493 0 : ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) &&
1494 0 : conn_info.connection_ != nullptr) {
1495 0 : auto conn_map_iter = parent_.host_tcp_conn_map_.find(logical_host);
1496 0 : if (conn_map_iter == parent_.host_tcp_conn_map_.end()) {
1497 0 : conn_map_iter =
1498 0 : parent_.host_tcp_conn_map_.try_emplace(logical_host, logical_host->acquireHandle())
1499 0 : .first;
1500 0 : }
1501 0 : auto& conn_map = conn_map_iter->second;
1502 0 : conn_map.connections_.emplace(
1503 0 : conn_info.connection_.get(),
1504 0 : std::make_unique<ThreadLocalClusterManagerImpl::TcpConnContainer>(
1505 0 : parent_, logical_host, *conn_info.connection_));
1506 0 : }
1507 0 : return conn_info;
1508 0 : } else {
1509 0 : cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
1510 0 : return {nullptr, nullptr};
1511 0 : }
1512 0 : }
1513 :
1514 : Http::AsyncClient&
1515 68 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpAsyncClient() {
1516 68 : if (lazy_http_async_client_ == nullptr) {
1517 33 : lazy_http_async_client_ = std::make_unique<Http::AsyncClientImpl>(
1518 33 : cluster_info_, parent_.parent_.stats_, parent_.thread_local_dispatcher_,
1519 33 : parent_.parent_.local_info_, parent_.parent_, parent_.parent_.runtime_,
1520 33 : parent_.parent_.random_,
1521 33 : Router::ShadowWriterPtr{new Router::ShadowWriterImpl(parent_.parent_)},
1522 33 : parent_.parent_.http_context_, parent_.parent_.router_context_);
1523 33 : }
1524 68 : return *lazy_http_async_client_;
1525 68 : }
1526 :
1527 : Tcp::AsyncTcpClientPtr
1528 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpAsyncClient(
1529 0 : LoadBalancerContext* context, Tcp::AsyncTcpClientOptionsConstSharedPtr options) {
1530 0 : return std::make_unique<Tcp::AsyncTcpClientImpl>(parent_.thread_local_dispatcher_, *this, context,
1531 0 : options->enable_half_close);
1532 0 : }
1533 :
1534 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHosts(
1535 : const std::string& name, uint32_t priority,
1536 : PrioritySet::UpdateHostsParams&& update_hosts_params,
1537 : LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
1538 : const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health,
1539 : absl::optional<uint32_t> overprovisioning_factor,
1540 306 : HostMapConstSharedPtr cross_priority_host_map) {
1541 306 : ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name,
1542 306 : hosts_added.size(), hosts_removed.size());
1543 306 : priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights),
1544 306 : hosts_added, hosts_removed, weighted_priority_health,
1545 306 : overprovisioning_factor, std::move(cross_priority_host_map));
1546 : // If an LB is thread aware, create a new worker local LB on membership changes.
1547 306 : if (lb_factory_ != nullptr && lb_factory_->recreateOnHostChange()) {
1548 0 : ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
1549 0 : lb_ = lb_factory_->create({priority_set_, parent_.local_priority_set_});
1550 0 : }
1551 306 : }
1552 :
1553 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(
1554 306 : const HostVector& hosts_removed) {
1555 306 : for (const auto& host : hosts_removed) {
1556 306 : parent_.drainOrCloseConnPools(host, ConnectionPool::DrainBehavior::DrainAndDelete);
1557 306 : }
1558 306 : }
1559 :
1560 306 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools() {
1561 306 : for (auto& host_set : priority_set_.hostSetsPerPriority()) {
1562 306 : drainConnPools(host_set->hosts());
1563 306 : }
1564 306 : }
1565 :
1566 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(
1567 0 : DrainConnectionsHostPredicate predicate, ConnectionPool::DrainBehavior behavior) {
1568 0 : for (auto& host_set : priority_set_.hostSetsPerPriority()) {
1569 0 : for (const auto& host : host_set->hosts()) {
1570 0 : if (predicate != nullptr && !predicate(*host)) {
1571 0 : continue;
1572 0 : }
1573 :
1574 0 : parent_.drainOrCloseConnPools(host, behavior);
1575 0 : }
1576 0 : }
1577 0 : }
1578 :
1579 : ClusterUpdateCallbacksHandlePtr
1580 0 : ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) {
1581 0 : ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
1582 0 : return cluster_manager.addClusterUpdateCallbacks(cb);
1583 0 : }
1584 :
1585 : OdCdsApiHandlePtr
1586 : ClusterManagerImpl::allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config,
1587 : OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
1588 0 : ProtobufMessage::ValidationVisitor& validation_visitor) {
1589 : // TODO(krnowak): Instead of creating a new handle every time, store the handles internally and
1590 : // return an already existing one if the config or locator matches. Note that this may need a
1591 : // way to clean up the unused handles, so we can close the unnecessary connections.
1592 0 : auto odcds = OdCdsApiImpl::create(odcds_config, odcds_resources_locator, *this, *this,
1593 0 : *stats_.rootScope(), validation_visitor);
1594 0 : return OdCdsApiHandleImpl::create(*this, std::move(odcds));
1595 0 : }
1596 :
1597 : ClusterDiscoveryCallbackHandlePtr
1598 : ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std::string name,
1599 : ClusterDiscoveryCallbackPtr callback,
1600 0 : std::chrono::milliseconds timeout) {
1601 0 : ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
1602 :
1603 0 : auto [handle, discovery_in_progress, invoker] =
1604 0 : cluster_manager.cdm_.addCallback(name, std::move(callback));
1605 : // This check will catch requests for discoveries from this thread only. If other thread
1606 : // requested the same discovery, we will detect it in the main thread later.
1607 0 : if (discovery_in_progress) {
1608 0 : ENVOY_LOG(debug,
1609 0 : "cm odcds: on-demand discovery for cluster {} is already in progress, something else "
1610 0 : "in thread {} has already requested it",
1611 0 : name, cluster_manager.thread_local_dispatcher_.name());
1612 : // This worker thread has already requested a discovery of a cluster with this name, so
1613 : // nothing more left to do here.
1614 : //
1615 : // We can't "just" return handle here, because handle is a part of the structured binding done
1616 : // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like
1617 : // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here
1618 : // - it needs to be moved.
1619 0 : return std::move(handle);
1620 0 : }
1621 0 : ENVOY_LOG(
1622 0 : debug,
1623 0 : "cm odcds: forwarding the on-demand discovery request for cluster {} to the main thread",
1624 0 : name);
1625 : // This seems to be the first request for discovery of this cluster in this worker thread. Rest
1626 : // of the process may only happen in the main thread.
1627 0 : dispatcher_.post([this, odcds = std::move(odcds), timeout, name = std::move(name),
1628 0 : invoker = std::move(invoker),
1629 0 : &thread_local_dispatcher = cluster_manager.thread_local_dispatcher_] {
1630 : // Check for the cluster here too. It might have been added between the time when this closure
1631 : // was posted and when it is being executed.
1632 0 : if (getThreadLocalCluster(name) != nullptr) {
1633 0 : ENVOY_LOG(
1634 0 : debug,
1635 0 : "cm odcds: the requested cluster {} is already known, posting the callback back to {}",
1636 0 : name, thread_local_dispatcher.name());
1637 0 : thread_local_dispatcher.post([invoker = std::move(invoker)] {
1638 0 : invoker.invokeCallback(ClusterDiscoveryStatus::Available);
1639 0 : });
1640 0 : return;
1641 0 : }
1642 :
1643 0 : if (auto it = pending_cluster_creations_.find(name); it != pending_cluster_creations_.end()) {
1644 0 : ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} is already in progress", name);
1645 : // We already began the discovery process for this cluster, nothing to do. If we got here,
1646 : // it means that it was other worker thread that requested the discovery.
1647 0 : return;
1648 0 : }
1649 : // Start the discovery. If the cluster gets discovered, cluster manager will warm it up and
1650 : // invoke the cluster lifecycle callbacks, that will in turn invoke our callback.
1651 0 : odcds->updateOnDemand(name);
1652 : // Setup the discovery timeout timer to avoid keeping callbacks indefinitely.
1653 0 : auto timer = dispatcher_.createTimer([this, name] { notifyExpiredDiscovery(name); });
1654 0 : timer->enableTimer(timeout);
1655 : // Keep odcds handle alive for the duration of the discovery process.
1656 0 : pending_cluster_creations_.insert(
1657 0 : {std::move(name), ClusterCreation{std::move(odcds), std::move(timer)}});
1658 0 : });
1659 :
1660 : // We can't "just" return handle here, because handle is a part of the structured binding done
1661 : // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like
1662 : // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here -
1663 : // it needs to be moved.
1664 0 : return std::move(handle);
1665 0 : }
1666 :
1667 0 : void ClusterManagerImpl::notifyMissingCluster(absl::string_view name) {
1668 0 : ENVOY_LOG(debug, "cm odcds: cluster {} not found during on-demand discovery", name);
1669 0 : notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Missing);
1670 0 : }
1671 :
1672 0 : void ClusterManagerImpl::notifyExpiredDiscovery(absl::string_view name) {
1673 0 : ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} timed out", name);
1674 0 : notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Timeout);
1675 0 : }
1676 :
1677 : void ClusterManagerImpl::notifyClusterDiscoveryStatus(absl::string_view name,
1678 0 : ClusterDiscoveryStatus status) {
1679 0 : auto map_node_handle = pending_cluster_creations_.extract(name);
1680 0 : if (map_node_handle.empty()) {
1681 : // Not a cluster we are interested in. This may happen when ODCDS
1682 : // receives some cluster name in removed resources field and
1683 : // notifies the cluster manager about it.
1684 0 : return;
1685 0 : }
1686 : // Let all the worker threads know that the discovery timed out.
1687 0 : tls_.runOnAllThreads(
1688 0 : [name = std::string(name), status](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
1689 0 : ENVOY_LOG(
1690 0 : trace,
1691 0 : "cm cdm: starting processing cluster name {} (status {}) from the expired timer in {}",
1692 0 : name, enumToInt(status), cluster_manager->thread_local_dispatcher_.name());
1693 0 : cluster_manager->cdm_.processClusterName(name, status);
1694 0 : });
1695 0 : }
1696 :
1697 0 : Config::EdsResourcesCacheOptRef ClusterManagerImpl::edsResourcesCache() {
1698 : // EDS caching is only supported for ADS.
1699 0 : if (ads_mux_) {
1700 0 : return ads_mux_->edsResourcesCache();
1701 0 : }
1702 0 : return {};
1703 0 : }
1704 :
1705 : ClusterDiscoveryManager
1706 0 : ClusterManagerImpl::createAndSwapClusterDiscoveryManager(std::string thread_name) {
1707 0 : ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
1708 0 : ClusterDiscoveryManager cdm(std::move(thread_name), cluster_manager);
1709 :
1710 0 : cluster_manager.cdm_.swap(cdm);
1711 :
1712 0 : return cdm;
1713 0 : }
1714 :
1715 : ProtobufTypes::MessagePtr
1716 98 : ClusterManagerImpl::dumpClusterConfigs(const Matchers::StringMatcher& name_matcher) {
1717 98 : auto config_dump = std::make_unique<envoy::admin::v3::ClustersConfigDump>();
1718 98 : config_dump->set_version_info(cds_api_ != nullptr ? cds_api_->versionInfo() : "");
1719 131 : for (const auto& active_cluster_pair : active_clusters_) {
1720 131 : const auto& cluster = *active_cluster_pair.second;
1721 131 : if (!name_matcher.match(cluster.cluster_config_.name())) {
1722 0 : continue;
1723 0 : }
1724 131 : if (!cluster.added_via_api_) {
1725 131 : auto& static_cluster = *config_dump->mutable_static_clusters()->Add();
1726 131 : static_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
1727 131 : TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
1728 131 : *(static_cluster.mutable_last_updated()));
1729 131 : } else {
1730 0 : auto& dynamic_cluster = *config_dump->mutable_dynamic_active_clusters()->Add();
1731 0 : dynamic_cluster.set_version_info(cluster.version_info_);
1732 0 : dynamic_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
1733 0 : TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
1734 0 : *(dynamic_cluster.mutable_last_updated()));
1735 0 : }
1736 131 : }
1737 :
1738 98 : for (const auto& warming_cluster_pair : warming_clusters_) {
1739 0 : const auto& cluster = *warming_cluster_pair.second;
1740 0 : if (!name_matcher.match(cluster.cluster_config_.name())) {
1741 0 : continue;
1742 0 : }
1743 0 : auto& dynamic_cluster = *config_dump->mutable_dynamic_warming_clusters()->Add();
1744 0 : dynamic_cluster.set_version_info(cluster.version_info_);
1745 0 : dynamic_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
1746 0 : TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
1747 0 : *(dynamic_cluster.mutable_last_updated()));
1748 0 : }
1749 :
1750 98 : return config_dump;
1751 98 : }
1752 :
1753 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl(
1754 : ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
1755 : const absl::optional<LocalClusterParams>& local_cluster_params)
1756 : : parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(dispatcher.name(), *this),
1757 223 : local_stats_(generateStats(*parent.stats_.rootScope(), dispatcher.name())) {
1758 : // If local cluster is defined then we need to initialize it first.
1759 223 : if (local_cluster_params.has_value()) {
1760 0 : const auto& local_cluster_name = local_cluster_params->info_->name();
1761 0 : ENVOY_LOG(debug, "adding TLS local cluster {}", local_cluster_name);
1762 0 : thread_local_clusters_[local_cluster_name] = std::make_unique<ClusterEntry>(
1763 0 : *this, local_cluster_params->info_, local_cluster_params->load_balancer_factory_);
1764 0 : local_priority_set_ = &thread_local_clusters_[local_cluster_name]->prioritySet();
1765 0 : local_stats_.clusters_inflated_.set(thread_local_clusters_.size());
1766 0 : }
1767 223 : }
1768 :
1769 223 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImpl() {
1770 : // Clear out connection pools as well as the thread local cluster map so that we release all
1771 : // cluster pointers. Currently we have to free all non-local clusters before we free
1772 : // the local cluster. This is because non-local clusters with a zone aware load balancer have a
1773 : // member update callback registered with the local cluster.
1774 223 : ENVOY_LOG(debug, "shutting down thread local cluster manager");
1775 223 : destroying_ = true;
1776 223 : host_http_conn_pool_map_.clear();
1777 223 : host_tcp_conn_pool_map_.clear();
1778 223 : ASSERT(host_tcp_conn_map_.empty());
1779 337 : for (auto& cluster : thread_local_clusters_) {
1780 306 : if (&cluster.second->prioritySet() != local_priority_set_) {
1781 306 : cluster.second.reset();
1782 306 : }
1783 306 : }
1784 223 : thread_local_clusters_.clear();
1785 :
1786 : // Ensure that all pools are completely destructed.
1787 223 : thread_local_dispatcher_.clearDeferredDeleteList();
1788 223 : }
1789 :
1790 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeTcpConn(
1791 0 : const HostConstSharedPtr& host, Network::ClientConnection& connection) {
1792 0 : auto host_tcp_conn_map_it = host_tcp_conn_map_.find(host);
1793 0 : ASSERT(host_tcp_conn_map_it != host_tcp_conn_map_.end());
1794 0 : auto& connections_map = host_tcp_conn_map_it->second.connections_;
1795 0 : auto it = connections_map.find(&connection);
1796 0 : ASSERT(it != connections_map.end());
1797 0 : connection.dispatcher().deferredDelete(std::move(it->second));
1798 0 : connections_map.erase(it);
1799 0 : if (connections_map.empty()) {
1800 0 : host_tcp_conn_map_.erase(host_tcp_conn_map_it);
1801 0 : }
1802 0 : }
1803 :
1804 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts(
1805 0 : const std::string& name, const HostVector& hosts_removed) {
1806 0 : auto entry = thread_local_clusters_.find(name);
1807 : // The if should only be possible if deferred cluster creation is enabled.
1808 0 : if (entry == thread_local_clusters_.end()) {
1809 0 : ASSERT(
1810 0 : parent_.deferred_cluster_creation_,
1811 0 : fmt::format("Cannot find ThreadLocalCluster {}, but deferred cluster creation is disabled.",
1812 0 : name));
1813 0 : ASSERT(thread_local_deferred_clusters_.find(name) != thread_local_deferred_clusters_.end(),
1814 0 : "Cluster with removed host is neither deferred or inflated!");
1815 0 : return;
1816 0 : }
1817 0 : const auto& cluster_entry = entry->second;
1818 0 : ENVOY_LOG(debug, "removing hosts for TLS cluster {} removed {}", name, hosts_removed.size());
1819 :
1820 : // We need to go through and purge any connection pools for hosts that got deleted.
1821 : // Even if two hosts actually point to the same address this will be safe, since if a
1822 : // host is readded it will be a different physical HostSharedPtr.
1823 0 : cluster_entry->drainConnPools(hosts_removed);
1824 0 : }
1825 :
1826 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
1827 : const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams update_hosts_params,
1828 : LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
1829 : const HostVector& hosts_removed, bool weighted_priority_health,
1830 306 : uint64_t overprovisioning_factor, HostMapConstSharedPtr cross_priority_host_map) {
1831 306 : ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end());
1832 306 : const auto& cluster_entry = thread_local_clusters_[name];
1833 306 : cluster_entry->updateHosts(name, priority, std::move(update_hosts_params),
1834 306 : std::move(locality_weights), hosts_added, hosts_removed,
1835 306 : weighted_priority_health, overprovisioning_factor,
1836 306 : std::move(cross_priority_host_map));
1837 306 : }
1838 :
1839 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
1840 0 : const HostSharedPtr& host) {
1841 0 : if (host->cluster().features() &
1842 0 : ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
1843 0 : drainOrCloseConnPools(host, absl::nullopt);
1844 :
1845 : // Close non connection pool TCP connections obtained from tcpConn()
1846 : //
1847 : // TODO(jono): The only remaining user of the non-pooled connections seems to be the statsd
1848 : // TCP client. Perhaps it could be rewritten to use a connection pool, and this code deleted.
1849 : //
1850 : // Each connection will remove itself from the TcpConnectionsMap when it closes, via its
1851 : // Network::ConnectionCallbacks. The last removed tcp conn will remove the TcpConnectionsMap
1852 : // from host_tcp_conn_map_, so do not cache it between iterations.
1853 : //
1854 : // TODO(ggreenway) PERF: If there are a large number of connections, this could take a long
1855 : // time and halt other useful work. Consider breaking up this work. Note that this behavior is
1856 : // noted in the configuration documentation in cluster setting
1857 : // "close_connections_on_host_health_failure". Update the docs if this if this changes.
1858 0 : while (true) {
1859 0 : const auto& it = host_tcp_conn_map_.find(host);
1860 0 : if (it == host_tcp_conn_map_.end()) {
1861 0 : break;
1862 0 : }
1863 0 : TcpConnectionsMap& container = it->second;
1864 0 : container.connections_.begin()->first->close(
1865 0 : Network::ConnectionCloseType::NoFlush,
1866 0 : StreamInfo::LocalCloseReasons::get().NonPooledTcpConnectionHostHealthFailure);
1867 0 : }
1868 0 : } else {
1869 0 : drainOrCloseConnPools(host, ConnectionPool::DrainBehavior::DrainExistingConnections);
1870 0 : }
1871 0 : }
1872 :
1873 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ConnPoolsContainer*
1874 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::getHttpConnPoolsContainer(
1875 730 : const HostConstSharedPtr& host, bool allocate) {
1876 730 : auto container_iter = host_http_conn_pool_map_.find(host);
1877 730 : if (container_iter == host_http_conn_pool_map_.end()) {
1878 479 : if (!allocate) {
1879 306 : return nullptr;
1880 306 : }
1881 173 : container_iter =
1882 173 : host_http_conn_pool_map_.try_emplace(host, thread_local_dispatcher_, host).first;
1883 173 : }
1884 :
1885 424 : return &container_iter->second;
1886 730 : }
1887 :
1888 : ClusterUpdateCallbacksHandlePtr
1889 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::addClusterUpdateCallbacks(
1890 223 : ClusterUpdateCallbacks& cb) {
1891 223 : return std::make_unique<ClusterUpdateCallbacksHandleImpl>(cb, update_callbacks_);
1892 223 : }
1893 :
1894 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
1895 : ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster,
1896 : const LoadBalancerFactorySharedPtr& lb_factory)
1897 : : parent_(parent), cluster_info_(cluster), lb_factory_(lb_factory),
1898 306 : override_host_statuses_(HostUtility::createOverrideHostStatus(cluster_info_->lbConfig())) {
1899 306 : priority_set_.getOrCreateHostSet(0);
1900 :
1901 : // TODO(mattklein123): Consider converting other LBs over to thread local. All of them could
1902 : // benefit given the healthy panic, locality, and priority calculations that take place.
1903 306 : if (cluster->lbSubsetInfo().isEnabled()) {
1904 0 : auto& factory = Config::Utility::getAndCheckFactoryByName<NonThreadAwareLoadBalancerFactory>(
1905 0 : "envoy.load_balancing_policies.subset");
1906 0 : lb_ = factory.create(*cluster, priority_set_, parent_.local_priority_set_,
1907 0 : parent.parent_.runtime_, parent.parent_.random_,
1908 0 : parent_.thread_local_dispatcher_.timeSource());
1909 306 : } else {
1910 306 : switch (cluster->lbType()) {
1911 0 : case LoadBalancerType::LeastRequest: {
1912 0 : ASSERT(lb_factory_ == nullptr);
1913 0 : lb_ = std::make_unique<LeastRequestLoadBalancer>(
1914 0 : priority_set_, parent_.local_priority_set_, cluster->lbStats(), parent.parent_.runtime_,
1915 0 : parent.parent_.random_, cluster->lbConfig(), cluster->lbLeastRequestConfig(),
1916 0 : parent.thread_local_dispatcher_.timeSource());
1917 0 : break;
1918 0 : }
1919 0 : case LoadBalancerType::Random: {
1920 0 : ASSERT(lb_factory_ == nullptr);
1921 0 : lb_ = std::make_unique<RandomLoadBalancer>(priority_set_, parent_.local_priority_set_,
1922 0 : cluster->lbStats(), parent.parent_.runtime_,
1923 0 : parent.parent_.random_, cluster->lbConfig());
1924 0 : break;
1925 0 : }
1926 0 : case LoadBalancerType::RoundRobin: {
1927 0 : ASSERT(lb_factory_ == nullptr);
1928 0 : lb_ = std::make_unique<RoundRobinLoadBalancer>(
1929 0 : priority_set_, parent_.local_priority_set_, cluster->lbStats(), parent.parent_.runtime_,
1930 0 : parent.parent_.random_, cluster->lbConfig(), cluster->lbRoundRobinConfig(),
1931 0 : parent.thread_local_dispatcher_.timeSource());
1932 0 : break;
1933 0 : }
1934 0 : case LoadBalancerType::ClusterProvided:
1935 306 : case LoadBalancerType::LoadBalancingPolicyConfig:
1936 306 : case LoadBalancerType::RingHash:
1937 306 : case LoadBalancerType::Maglev:
1938 306 : case LoadBalancerType::OriginalDst: {
1939 306 : ASSERT(lb_factory_ != nullptr);
1940 306 : lb_ = lb_factory_->create({priority_set_, parent_.local_priority_set_});
1941 306 : break;
1942 306 : }
1943 306 : }
1944 306 : }
1945 306 : }
1946 :
1947 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainOrCloseConnPools(
1948 306 : const HostSharedPtr& host, absl::optional<ConnectionPool::DrainBehavior> drain_behavior) {
1949 : // Drain or close any HTTP connection pool for the host.
1950 306 : {
1951 306 : const auto container = getHttpConnPoolsContainer(host);
1952 306 : if (container != nullptr) {
1953 0 : container->do_not_delete_ = true;
1954 0 : if (drain_behavior.has_value()) {
1955 0 : container->pools_->drainConnections(drain_behavior.value());
1956 0 : } else {
1957 : // TODO(wbpcode): 'CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE' and 'closeConnections'
1958 : // is only supported for TCP connection pools for now. Use 'DrainExistingConnections'
1959 : // drain here as alternative.
1960 0 : container->pools_->drainConnections(
1961 0 : ConnectionPool::DrainBehavior::DrainExistingConnections);
1962 0 : }
1963 0 : container->do_not_delete_ = false;
1964 :
1965 0 : if (container->pools_->empty()) {
1966 0 : host_http_conn_pool_map_.erase(host);
1967 0 : }
1968 0 : }
1969 306 : }
1970 : // Drain or close any TCP connection pool for the host.
1971 306 : {
1972 306 : const auto container = host_tcp_conn_pool_map_.find(host);
1973 306 : if (container != host_tcp_conn_pool_map_.end()) {
1974 : // Draining pools or closing connections can cause pool deletion if it becomes
1975 : // idle. Copy `pools_` so that we aren't iterating through a container that
1976 : // gets mutated by callbacks deleting from it.
1977 0 : std::vector<Tcp::ConnectionPool::Instance*> pools;
1978 0 : for (const auto& pair : container->second.pools_) {
1979 0 : pools.push_back(pair.second.get());
1980 0 : }
1981 :
1982 0 : for (auto* pool : pools) {
1983 0 : if (drain_behavior.has_value()) {
1984 0 : pool->drainConnections(drain_behavior.value());
1985 0 : } else {
1986 0 : pool->closeConnections();
1987 0 : }
1988 0 : }
1989 0 : }
1990 306 : }
1991 306 : }
1992 :
1993 306 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() {
1994 : // We need to drain all connection pools for the cluster being removed. Then we can remove the
1995 : // cluster.
1996 : //
1997 : // TODO(mattklein123): Optimally, we would just fire member changed callbacks and remove all of
1998 : // the hosts inside of the HostImpl destructor. That is a change with wide implications, so we
1999 : // are going with a more targeted approach for now.
2000 306 : drainConnPools();
2001 306 : }
2002 :
2003 : Http::ConnectionPool::Instance*
2004 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImpl(
2005 : ResourcePriority priority, absl::optional<Http::Protocol> downstream_protocol,
2006 251 : LoadBalancerContext* context, bool peek) {
2007 251 : HostConstSharedPtr host = (peek ? peekAnotherHost(context) : chooseHost(context));
2008 251 : if (!host) {
2009 0 : if (!peek) {
2010 0 : ENVOY_LOG(debug, "no healthy host for HTTP connection pool");
2011 0 : cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
2012 0 : }
2013 0 : return nullptr;
2014 0 : }
2015 :
2016 : // Right now, HTTP, HTTP/2 and ALPN pools are considered separate.
2017 : // We could do better here, and always use the ALPN pool and simply make sure
2018 : // we end up on a connection of the correct protocol, but for simplicity we're
2019 : // starting with something simpler.
2020 251 : auto upstream_protocols = host->cluster().upstreamHttpProtocol(downstream_protocol);
2021 251 : std::vector<uint8_t> hash_key;
2022 251 : hash_key.reserve(upstream_protocols.size());
2023 251 : for (auto protocol : upstream_protocols) {
2024 251 : hash_key.push_back(uint8_t(protocol));
2025 251 : }
2026 :
2027 251 : absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>
2028 251 : alternate_protocol_options = host->cluster().alternateProtocolsCacheOptions();
2029 251 : Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
2030 251 : if (context) {
2031 : // Inherit socket options from downstream connection, if set.
2032 251 : if (context->downstreamConnection()) {
2033 183 : addOptionsIfNotNull(upstream_options, context->downstreamConnection()->socketOptions());
2034 183 : }
2035 251 : addOptionsIfNotNull(upstream_options, context->upstreamSocketOptions());
2036 251 : }
2037 :
2038 : // Use the socket options for computing connection pool hash key, if any.
2039 : // This allows socket options to control connection pooling so that connections with
2040 : // different options are not pooled together.
2041 251 : for (const auto& option : *upstream_options) {
2042 0 : option->hashKey(hash_key);
2043 0 : }
2044 :
2045 251 : bool have_transport_socket_options = false;
2046 251 : if (context && context->upstreamTransportSocketOptions()) {
2047 183 : host->transportSocketFactory().hashKey(hash_key, context->upstreamTransportSocketOptions());
2048 183 : have_transport_socket_options = true;
2049 183 : }
2050 :
2051 : // If configured, use the downstream connection id in pool hash key
2052 251 : if (cluster_info_->connectionPoolPerDownstreamConnection() && context &&
2053 251 : context->downstreamConnection()) {
2054 0 : context->downstreamConnection()->hashKey(hash_key);
2055 0 : }
2056 :
2057 251 : ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true);
2058 :
2059 : // Note: to simplify this, we assume that the factory is only called in the scope of this
2060 : // function. Otherwise, we'd need to capture a few of these variables by value.
2061 251 : ConnPoolsContainer::ConnPools::PoolOptRef pool =
2062 251 : container.pools_->getPool(priority, hash_key, [&]() {
2063 173 : auto pool = parent_.parent_.factory_.allocateConnPool(
2064 173 : parent_.thread_local_dispatcher_, host, priority, upstream_protocols,
2065 173 : alternate_protocol_options, !upstream_options->empty() ? upstream_options : nullptr,
2066 173 : have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr,
2067 173 : parent_.parent_.time_source_, parent_.cluster_manager_state_, quic_info_);
2068 :
2069 173 : pool->addIdleCallback([&parent = parent_, host, priority, hash_key]() {
2070 173 : parent.httpConnPoolIsIdle(host, priority, hash_key);
2071 173 : });
2072 :
2073 173 : return pool;
2074 173 : });
2075 :
2076 251 : if (pool.has_value()) {
2077 251 : return &(pool.value().get());
2078 251 : } else {
2079 0 : return nullptr;
2080 0 : }
2081 251 : }
2082 :
2083 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::httpConnPoolIsIdle(
2084 173 : HostConstSharedPtr host, ResourcePriority priority, const std::vector<uint8_t>& hash_key) {
2085 173 : if (destroying_) {
2086 : // If the Cluster is being destroyed, this pool will be cleaned up by that
2087 : // process.
2088 0 : return;
2089 0 : }
2090 :
2091 173 : ConnPoolsContainer* container = getHttpConnPoolsContainer(host);
2092 173 : if (container == nullptr) {
2093 : // This could happen if we have cleaned out the host before iterating through every
2094 : // connection pool. Handle it by just continuing.
2095 0 : return;
2096 0 : }
2097 :
2098 173 : ENVOY_LOG(trace, "Erasing idle pool for host {}", *host);
2099 173 : container->pools_->erasePool(priority, hash_key);
2100 :
2101 : // Guard deletion of the container with `do_not_delete_` to avoid deletion while
2102 : // iterating through the container in `container->pools_->startDrain()`. See
2103 : // comment in `ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools`.
2104 173 : if (!container->do_not_delete_ && container->pools_->empty()) {
2105 173 : ENVOY_LOG(trace, "Pool container empty for host {}, erasing host entry", *host);
2106 173 : host_http_conn_pool_map_.erase(
2107 173 : host); // NOTE: `container` is erased after this point in the lambda.
2108 173 : }
2109 173 : }
2110 :
2111 : HostConstSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::chooseHost(
2112 251 : LoadBalancerContext* context) {
2113 251 : auto cross_priority_host_map = priority_set_.crossPriorityHostMap();
2114 251 : HostConstSharedPtr host = HostUtility::selectOverrideHost(cross_priority_host_map.get(),
2115 251 : override_host_statuses_, context);
2116 251 : if (host != nullptr) {
2117 0 : return host;
2118 0 : }
2119 251 : if (!HostUtility::allowLBChooseHost(context)) {
2120 0 : return nullptr;
2121 0 : }
2122 251 : return lb_->chooseHost(context);
2123 251 : }
2124 :
2125 : HostConstSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::peekAnotherHost(
2126 0 : LoadBalancerContext* context) {
2127 0 : auto cross_priority_host_map = priority_set_.crossPriorityHostMap();
2128 0 : HostConstSharedPtr host = HostUtility::selectOverrideHost(cross_priority_host_map.get(),
2129 0 : override_host_statuses_, context);
2130 0 : if (host != nullptr) {
2131 0 : return host;
2132 0 : }
2133 0 : return lb_->peekAnotherHost(context);
2134 0 : }
2135 :
2136 : Tcp::ConnectionPool::Instance*
2137 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPoolImpl(
2138 0 : ResourcePriority priority, LoadBalancerContext* context, bool peek) {
2139 :
2140 0 : HostConstSharedPtr host = (peek ? peekAnotherHost(context) : chooseHost(context));
2141 0 : if (!host) {
2142 0 : if (!peek) {
2143 0 : ENVOY_LOG(debug, "no healthy host for TCP connection pool");
2144 0 : cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
2145 0 : }
2146 0 : return nullptr;
2147 0 : }
2148 :
2149 : // Inherit socket options from downstream connection, if set.
2150 0 : std::vector<uint8_t> hash_key = {uint8_t(priority)};
2151 :
2152 : // Use downstream connection socket options for computing connection pool hash key, if any.
2153 : // This allows socket options to control connection pooling so that connections with
2154 : // different options are not pooled together.
2155 0 : Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
2156 0 : if (context) {
2157 0 : if (context->downstreamConnection()) {
2158 0 : addOptionsIfNotNull(upstream_options, context->downstreamConnection()->socketOptions());
2159 0 : }
2160 0 : addOptionsIfNotNull(upstream_options, context->upstreamSocketOptions());
2161 0 : }
2162 :
2163 0 : for (const auto& option : *upstream_options) {
2164 0 : option->hashKey(hash_key);
2165 0 : }
2166 :
2167 0 : bool have_transport_socket_options = false;
2168 0 : if (context != nullptr && context->upstreamTransportSocketOptions() != nullptr) {
2169 0 : have_transport_socket_options = true;
2170 0 : host->transportSocketFactory().hashKey(hash_key, context->upstreamTransportSocketOptions());
2171 0 : }
2172 :
2173 0 : auto container_iter = parent_.host_tcp_conn_pool_map_.find(host);
2174 0 : if (container_iter == parent_.host_tcp_conn_pool_map_.end()) {
2175 0 : container_iter = parent_.host_tcp_conn_pool_map_.try_emplace(host, host->acquireHandle()).first;
2176 0 : }
2177 0 : TcpConnPoolsContainer& container = container_iter->second;
2178 0 : auto pool_iter = container.pools_.find(hash_key);
2179 0 : if (pool_iter == container.pools_.end()) {
2180 0 : bool inserted;
2181 0 : std::tie(pool_iter, inserted) = container.pools_.emplace(
2182 0 : hash_key,
2183 0 : parent_.parent_.factory_.allocateTcpConnPool(
2184 0 : parent_.thread_local_dispatcher_, host, priority,
2185 0 : !upstream_options->empty() ? upstream_options : nullptr,
2186 0 : have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr,
2187 0 : parent_.cluster_manager_state_, cluster_info_->tcpPoolIdleTimeout()));
2188 0 : ASSERT(inserted);
2189 0 : pool_iter->second->addIdleCallback(
2190 0 : [&parent = parent_, host, hash_key]() { parent.tcpConnPoolIsIdle(host, hash_key); });
2191 0 : }
2192 :
2193 0 : return pool_iter->second.get();
2194 0 : }
2195 :
2196 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::tcpConnPoolIsIdle(
2197 0 : HostConstSharedPtr host, const std::vector<uint8_t>& hash_key) {
2198 0 : if (destroying_) {
2199 : // If the Cluster is being destroyed, this pool will be cleaned up by that process.
2200 0 : return;
2201 0 : }
2202 :
2203 0 : auto it = host_tcp_conn_pool_map_.find(host);
2204 0 : if (it != host_tcp_conn_pool_map_.end()) {
2205 0 : TcpConnPoolsContainer& container = it->second;
2206 :
2207 0 : auto erase_iter = container.pools_.find(hash_key);
2208 0 : if (erase_iter != container.pools_.end()) {
2209 0 : ENVOY_LOG(trace, "Idle pool, erasing pool for host {}", *host);
2210 0 : thread_local_dispatcher_.deferredDelete(std::move(erase_iter->second));
2211 0 : container.pools_.erase(erase_iter);
2212 0 : }
2213 :
2214 0 : if (container.pools_.empty()) {
2215 0 : host_tcp_conn_pool_map_.erase(
2216 0 : host); // NOTE: `container` is erased after this point in the lambda.
2217 0 : }
2218 0 : }
2219 0 : }
2220 :
2221 : ClusterManagerPtr ProdClusterManagerFactory::clusterManagerFromProto(
2222 131 : const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
2223 131 : auto cluster_manager_impl = std::unique_ptr<ClusterManagerImpl>{new ClusterManagerImpl(
2224 131 : bootstrap, *this, stats_, tls_, context_.runtime(), context_.localInfo(),
2225 131 : context_.accessLogManager(), context_.mainThreadDispatcher(), context_.admin(),
2226 131 : context_.messageValidationContext(), context_.api(), http_context_, context_.grpcContext(),
2227 131 : context_.routerContext(), server_)};
2228 131 : THROW_IF_NOT_OK(cluster_manager_impl->init(bootstrap));
2229 131 : return cluster_manager_impl;
2230 131 : }
2231 :
2232 : Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
2233 : Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority,
2234 : std::vector<Http::Protocol>& protocols,
2235 : const absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>&
2236 : alternate_protocol_options,
2237 : const Network::ConnectionSocket::OptionsSharedPtr& options,
2238 : const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
2239 173 : TimeSource& source, ClusterConnectivityState& state, Http::PersistentQuicInfoPtr& quic_info) {
2240 :
2241 173 : Http::HttpServerPropertiesCacheSharedPtr alternate_protocols_cache;
2242 173 : if (alternate_protocol_options.has_value()) {
2243 : // If there is configuration for an alternate protocols cache, always create one.
2244 0 : alternate_protocols_cache = alternate_protocols_cache_manager_->getCache(
2245 0 : alternate_protocol_options.value(), dispatcher);
2246 173 : } else if (!alternate_protocol_options.has_value() &&
2247 173 : (protocols.size() == 2 ||
2248 173 : (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2))) {
2249 : // If there is no configuration for an alternate protocols cache, still
2250 : // create one if there's an HTTP/2 upstream (either explicitly, or for mixed
2251 : // HTTP/1.1 and HTTP/2 pools) to track the max concurrent streams across
2252 : // connections.
2253 105 : envoy::config::core::v3::AlternateProtocolsCacheOptions default_options;
2254 105 : default_options.set_name(host->cluster().name());
2255 105 : alternate_protocols_cache =
2256 105 : alternate_protocols_cache_manager_->getCache(default_options, dispatcher);
2257 105 : }
2258 :
2259 173 : absl::optional<Http::HttpServerPropertiesCache::Origin> origin =
2260 173 : getOrigin(transport_socket_options, host);
2261 173 : if (protocols.size() == 3 &&
2262 173 : context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100) &&
2263 173 : !transport_socket_options->http11ProxyInfo()) {
2264 0 : ASSERT(contains(protocols,
2265 0 : {Http::Protocol::Http11, Http::Protocol::Http2, Http::Protocol::Http3}));
2266 0 : ASSERT(alternate_protocol_options.has_value());
2267 0 : ASSERT(alternate_protocols_cache);
2268 0 : #ifdef ENVOY_ENABLE_QUIC
2269 0 : Envoy::Http::ConnectivityGrid::ConnectivityOptions coptions{protocols};
2270 0 : if (quic_info == nullptr) {
2271 0 : quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster());
2272 0 : }
2273 0 : return std::make_unique<Http::ConnectivityGrid>(
2274 0 : dispatcher, context_.api().randomGenerator(), host, priority, options,
2275 0 : transport_socket_options, state, source, alternate_protocols_cache, coptions,
2276 0 : quic_stat_names_, *stats_.rootScope(), *quic_info);
2277 : #else
2278 : (void)quic_info;
2279 : // Should be blocked by configuration checking at an earlier point.
2280 : PANIC("unexpected");
2281 : #endif
2282 0 : }
2283 173 : if (protocols.size() >= 2) {
2284 0 : if (origin.has_value()) {
2285 0 : envoy::config::core::v3::AlternateProtocolsCacheOptions default_options;
2286 0 : default_options.set_name(host->cluster().name());
2287 0 : alternate_protocols_cache =
2288 0 : alternate_protocols_cache_manager_->getCache(default_options, dispatcher);
2289 0 : }
2290 :
2291 0 : ASSERT(contains(protocols, {Http::Protocol::Http11, Http::Protocol::Http2}));
2292 0 : return std::make_unique<Http::HttpConnPoolImplMixed>(
2293 0 : dispatcher, context_.api().randomGenerator(), host, priority, options,
2294 0 : transport_socket_options, state, origin, alternate_protocols_cache);
2295 0 : }
2296 173 : if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2 &&
2297 173 : context_.runtime().snapshot().featureEnabled("upstream.use_http2", 100)) {
2298 105 : return Http::Http2::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
2299 105 : priority, options, transport_socket_options, state, origin,
2300 105 : alternate_protocols_cache);
2301 105 : }
2302 68 : if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http3 &&
2303 68 : context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100)) {
2304 0 : #ifdef ENVOY_ENABLE_QUIC
2305 0 : if (quic_info == nullptr) {
2306 0 : quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster());
2307 0 : }
2308 0 : return Http::Http3::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
2309 0 : priority, options, transport_socket_options, state,
2310 0 : quic_stat_names_, {}, *stats_.rootScope(), {}, *quic_info);
2311 : #else
2312 : UNREFERENCED_PARAMETER(source);
2313 : // Should be blocked by configuration checking at an earlier point.
2314 : PANIC("unexpected");
2315 : #endif
2316 0 : }
2317 68 : ASSERT(protocols.size() == 1 && protocols[0] == Http::Protocol::Http11);
2318 68 : return Http::Http1::allocateConnPool(dispatcher, context_.api().randomGenerator(), host, priority,
2319 68 : options, transport_socket_options, state);
2320 68 : }
2321 :
2322 : Tcp::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateTcpConnPool(
2323 : Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority,
2324 : const Network::ConnectionSocket::OptionsSharedPtr& options,
2325 : Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
2326 : ClusterConnectivityState& state,
2327 0 : absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout) {
2328 0 : ENVOY_LOG_MISC(debug, "Allocating TCP conn pool");
2329 0 : return std::make_unique<Tcp::ConnPoolImpl>(
2330 0 : dispatcher, host, priority, options, transport_socket_options, state, tcp_pool_idle_timeout);
2331 0 : }
2332 :
2333 : absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
2334 : ProdClusterManagerFactory::clusterFromProto(const envoy::config::cluster::v3::Cluster& cluster,
2335 : ClusterManager& cm,
2336 : Outlier::EventLoggerSharedPtr outlier_event_logger,
2337 159 : bool added_via_api) {
2338 159 : return ClusterFactoryImplBase::create(cluster, context_, cm, dns_resolver_fn_,
2339 159 : ssl_context_manager_, outlier_event_logger, added_via_api);
2340 159 : }
2341 :
2342 : CdsApiPtr
2343 : ProdClusterManagerFactory::createCds(const envoy::config::core::v3::ConfigSource& cds_config,
2344 : const xds::core::v3::ResourceLocator* cds_resources_locator,
2345 28 : ClusterManager& cm) {
2346 : // TODO(htuch): Differentiate static vs. dynamic validation visitors.
2347 28 : return CdsApiImpl::create(cds_config, cds_resources_locator, cm, *stats_.rootScope(),
2348 28 : context_.messageValidationContext().dynamicValidationVisitor());
2349 28 : }
2350 :
2351 : } // namespace Upstream
2352 : } // namespace Envoy
|