LCOV - code coverage report
Current view: top level - source/common/upstream - cluster_manager_impl.cc (source / functions)
Test: coverage.dat
Date: 2024-01-05 06:35:25
                 Hit     Total    Coverage
Lines:           767     1575     48.7 %
Functions:        63      113     55.8 %

          Line data    Source code
       1             : #include "source/common/upstream/cluster_manager_impl.h"
       2             : 
       3             : #include <chrono>
       4             : #include <cstdint>
       5             : #include <functional>
       6             : #include <memory>
       7             : #include <string>
       8             : #include <vector>
       9             : 
      10             : #include "envoy/admin/v3/config_dump.pb.h"
      11             : #include "envoy/config/bootstrap/v3/bootstrap.pb.h"
      12             : #include "envoy/config/cluster/v3/cluster.pb.h"
      13             : #include "envoy/config/core/v3/config_source.pb.h"
      14             : #include "envoy/config/core/v3/protocol.pb.h"
      15             : #include "envoy/config/xds_resources_delegate.h"
      16             : #include "envoy/event/dispatcher.h"
      17             : #include "envoy/network/dns.h"
      18             : #include "envoy/runtime/runtime.h"
      19             : #include "envoy/stats/scope.h"
      20             : #include "envoy/tcp/async_tcp_client.h"
      21             : #include "envoy/upstream/load_balancer.h"
      22             : #include "envoy/upstream/upstream.h"
      23             : 
      24             : #include "source/common/common/assert.h"
      25             : #include "source/common/common/enum_to_int.h"
      26             : #include "source/common/common/fmt.h"
      27             : #include "source/common/common/utility.h"
      28             : #include "source/common/config/custom_config_validators_impl.h"
      29             : #include "source/common/config/null_grpc_mux_impl.h"
      30             : #include "source/common/config/utility.h"
      31             : #include "source/common/config/xds_resource.h"
      32             : #include "source/common/grpc/async_client_manager_impl.h"
      33             : #include "source/common/http/async_client_impl.h"
      34             : #include "source/common/http/http1/conn_pool.h"
      35             : #include "source/common/http/http2/conn_pool.h"
      36             : #include "source/common/http/mixed_conn_pool.h"
      37             : #include "source/common/network/resolver_impl.h"
      38             : #include "source/common/network/utility.h"
      39             : #include "source/common/protobuf/utility.h"
      40             : #include "source/common/router/shadow_writer_impl.h"
      41             : #include "source/common/runtime/runtime_features.h"
      42             : #include "source/common/tcp/conn_pool.h"
      43             : #include "source/common/upstream/cds_api_impl.h"
      44             : #include "source/common/upstream/cluster_factory_impl.h"
      45             : #include "source/common/upstream/load_balancer_impl.h"
      46             : #include "source/common/upstream/priority_conn_pool_map_impl.h"
      47             : 
      48             : #ifdef ENVOY_ENABLE_QUIC
      49             : #include "source/common/http/conn_pool_grid.h"
      50             : #include "source/common/http/http3/conn_pool.h"
      51             : #include "source/common/quic/client_connection_factory_impl.h"
      52             : #endif
      53             : 
      54             : namespace Envoy {
      55             : namespace Upstream {
      56             : namespace {
      57             : 
      58             : void addOptionsIfNotNull(Network::Socket::OptionsSharedPtr& options,
      59         434 :                          const Network::Socket::OptionsSharedPtr& to_add) {
      60         434 :   if (to_add != nullptr) {
      61         183 :     Network::Socket::appendOptions(options, to_add);
      62         183 :   }
      63         434 : }
      64             : 
      65             : // Helper function to make sure each protocol in expected_protocols is present
      66             : // in protocols (only used for an ASSERT in debug builds)
      67             : bool contains(const std::vector<Http::Protocol>& protocols,
      68           0 :               const std::vector<Http::Protocol>& expected_protocols) {
      69           0 :   for (auto protocol : expected_protocols) {
      70           0 :     if (std::find(protocols.begin(), protocols.end(), protocol) == protocols.end()) {
      71           0 :       return false;
      72           0 :     }
      73           0 :   }
      74           0 :   return true;
      75           0 : }
      76             : 
      77             : absl::optional<Http::HttpServerPropertiesCache::Origin>
      78         173 : getOrigin(const Network::TransportSocketOptionsConstSharedPtr& options, HostConstSharedPtr host) {
      79         173 :   std::string sni = std::string(host->transportSocketFactory().defaultServerNameIndication());
      80         173 :   if (options && options->serverNameOverride().has_value()) {
      81           0 :     sni = options->serverNameOverride().value();
      82           0 :   }
      83         173 :   if (sni.empty() || !host->address() || !host->address()->ip()) {
      84         173 :     return absl::nullopt;
      85         173 :   }
      86           0 :   return {{"https", sni, host->address()->ip()->port()}};
      87         173 : }
      88             : 
      89             : bool isBlockingAdsCluster(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
      90         131 :                           absl::string_view cluster_name) {
      91         131 :   if (bootstrap.dynamic_resources().has_ads_config()) {
      92          56 :     const auto& ads_config_source = bootstrap.dynamic_resources().ads_config();
      93             :     // We only care about EnvoyGrpc, not GoogleGrpc, because we only need to delay ADS mux
      94             :     // initialization if it uses an Envoy cluster that needs to be initialized first. We don't
      95             :     // depend on the same cluster initialization when opening a gRPC stream for GoogleGrpc.
      96          56 :     return (ads_config_source.grpc_services_size() > 0 &&
      97          56 :             ads_config_source.grpc_services(0).has_envoy_grpc() &&
      98          56 :             ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
      99          56 :   }
     100          75 :   return false;
     101         131 : }
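
Illustrative sketch (not part of the instrumented source): a bootstrap that satisfies the check
above. The mutators are the standard protobuf-generated counterparts of the accessors read in
isBlockingAdsCluster(); the cluster names are hypothetical.

    envoy::config::bootstrap::v3::Bootstrap bootstrap;
    bootstrap.mutable_dynamic_resources()
        ->mutable_ads_config()
        ->add_grpc_services()
        ->mutable_envoy_grpc()
        ->set_cluster_name("ads_cluster");
    // EnvoyGrpc pointing at "ads_cluster": starting the ADS mux must wait for that cluster.
    ASSERT(isBlockingAdsCluster(bootstrap, "ads_cluster"));
    // Any other cluster name (or a GoogleGrpc service) does not block ADS initialization.
    ASSERT(!isBlockingAdsCluster(bootstrap, "some_other_cluster"));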
     102             : 
     103             : } // namespace
     104             : 
     105         159 : void ClusterManagerInitHelper::addCluster(ClusterManagerCluster& cm_cluster) {
     106             :   // See comments in ClusterManagerImpl::addOrUpdateCluster() for why this is only called during
     107             :   // server initialization.
     108         159 :   ASSERT(state_ != State::AllClustersInitialized);
     109             : 
     110         159 :   const auto initialize_cb = [&cm_cluster, this] {
     111         131 :     onClusterInit(cm_cluster);
     112         131 :     cm_cluster.cluster().info()->configUpdateStats().warming_state_.set(0);
     113         131 :   };
     114         159 :   Cluster& cluster = cm_cluster.cluster();
     115             : 
     116         159 :   cluster.info()->configUpdateStats().warming_state_.set(1);
     117         159 :   if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
     118             :     // Remove the previous cluster before the cluster object is destroyed.
     119         131 :     primary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
     120         131 :     cluster.initialize(initialize_cb);
     121         131 :   } else {
     122          28 :     ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
     123             :     // Remove the previous cluster before the cluster object is destroyed.
     124          28 :     secondary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
     125          28 :     if (started_secondary_initialize_) {
     126             :       // This can happen if we get a second CDS update that adds new clusters after we have
     127             :       // already started secondary init. In this case, just immediately initialize.
     128           0 :       cluster.initialize(initialize_cb);
     129           0 :     }
     130          28 :   }
     131             : 
     132         159 :   ENVOY_LOG(debug, "cm init: adding: cluster={} primary={} secondary={}", cluster.info()->name(),
     133         159 :             primary_init_clusters_.size(), secondary_init_clusters_.size());
     134         159 : }
     135             : 
     136         159 : void ClusterManagerInitHelper::onClusterInit(ClusterManagerCluster& cluster) {
     137         159 :   ASSERT(state_ != State::AllClustersInitialized);
     138         159 :   per_cluster_init_callback_(cluster);
     139         159 :   removeCluster(cluster);
     140         159 : }
     141             : 
     142         159 : void ClusterManagerInitHelper::removeCluster(ClusterManagerCluster& cluster) {
     143         159 :   if (state_ == State::AllClustersInitialized) {
     144           0 :     return;
     145           0 :   }
     146             : 
     147             :   // There is a remote edge case where we can remove a cluster via CDS that has not yet been
     148             :   // initialized. When called via the remove cluster API this code catches that case.
     149         159 :   absl::flat_hash_map<std::string, ClusterManagerCluster*>* cluster_map;
     150         159 :   if (cluster.cluster().initializePhase() == Cluster::InitializePhase::Primary) {
     151         131 :     cluster_map = &primary_init_clusters_;
     152         131 :   } else {
     153          28 :     ASSERT(cluster.cluster().initializePhase() == Cluster::InitializePhase::Secondary);
     154          28 :     cluster_map = &secondary_init_clusters_;
     155          28 :   }
     156             : 
     157             :   // It is possible that the cluster we are removing has already been initialized, and is not
     158             :   // present in the initializer map. If so, this is fine as a CDS update may happen for a
     159             :   // cluster with the same name. See the case "UpdateAlreadyInitialized" of the
     160             :   // target //test/common/upstream:cluster_manager_impl_test.
     161         159 :   auto iter = cluster_map->find(cluster.cluster().info()->name());
     162         159 :   if (iter != cluster_map->end() && iter->second == &cluster) {
     163         159 :     cluster_map->erase(iter);
     164         159 :   }
     165         159 :   ENVOY_LOG(debug, "cm init: init complete: cluster={} primary={} secondary={}",
     166         159 :             cluster.cluster().info()->name(), primary_init_clusters_.size(),
     167         159 :             secondary_init_clusters_.size());
     168         159 :   maybeFinishInitialize();
     169         159 : }
     170             : 
     171          28 : void ClusterManagerInitHelper::initializeSecondaryClusters() {
     172          28 :   started_secondary_initialize_ = true;
     173             :   // Cluster::initialize() method can modify the map of secondary_init_clusters_ to remove
     174             :   // the item currently being initialized, so we eschew range-based-for and do this complicated
     175             :   // dance to increment the iterator before calling initialize.
     176          56 :   for (auto iter = secondary_init_clusters_.begin(); iter != secondary_init_clusters_.end();) {
     177          28 :     ClusterManagerCluster* cluster = iter->second;
     178          28 :     ENVOY_LOG(debug, "initializing secondary cluster {}", iter->first);
     179          28 :     ++iter;
     180          28 :     cluster->cluster().initialize([cluster, this] { onClusterInit(*cluster); });
     181          28 :   }
     182          28 : }
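
Illustrative sketch (not part of the instrumented source): the advance-before-mutate iteration
used above, reduced to a plain std::map (requires <map>; names are hypothetical). Because the
callback may erase the entry currently being visited, the iterator is advanced before the call
so it never dangles.

    std::map<std::string, std::function<void()>> pending_inits;
    for (auto it = pending_inits.begin(); it != pending_inits.end();) {
      std::function<void()> init = it->second; // copy out what the callback needs
      ++it;                                    // advance first; erasing the visited entry is now safe
      init();                                  // may call pending_inits.erase(<visited key>)
    }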
     183             : 
     184         426 : void ClusterManagerInitHelper::maybeFinishInitialize() {
     185             :   // Do not do anything if we are still doing the initial static load or if we are waiting for
     186             :   // CDS initialize.
     187         426 :   ENVOY_LOG(debug, "maybe finish initialize state: {}", enumToInt(state_));
     188         426 :   if (state_ == State::Loading || state_ == State::WaitingToStartCdsInitialization) {
     189         131 :     return;
     190         131 :   }
     191             : 
     192         295 :   ASSERT(state_ == State::WaitingToStartSecondaryInitialization ||
     193         295 :          state_ == State::CdsInitialized ||
     194         295 :          state_ == State::WaitingForPrimaryInitializationToComplete);
     195         295 :   ENVOY_LOG(debug, "maybe finish initialize primary init clusters empty: {}",
     196         295 :             primary_init_clusters_.empty());
     197             :   // If we are still waiting for primary clusters to initialize, do nothing.
     198         295 :   if (!primary_init_clusters_.empty()) {
     199           0 :     return;
     200         295 :   } else if (state_ == State::WaitingForPrimaryInitializationToComplete) {
     201         129 :     state_ = State::WaitingToStartSecondaryInitialization;
     202         129 :     if (primary_clusters_initialized_callback_) {
     203           0 :       primary_clusters_initialized_callback_();
     204           0 :     }
     205         129 :     return;
     206         129 :   }
     207             : 
     208             :   // If we are still waiting for secondary clusters to initialize, see if we need to first call
     209             :   // initialize on them. This is only done once.
     210         166 :   ENVOY_LOG(debug, "maybe finish initialize secondary init clusters empty: {}",
     211         166 :             secondary_init_clusters_.empty());
     212         166 :   if (!secondary_init_clusters_.empty()) {
     213          28 :     if (!started_secondary_initialize_) {
     214          28 :       ENVOY_LOG(info, "cm init: initializing secondary clusters");
      215             :       // If the first CDS response doesn't have any primary cluster, ClusterLoadAssignment
      216             :       // should already be paused by CdsApiImpl::onConfigUpdate(). We need to check for that
      217             :       // here to avoid pausing ClusterLoadAssignment twice.
     218          28 :       Config::ScopedResume maybe_resume_eds_leds_sds;
     219          28 :       if (cm_.adsMux()) {
     220          28 :         const std::vector<std::string> paused_xds_types{
     221          28 :             Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(),
     222          28 :             Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>(),
     223          28 :             Config::getTypeUrl<envoy::extensions::transport_sockets::tls::v3::Secret>()};
     224          28 :         maybe_resume_eds_leds_sds = cm_.adsMux()->pause(paused_xds_types);
     225          28 :       }
     226          28 :       initializeSecondaryClusters();
     227          28 :     }
     228          28 :     return;
     229          28 :   }
     230             : 
     231             :   // At this point, if we are doing static init, and we have CDS, start CDS init. Otherwise, move
     232             :   // directly to initialized.
     233         138 :   started_secondary_initialize_ = false;
     234         138 :   ENVOY_LOG(debug, "maybe finish initialize cds api ready: {}", cds_ != nullptr);
     235         138 :   if (state_ == State::WaitingToStartSecondaryInitialization && cds_) {
     236          28 :     ENVOY_LOG(info, "cm init: initializing cds");
     237          28 :     state_ = State::WaitingToStartCdsInitialization;
     238          28 :     cds_->initialize();
     239         110 :   } else {
     240         110 :     ENVOY_LOG(info, "cm init: all clusters initialized");
     241         110 :     state_ = State::AllClustersInitialized;
     242         110 :     if (initialized_callback_) {
     243          28 :       initialized_callback_();
     244          28 :     }
     245         110 :   }
     246         138 : }
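
Editorial summary (not part of the instrumented source) of the init-helper state machine driven
by the functions above:

    Loading
      -> WaitingForPrimaryInitializationToComplete   (onStaticLoadComplete)
      -> WaitingToStartSecondaryInitialization       (all primary clusters initialized)
      -> WaitingToStartCdsInitialization             (cds_ set; cds_->initialize() invoked)
      -> CdsInitialized                              (CDS delivers its initialized callback)
      -> AllClustersInitialized                      (no secondary clusters left to warm)

When no CDS API is configured, maybeFinishInitialize() moves directly from
WaitingToStartSecondaryInitialization to AllClustersInitialized.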
     247             : 
     248         129 : void ClusterManagerInitHelper::onStaticLoadComplete() {
     249         129 :   ASSERT(state_ == State::Loading);
     250             :   // After initialization of primary clusters has completed, transition to
     251             :   // waiting for signal to initialize secondary clusters and then CDS.
     252         129 :   state_ = State::WaitingForPrimaryInitializationToComplete;
     253         129 :   maybeFinishInitialize();
     254         129 : }
     255             : 
     256         110 : void ClusterManagerInitHelper::startInitializingSecondaryClusters() {
     257         110 :   ASSERT(state_ == State::WaitingToStartSecondaryInitialization);
     258         110 :   ENVOY_LOG(debug, "continue initializing secondary clusters");
     259         110 :   maybeFinishInitialize();
     260         110 : }
     261             : 
     262         129 : void ClusterManagerInitHelper::setCds(CdsApi* cds) {
     263         129 :   ASSERT(state_ == State::Loading);
     264         129 :   cds_ = cds;
     265         129 :   if (cds_) {
     266          28 :     cds_->setInitializedCb([this]() -> void {
     267          28 :       ASSERT(state_ == State::WaitingToStartCdsInitialization);
     268          28 :       state_ = State::CdsInitialized;
     269          28 :       maybeFinishInitialize();
     270          28 :     });
     271          28 :   }
     272         129 : }
     273             : 
     274             : void ClusterManagerInitHelper::setInitializedCb(
     275          98 :     ClusterManager::InitializationCompleteCallback callback) {
     276          98 :   if (state_ == State::AllClustersInitialized) {
     277          70 :     callback();
     278          98 :   } else {
     279          28 :     initialized_callback_ = callback;
     280          28 :   }
     281          98 : }
     282             : 
     283             : void ClusterManagerInitHelper::setPrimaryClustersInitializedCb(
     284         111 :     ClusterManager::PrimaryClustersReadyCallback callback) {
     285             :   // The callback must be set before or at the `WaitingToStartSecondaryInitialization` state.
     286         111 :   ASSERT(state_ == State::WaitingToStartSecondaryInitialization ||
     287         111 :          state_ == State::WaitingForPrimaryInitializationToComplete || state_ == State::Loading);
     288         111 :   if (state_ == State::WaitingToStartSecondaryInitialization) {
     289             :     // This is the case where all clusters are STATIC and without health checking.
     290         111 :     callback();
     291         111 :   } else {
     292           0 :     primary_clusters_initialized_callback_ = callback;
     293           0 :   }
     294         111 : }
     295             : 
     296             : ClusterManagerImpl::ClusterManagerImpl(
     297             :     const envoy::config::bootstrap::v3::Bootstrap& bootstrap, ClusterManagerFactory& factory,
     298             :     Stats::Store& stats, ThreadLocal::Instance& tls, Runtime::Loader& runtime,
     299             :     const LocalInfo::LocalInfo& local_info, AccessLog::AccessLogManager& log_manager,
     300             :     Event::Dispatcher& main_thread_dispatcher, OptRef<Server::Admin> admin,
     301             :     ProtobufMessage::ValidationContext& validation_context, Api::Api& api,
     302             :     Http::Context& http_context, Grpc::Context& grpc_context, Router::Context& router_context,
     303             :     const Server::Instance& server)
     304             :     : server_(server), factory_(factory), runtime_(runtime), stats_(stats), tls_(tls),
     305             :       random_(api.randomGenerator()),
     306             :       deferred_cluster_creation_(bootstrap.cluster_manager().enable_deferred_cluster_creation()),
     307             :       bind_config_(bootstrap.cluster_manager().has_upstream_bind_config()
     308             :                        ? absl::make_optional(bootstrap.cluster_manager().upstream_bind_config())
     309             :                        : absl::nullopt),
     310             :       local_info_(local_info), cm_stats_(generateStats(*stats.rootScope())),
     311         159 :       init_helper_(*this, [this](ClusterManagerCluster& cluster) { onClusterInit(cluster); }),
     312             :       time_source_(main_thread_dispatcher.timeSource()), dispatcher_(main_thread_dispatcher),
     313             :       http_context_(http_context), validation_context_(validation_context),
     314             :       router_context_(router_context), cluster_stat_names_(stats.symbolTable()),
     315             :       cluster_config_update_stat_names_(stats.symbolTable()),
     316             :       cluster_lb_stat_names_(stats.symbolTable()),
     317             :       cluster_endpoint_stat_names_(stats.symbolTable()),
     318             :       cluster_load_report_stat_names_(stats.symbolTable()),
     319             :       cluster_circuit_breakers_stat_names_(stats.symbolTable()),
     320             :       cluster_request_response_size_stat_names_(stats.symbolTable()),
     321             :       cluster_timeout_budget_stat_names_(stats.symbolTable()),
     322             :       common_lb_config_pool_(
     323             :           std::make_shared<SharedPool::ObjectSharedPool<
     324             :               const envoy::config::cluster::v3::Cluster::CommonLbConfig, MessageUtil, MessageUtil>>(
     325             :               main_thread_dispatcher)),
     326         131 :       shutdown_(false) {
     327         131 :   if (admin.has_value()) {
     328         131 :     config_tracker_entry_ = admin->getConfigTracker().add(
     329         131 :         "clusters", [this](const Matchers::StringMatcher& name_matcher) {
     330          98 :           return dumpClusterConfigs(name_matcher);
     331          98 :         });
     332         131 :   }
     333         131 :   async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
     334         131 :       *this, tls, time_source_, api, grpc_context.statNames(),
     335         131 :       bootstrap.grpc_async_client_manager_config());
     336         131 :   const auto& cm_config = bootstrap.cluster_manager();
     337         131 :   if (cm_config.has_outlier_detection()) {
     338           0 :     const std::string event_log_file_path = cm_config.outlier_detection().event_log_path();
     339           0 :     if (!event_log_file_path.empty()) {
     340           0 :       outlier_event_logger_ = std::make_shared<Outlier::EventLoggerImpl>(
     341           0 :           log_manager, event_log_file_path, time_source_);
     342           0 :     }
     343           0 :   }
     344             : 
     345             :   // We need to know whether we're zone aware early on, so make sure we do this lookup
     346             :   // before we load any clusters.
     347         131 :   if (!cm_config.local_cluster_name().empty()) {
     348           0 :     local_cluster_name_ = cm_config.local_cluster_name();
     349           0 :   }
     350             : 
     351             :   // Initialize the XdsResourceDelegate extension, if set on the bootstrap config.
     352         131 :   if (bootstrap.has_xds_delegate_extension()) {
     353           0 :     auto& factory = Config::Utility::getAndCheckFactory<Config::XdsResourcesDelegateFactory>(
     354           0 :         bootstrap.xds_delegate_extension());
     355           0 :     xds_resources_delegate_ = factory.createXdsResourcesDelegate(
     356           0 :         bootstrap.xds_delegate_extension().typed_config(),
     357           0 :         validation_context.dynamicValidationVisitor(), api, main_thread_dispatcher);
     358           0 :   }
     359             : 
     360         131 :   if (bootstrap.has_xds_config_tracker_extension()) {
     361           0 :     auto& tracer_factory = Config::Utility::getAndCheckFactory<Config::XdsConfigTrackerFactory>(
     362           0 :         bootstrap.xds_config_tracker_extension());
     363           0 :     xds_config_tracker_ = tracer_factory.createXdsConfigTracker(
     364           0 :         bootstrap.xds_config_tracker_extension().typed_config(),
     365           0 :         validation_context.dynamicValidationVisitor(), api, main_thread_dispatcher);
     366           0 :   }
     367             : 
     368         131 :   subscription_factory_ = std::make_unique<Config::SubscriptionFactoryImpl>(
     369         131 :       local_info, main_thread_dispatcher, *this, validation_context.dynamicValidationVisitor(), api,
     370         131 :       server, makeOptRefFromPtr(xds_resources_delegate_.get()),
     371         131 :       makeOptRefFromPtr(xds_config_tracker_.get()));
     372         131 : }
     373             : 
     374         131 : absl::Status ClusterManagerImpl::init(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
     375         131 :   ASSERT(!initialized_);
     376         131 :   initialized_ = true;
     377             : 
      378             :   // Cluster loading happens in two phases: first all the primary clusters are loaded, and then
      379             :   // all the secondary clusters are loaded. As it currently stands, all non-EDS clusters, as well
      380             :   // as EDS clusters that load their endpoint definitions from a file, are primary, and
      381             :   // (REST, GRPC, DELTA_GRPC) EDS clusters are secondary. This two-phase
      382             :   // loading is done because in v2 configuration each EDS cluster individually sets up a
      383             :   // subscription. When this subscription is an API source, the cluster will depend on a non-EDS
      384             :   // cluster, so the non-EDS clusters must be loaded first.
     385         295 :   auto is_primary_cluster = [](const envoy::config::cluster::v3::Cluster& cluster) -> bool {
     386         262 :     return cluster.type() != envoy::config::cluster::v3::Cluster::EDS ||
     387         262 :            (cluster.type() == envoy::config::cluster::v3::Cluster::EDS &&
     388           0 :             Config::SubscriptionFactory::isPathBasedConfigSource(
     389           0 :                 cluster.eds_cluster_config().eds_config().config_source_specifier_case()));
     390         262 :   };
     391             :   // Build book-keeping for which clusters are primary. This is useful when we
     392             :   // invoke loadCluster() below and it needs the complete set of primaries.
     393         164 :   for (const auto& cluster : bootstrap.static_resources().clusters()) {
     394         131 :     if (is_primary_cluster(cluster)) {
     395         131 :       primary_clusters_.insert(cluster.name());
     396         131 :     }
     397         131 :   }
     398             : 
     399         131 :   bool has_ads_cluster = false;
     400             :   // Load all the primary clusters.
     401         164 :   for (const auto& cluster : bootstrap.static_resources().clusters()) {
     402         131 :     if (is_primary_cluster(cluster)) {
     403         131 :       const bool required_for_ads = isBlockingAdsCluster(bootstrap, cluster.name());
     404         131 :       has_ads_cluster |= required_for_ads;
     405             :       // TODO(abeyad): Consider passing a lambda for a "post-cluster-init" callback, which would
      406             :       // include a conditional ads_mux_->start() call, if other use cases for "post-cluster-init"
      407             :       // functionality pop up.
     408         131 :       auto status_or_cluster =
     409         131 :           loadCluster(cluster, MessageUtil::hash(cluster), "", /*added_via_api=*/false,
     410         131 :                       required_for_ads, active_clusters_);
     411         131 :       RETURN_IF_STATUS_NOT_OK(status_or_cluster);
     412         131 :     }
     413         131 :   }
     414             : 
     415         131 :   const auto& dyn_resources = bootstrap.dynamic_resources();
     416             : 
     417             :   // Now setup ADS if needed, this might rely on a primary cluster.
     418             :   // This is the only point where distinction between delta ADS and state-of-the-world ADS is made.
     419             :   // After here, we just have a GrpcMux interface held in ads_mux_, which hides
     420             :   // whether the backing implementation is delta or SotW.
     421         131 :   if (dyn_resources.has_ads_config()) {
     422          31 :     Config::CustomConfigValidatorsPtr custom_config_validators =
     423          31 :         std::make_unique<Config::CustomConfigValidatorsImpl>(
     424          31 :             validation_context_.dynamicValidationVisitor(), server_,
     425          31 :             dyn_resources.ads_config().config_validators());
     426             : 
     427          31 :     JitteredExponentialBackOffStrategyPtr backoff_strategy =
     428          31 :         Config::Utility::prepareJitteredExponentialBackOffStrategy(
     429          31 :             dyn_resources.ads_config(), random_,
     430          31 :             Envoy::Config::SubscriptionFactory::RetryInitialDelayMs,
     431          31 :             Envoy::Config::SubscriptionFactory::RetryMaxDelayMs);
     432             : 
     433          31 :     const bool use_eds_cache =
     434          31 :         Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads");
     435          31 :     if (dyn_resources.ads_config().api_type() ==
     436          31 :         envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
     437           8 :       absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
     438           8 :       RETURN_IF_NOT_OK(status);
     439           8 :       std::string name;
     440           8 :       if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
     441           4 :         name = "envoy.config_mux.delta_grpc_mux_factory";
     442           4 :       } else {
     443           4 :         name = "envoy.config_mux.new_grpc_mux_factory";
     444           4 :       }
     445           8 :       auto* factory = Config::Utility::getFactoryByName<Config::MuxFactory>(name);
     446           8 :       if (!factory) {
     447           0 :         return absl::InvalidArgumentError(fmt::format("{} not found", name));
     448           0 :       }
     449           8 :       ads_mux_ = factory->create(
     450           8 :           Config::Utility::factoryForGrpcApiConfigSource(
     451           8 :               *async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false)
     452           8 :               ->createUncachedRawAsyncClient(),
     453           8 :           dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
     454           8 :           std::move(custom_config_validators), std::move(backoff_strategy),
     455           8 :           makeOptRefFromPtr(xds_config_tracker_.get()), {}, use_eds_cache);
     456          23 :     } else {
     457          23 :       absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
     458          23 :       RETURN_IF_NOT_OK(status);
     459          23 :       auto xds_delegate_opt_ref = makeOptRefFromPtr(xds_resources_delegate_.get());
     460          23 :       std::string name;
     461          23 :       if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
     462          10 :         name = "envoy.config_mux.sotw_grpc_mux_factory";
     463          13 :       } else {
     464          13 :         name = "envoy.config_mux.grpc_mux_factory";
     465          13 :       }
     466             : 
     467          23 :       auto* factory = Config::Utility::getFactoryByName<Config::MuxFactory>(name);
     468          23 :       if (!factory) {
     469           0 :         return absl::InvalidArgumentError(fmt::format("{} not found", name));
     470           0 :       }
     471          23 :       ads_mux_ = factory->create(
     472          23 :           Config::Utility::factoryForGrpcApiConfigSource(
     473          23 :               *async_client_manager_, dyn_resources.ads_config(), *stats_.rootScope(), false)
     474          23 :               ->createUncachedRawAsyncClient(),
     475          23 :           dispatcher_, random_, *stats_.rootScope(), dyn_resources.ads_config(), local_info_,
     476          23 :           std::move(custom_config_validators), std::move(backoff_strategy),
     477          23 :           makeOptRefFromPtr(xds_config_tracker_.get()), xds_delegate_opt_ref, use_eds_cache);
     478          23 :     }
     479         128 :   } else {
     480         100 :     ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>();
     481         100 :   }
     482             : 
     483             :   // After ADS is initialized, load EDS static clusters as EDS config may potentially need ADS.
     484         164 :   for (const auto& cluster : bootstrap.static_resources().clusters()) {
     485             :     // Now load all the secondary clusters.
     486         131 :     if (cluster.type() == envoy::config::cluster::v3::Cluster::EDS &&
     487         131 :         !Config::SubscriptionFactory::isPathBasedConfigSource(
     488           0 :             cluster.eds_cluster_config().eds_config().config_source_specifier_case())) {
     489           0 :       const bool required_for_ads = isBlockingAdsCluster(bootstrap, cluster.name());
     490           0 :       has_ads_cluster |= required_for_ads;
     491           0 :       auto status_or_cluster =
     492           0 :           loadCluster(cluster, MessageUtil::hash(cluster), "", /*added_via_api=*/false,
     493           0 :                       required_for_ads, active_clusters_);
     494           0 :       if (!status_or_cluster.status().ok()) {
     495           0 :         return status_or_cluster.status();
     496           0 :       }
     497           0 :     }
     498         131 :   }
     499             : 
     500         131 :   cm_stats_.cluster_added_.add(bootstrap.static_resources().clusters().size());
     501         131 :   updateClusterCounts();
     502             : 
     503         131 :   absl::optional<ThreadLocalClusterManagerImpl::LocalClusterParams> local_cluster_params;
     504         131 :   if (local_cluster_name_) {
     505           0 :     auto local_cluster = active_clusters_.find(local_cluster_name_.value());
     506           0 :     if (local_cluster == active_clusters_.end()) {
     507           0 :       return absl::InvalidArgumentError(
     508           0 :           fmt::format("local cluster '{}' must be defined", local_cluster_name_.value()));
     509           0 :     }
     510           0 :     local_cluster_params.emplace();
     511           0 :     local_cluster_params->info_ = local_cluster->second->cluster().info();
     512           0 :     local_cluster_params->load_balancer_factory_ = local_cluster->second->loadBalancerFactory();
     513           0 :     local_cluster->second->setAddedOrUpdated();
     514           0 :   }
     515             : 
     516             :   // Once the initial set of static bootstrap clusters are created (including the local cluster),
     517             :   // we can instantiate the thread local cluster manager.
     518         225 :   tls_.set([this, local_cluster_params](Event::Dispatcher& dispatcher) {
     519         223 :     return std::make_shared<ThreadLocalClusterManagerImpl>(*this, dispatcher, local_cluster_params);
     520         223 :   });
     521             : 
     522             :   // We can now potentially create the CDS API once the backing cluster exists.
     523         131 :   if (dyn_resources.has_cds_config() || !dyn_resources.cds_resources_locator().empty()) {
     524          28 :     std::unique_ptr<xds::core::v3::ResourceLocator> cds_resources_locator;
     525          28 :     if (!dyn_resources.cds_resources_locator().empty()) {
     526           0 :       cds_resources_locator = std::make_unique<xds::core::v3::ResourceLocator>(
     527           0 :           Config::XdsResourceIdentifier::decodeUrl(dyn_resources.cds_resources_locator()));
     528           0 :     }
     529          28 :     cds_api_ = factory_.createCds(dyn_resources.cds_config(), cds_resources_locator.get(), *this);
     530          28 :     init_helper_.setCds(cds_api_.get());
     531         131 :   } else {
     532         103 :     init_helper_.setCds(nullptr);
     533         103 :   }
     534             : 
     535             :   // Proceed to add all static bootstrap clusters to the init manager. This will immediately
     536             :   // initialize any primary clusters. Post-init processing further initializes any thread
     537             :   // aware load balancer and sets up the per-worker host set updates.
     538         164 :   for (auto& cluster : active_clusters_) {
     539         131 :     init_helper_.addCluster(*cluster.second);
     540         131 :   }
     541             : 
     542             :   // Potentially move to secondary initialization on the static bootstrap clusters if all primary
     543             :   // clusters have already initialized. (E.g., if all static).
     544         131 :   init_helper_.onStaticLoadComplete();
     545             : 
     546         131 :   if (!has_ads_cluster) {
     547             :     // There is no ADS cluster, so we won't be starting the ADS mux after a cluster has finished
     548             :     // initializing, so we must start ADS here.
     549         101 :     ads_mux_->start();
     550         101 :   }
     551         131 :   return absl::OkStatus();
     552         131 : }
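
Illustrative sketch (not part of the instrumented source): how the is_primary_cluster predicate
in init() classifies two representative clusters. The mutators follow the protobuf-generated API
for the accessors used above; the configurations are hypothetical.

    envoy::config::cluster::v3::Cluster static_cluster;
    static_cluster.set_type(envoy::config::cluster::v3::Cluster::STATIC);
    // Non-EDS -> primary: loaded in the first pass, before ads_mux_ exists.

    envoy::config::cluster::v3::Cluster eds_cluster;
    eds_cluster.set_type(envoy::config::cluster::v3::Cluster::EDS);
    eds_cluster.mutable_eds_cluster_config()->mutable_eds_config()->mutable_api_config_source();
    // API-sourced EDS -> secondary: loaded only after ADS is set up. An EDS cluster whose
    // eds_config is a path-based config source would instead be classified as primary.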
     553             : 
     554             : absl::Status ClusterManagerImpl::initializeSecondaryClusters(
     555         110 :     const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
     556         110 :   init_helper_.startInitializingSecondaryClusters();
     557             : 
     558         110 :   const auto& cm_config = bootstrap.cluster_manager();
     559         110 :   if (cm_config.has_load_stats_config()) {
     560           2 :     const auto& load_stats_config = cm_config.load_stats_config();
     561             : 
     562           2 :     absl::Status status = Config::Utility::checkTransportVersion(load_stats_config);
     563           2 :     RETURN_IF_NOT_OK(status);
     564           2 :     load_stats_reporter_ = std::make_unique<LoadStatsReporter>(
     565           2 :         local_info_, *this, *stats_.rootScope(),
     566           2 :         Config::Utility::factoryForGrpcApiConfigSource(*async_client_manager_, load_stats_config,
     567           2 :                                                        *stats_.rootScope(), false)
     568           2 :             ->createUncachedRawAsyncClient(),
     569           2 :         dispatcher_);
     570           2 :   }
     571         110 :   return absl::OkStatus();
     572         110 : }
     573             : 
     574         131 : ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) {
     575         131 :   const std::string final_prefix = "cluster_manager.";
     576         131 :   return {ALL_CLUSTER_MANAGER_STATS(POOL_COUNTER_PREFIX(scope, final_prefix),
     577         131 :                                     POOL_GAUGE_PREFIX(scope, final_prefix))};
     578         131 : }
     579             : 
     580             : ThreadLocalClusterManagerStats
     581             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::generateStats(Stats::Scope& scope,
     582         223 :                                                                  const std::string& thread_name) {
     583         223 :   const std::string final_prefix = absl::StrCat("thread_local_cluster_manager.", thread_name);
     584         223 :   return {ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(POOL_GAUGE_PREFIX(scope, final_prefix))};
     585         223 : }
     586             : 
     587         159 : void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) {
     588             :   // This routine is called when a cluster has finished initializing. The cluster has not yet
      589             :   // been set up for cross-thread updates to avoid needless updates during initialization. The order
     590             :   // of operations here is important. We start by initializing the thread aware load balancer if
     591             :   // needed. This must happen first so cluster updates are heard first by the load balancer.
      592             :   // It also ensures that every cluster for which this function is called is active.
     593         159 :   auto& cluster = cm_cluster.cluster();
     594         159 :   auto cluster_data = warming_clusters_.find(cluster.info()->name());
      595             :   // Some clusters, such as static and primary clusters, become active immediately and are never
      596             :   // in the warming map, so we must guard against that case here.
     597         159 :   if (cluster_data != warming_clusters_.end()) {
     598          28 :     clusterWarmingToActive(cluster.info()->name());
     599          28 :     updateClusterCounts();
     600          28 :   }
     601         159 :   cluster_data = active_clusters_.find(cluster.info()->name());
     602             : 
     603         159 :   if (cluster_data->second->thread_aware_lb_ != nullptr) {
     604         159 :     cluster_data->second->thread_aware_lb_->initialize();
     605         159 :   }
     606             : 
     607             :   // Now setup for cross-thread updates.
     608             :   // This is used by cluster types such as EDS clusters to drain the connection pools of removed
     609             :   // hosts.
     610         159 :   cluster_data->second->member_update_cb_ = cluster.prioritySet().addMemberUpdateCb(
     611         159 :       [&cluster, this](const HostVector&, const HostVector& hosts_removed) -> void {
     612          28 :         if (cluster.info()->lbConfig().close_connections_on_host_set_change()) {
     613           0 :           for (const auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
     614             :             // This will drain all tcp and http connection pools.
     615           0 :             postThreadLocalRemoveHosts(cluster, host_set->hosts());
     616           0 :           }
     617          28 :         } else {
     618             :           // TODO(snowp): Should this be subject to merge windows?
     619             : 
      620             :           // Whenever hosts are removed from the cluster, we make each TLS cluster drain its
      621             :           // connection pools for the removed hosts. If `close_connections_on_host_set_change` is
      622             :           // enabled, this case is covered by the first `if` statement above, where all
     623             :           // connection pools are drained.
     624          28 :           if (!hosts_removed.empty()) {
     625           0 :             postThreadLocalRemoveHosts(cluster, hosts_removed);
     626           0 :           }
     627          28 :         }
     628          28 :       });
     629             : 
     630             :   // This is used by cluster types such as EDS clusters to update the cluster
     631             :   // without draining the cluster.
     632         159 :   cluster_data->second->priority_update_cb_ = cluster.prioritySet().addPriorityUpdateCb(
     633         159 :       [&cm_cluster, this](uint32_t priority, const HostVector& hosts_added,
     634         159 :                           const HostVector& hosts_removed) {
     635             :         // This fires when a cluster is about to have an updated member set. We need to send this
     636             :         // out to all of the thread local configurations.
     637             : 
     638             :         // Should we save this update and merge it with other updates?
     639             :         //
     640             :         // Note that we can only _safely_ merge updates that have no added/removed hosts. That is,
     641             :         // only those updates that signal a change in host healthcheck state, weight or metadata.
     642             :         //
     643             :         // We've discussed merging updates related to hosts being added/removed, but it's really
     644             :         // tricky to merge those given that downstream consumers of these updates expect to see the
     645             :         // full list of updates, not a condensed one. This is because they use the broadcasted
     646             :         // HostSharedPtrs within internal maps to track hosts. If we fail to broadcast the entire
     647             :         // list of removals, these maps will leak those HostSharedPtrs.
     648             :         //
     649             :         // See https://github.com/envoyproxy/envoy/pull/3941 for more context.
     650           0 :         bool scheduled = false;
     651           0 :         const auto merge_timeout = PROTOBUF_GET_MS_OR_DEFAULT(
     652           0 :             cm_cluster.cluster().info()->lbConfig(), update_merge_window, 1000);
      653             :         // Remember: we only merge updates with no adds/removes, just hc/weight/metadata changes.
     654           0 :         const bool is_mergeable = hosts_added.empty() && hosts_removed.empty();
     655             : 
     656           0 :         if (merge_timeout > 0) {
     657             :           // If this is not mergeable, we should cancel any scheduled updates since
     658             :           // we'll deliver it immediately.
     659           0 :           scheduled = scheduleUpdate(cm_cluster, priority, is_mergeable, merge_timeout);
     660           0 :         }
     661             : 
     662             :         // If an update was not scheduled for later, deliver it immediately.
     663           0 :         if (!scheduled) {
     664           0 :           cm_stats_.cluster_updated_.inc();
     665           0 :           postThreadLocalClusterUpdate(
     666           0 :               cm_cluster, ThreadLocalClusterUpdateParams(priority, hosts_added, hosts_removed));
     667           0 :         }
     668           0 :       });
     669             : 
     670             :   // Finally, post updates cross-thread so the per-thread load balancers are ready. First we
     671             :   // populate any update information that may be available after cluster init.
     672         159 :   ThreadLocalClusterUpdateParams params;
     673         159 :   for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
     674         159 :     if (host_set->hosts().empty()) {
     675           0 :       continue;
     676           0 :     }
     677         159 :     params.per_priority_update_params_.emplace_back(host_set->priority(), host_set->hosts(),
     678         159 :                                                     HostVector{});
     679         159 :   }
      680             :   // NOTE: In all cases *other* than the local cluster, this is when a cluster is added/updated.
     681             :   // The local cluster must currently be statically defined and must exist prior to other
     682             :   // clusters being added/updated. We could gate the below update on hosts being available on
     683             :   // the cluster or the cluster not already existing, but the special logic is not worth it.
     684         159 :   postThreadLocalClusterUpdate(cm_cluster, std::move(params));
     685         159 : }
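
Editorial note (not part of the instrumented source): the two callbacks registered above play
different roles. The member-update callback only drains connection pools, either for the removed
hosts or for every host when close_connections_on_host_set_change is set. The priority-update
callback is what propagates membership changes to the worker threads, going through
scheduleUpdate() so that health/weight/metadata-only changes can be merged within the configured
update_merge_window.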
     686             : 
     687             : bool ClusterManagerImpl::scheduleUpdate(ClusterManagerCluster& cluster, uint32_t priority,
     688           0 :                                         bool mergeable, const uint64_t timeout) {
     689             :   // Find pending updates for this cluster.
     690           0 :   auto& updates_by_prio = updates_map_[cluster.cluster().info()->name()];
     691           0 :   if (!updates_by_prio) {
     692           0 :     updates_by_prio = std::make_unique<PendingUpdatesByPriorityMap>();
     693           0 :   }
     694             : 
     695             :   // Find pending updates for this priority.
     696           0 :   auto& updates = (*updates_by_prio)[priority];
     697           0 :   if (!updates) {
     698           0 :     updates = std::make_unique<PendingUpdates>();
     699           0 :   }
     700             : 
     701             :   // Has an update_merge_window gone by since the last update? If so, don't schedule
     702             :   // the update so it can be applied immediately. Ditto if this is not a mergeable update.
     703           0 :   const auto delta = time_source_.monotonicTime() - updates->last_updated_;
     704           0 :   const uint64_t delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count();
     705           0 :   const bool out_of_merge_window = delta_ms > timeout;
     706           0 :   if (out_of_merge_window || !mergeable) {
     707             :     // If there was a pending update, we cancel the pending merged update.
     708             :     //
     709             :     // Note: it's possible that even though we are outside of a merge window (delta_ms > timeout),
     710             :     // a timer is enabled. This race condition is fine, since we'll disable the timer here and
     711             :     // deliver the update immediately.
     712             : 
     713             :     // Why wasn't the update scheduled for later delivery? We keep some stats that are helpful
      714             :     // to understand why merging did not happen. There are two things we track here:
     715             : 
     716             :     // 1) Was this update out of a merge window?
     717           0 :     if (mergeable && out_of_merge_window) {
     718           0 :       cm_stats_.update_out_of_merge_window_.inc();
     719           0 :     }
     720             : 
     721             :     // 2) Were there previous updates that we are cancelling (and delivering immediately)?
     722           0 :     if (updates->disableTimer()) {
     723           0 :       cm_stats_.update_merge_cancelled_.inc();
     724           0 :     }
     725             : 
     726           0 :     updates->last_updated_ = time_source_.monotonicTime();
     727           0 :     return false;
     728           0 :   }
     729             : 
     730             :   // If there's no timer, create one.
     731           0 :   if (updates->timer_ == nullptr) {
     732           0 :     updates->timer_ = dispatcher_.createTimer([this, &cluster, priority, &updates]() -> void {
     733           0 :       applyUpdates(cluster, priority, *updates);
     734           0 :     });
     735           0 :   }
     736             : 
     737             :   // Ensure there's a timer set to deliver these updates.
     738           0 :   if (!updates->timer_->enabled()) {
     739           0 :     updates->enableTimer(timeout);
     740           0 :   }
     741             : 
     742           0 :   return true;
     743           0 : }
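
Illustrative sketch (not part of the instrumented source): the merge-window arithmetic above,
worked through with plain <chrono> types. The 1000 ms value mirrors the update_merge_window
default used in onClusterInit(); the elapsed time is hypothetical.

    using std::chrono::duration_cast;
    using std::chrono::milliseconds;
    using std::chrono::steady_clock;

    const uint64_t timeout_ms = 1000; // update_merge_window default
    const auto last_updated = steady_clock::now() - milliseconds(1500);
    const auto delta = steady_clock::now() - last_updated;
    const uint64_t delta_ms = duration_cast<milliseconds>(delta).count();
    const bool out_of_merge_window = delta_ms > timeout_ms; // true here: ~1500 ms have elapsed
    // Outside the window (or for any update that adds/removes hosts) scheduleUpdate() returns
    // false and the caller posts the update immediately; otherwise a timer is armed so that
    // health/weight/metadata-only changes are delivered at most once per window.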
     744             : 
     745             : void ClusterManagerImpl::applyUpdates(ClusterManagerCluster& cluster, uint32_t priority,
     746           0 :                                       PendingUpdates& updates) {
     747             :   // Deliver pending updates.
     748             : 
     749             :   // Remember that these merged updates are _only_ for updates related to
     750             :   // HC/weight/metadata changes. That's why added/removed are empty. All
     751             :   // adds/removals were already immediately broadcasted.
     752           0 :   static const HostVector hosts_added;
     753           0 :   static const HostVector hosts_removed;
     754             : 
     755           0 :   postThreadLocalClusterUpdate(
     756           0 :       cluster, ThreadLocalClusterUpdateParams(priority, hosts_added, hosts_removed));
     757             : 
     758           0 :   cm_stats_.cluster_updated_via_merge_.inc();
     759           0 :   updates.last_updated_ = time_source_.monotonicTime();
     760           0 : }
     761             : 
     762             : bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cluster& cluster,
     763          28 :                                             const std::string& version_info) {
     764             :   // First we need to see if this new config is new or an update to an existing dynamic cluster.
     765             :   // We don't allow updates to statically configured clusters in the main configuration. We check
     766             :   // both the warming clusters and the active clusters to see if we need an update or the update
     767             :   // should be blocked.
     768          28 :   const std::string& cluster_name = cluster.name();
     769          28 :   const auto existing_active_cluster = active_clusters_.find(cluster_name);
     770          28 :   const auto existing_warming_cluster = warming_clusters_.find(cluster_name);
     771          28 :   const uint64_t new_hash = MessageUtil::hash(cluster);
     772          28 :   if (existing_warming_cluster != warming_clusters_.end()) {
     773             :     // If the cluster is the same as the warming cluster of the same name, block the update.
     774           0 :     if (existing_warming_cluster->second->blockUpdate(new_hash)) {
     775           0 :       return false;
     776           0 :     }
     777             :     // NB: https://github.com/envoyproxy/envoy/issues/14598
     778             :     // Always proceed if the cluster is different from the existing warming cluster.
     779          28 :   } else if (existing_active_cluster != active_clusters_.end() &&
     780          28 :              existing_active_cluster->second->blockUpdate(new_hash)) {
     781             :     // If there's no warming cluster of the same name, and if the cluster is the same as the active
     782             :     // cluster of the same name, block the update.
     783           0 :     return false;
     784           0 :   }
     785             : 
     786          28 :   if (existing_active_cluster != active_clusters_.end() ||
     787          28 :       existing_warming_cluster != warming_clusters_.end()) {
     788           0 :     if (existing_active_cluster != active_clusters_.end()) {
     789             :       // The following init manager remove call is a NOP in the case we are already initialized.
     790             :       // It's just kept here to avoid additional logic.
     791           0 :       init_helper_.removeCluster(*existing_active_cluster->second);
     792           0 :     }
     793           0 :     cm_stats_.cluster_modified_.inc();
     794          28 :   } else {
     795          28 :     cm_stats_.cluster_added_.inc();
     796          28 :   }
     797             : 
     798             :   // There are two discrete paths here depending on when we are adding/updating a cluster.
     799             :   // 1) During initial server load we use the init manager which handles complex logic related to
     800             :   //    primary/secondary init, static/CDS init, warming all clusters, etc.
     801             :   // 2) After initial server load, we handle warming independently for each cluster in the warming
     802             :   //    map.
     803             :   // Note: It's likely possible that all warming logic could be centralized in the init manager, but
     804             :   //       a decision was made to split the logic given how complex the init manager already is. In
     805             :   //       the future we may decide to undergo a refactor to unify the logic but the effort/risk to
     806             :   //       do that right now does not seem worth it given that the logic is generally pretty clean
     807             :   //       and easy to understand.
     808          28 :   const bool all_clusters_initialized =
     809          28 :       init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
     810             :   // Preserve the previous cluster data so it is not destroyed too early. The replacement cluster
     811             :   // must be added before the old one is destroyed to avoid prematurely completing initialization.
     812          28 :   auto status_or_cluster = loadCluster(cluster, new_hash, version_info, /*added_via_api=*/true,
     813          28 :                                        /*required_for_ads=*/false, warming_clusters_);
     814          28 :   THROW_IF_STATUS_NOT_OK(status_or_cluster, throw);
     815          28 :   const ClusterDataPtr previous_cluster = std::move(status_or_cluster.value());
     816          28 :   auto& cluster_entry = warming_clusters_.at(cluster_name);
     817          28 :   cluster_entry->cluster_->info()->configUpdateStats().warming_state_.set(1);
     818          28 :   if (!all_clusters_initialized) {
     819          28 :     ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
     820          28 :     init_helper_.addCluster(*cluster_entry);
     821          28 :   } else {
     822           0 :     ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name);
     823           0 :     cluster_entry->cluster_->initialize([this, cluster_name] {
     824           0 :       ENVOY_LOG(debug, "warming cluster {} complete", cluster_name);
     825           0 :       auto state_changed_cluster_entry = warming_clusters_.find(cluster_name);
     826           0 :       state_changed_cluster_entry->second->cluster_->info()->configUpdateStats().warming_state_.set(
     827           0 :           0);
     828           0 :       onClusterInit(*state_changed_cluster_entry->second);
     829           0 :     });
     830           0 :   }
     831             : 
     832          28 :   return true;
     833          28 : }
     834             : 
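
The blockUpdate() checks in addOrUpdateCluster() above reduce to comparing a hash of the incoming cluster config against the hash stored with the warming or active entry, so re-delivery of an identical config is a no-op. The following standalone sketch illustrates the same idea; ConfigEntry and addOrUpdate() are hypothetical stand-ins, std::hash is used in place of MessageUtil::hash, and this is not Envoy code:

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <string>
    #include <unordered_map>

    // Hypothetical stand-in for a stored cluster entry: it remembers the hash of
    // the config it was built from so identical re-submissions can be skipped.
    struct ConfigEntry {
      uint64_t config_hash;
      bool blockUpdate(uint64_t new_hash) const { return new_hash == config_hash; }
    };

    // Returns true if the named entry was added or replaced, false if the update
    // was blocked because the config did not change.
    bool addOrUpdate(std::unordered_map<std::string, ConfigEntry>& entries,
                     const std::string& name, const std::string& serialized_config) {
      const uint64_t new_hash = std::hash<std::string>{}(serialized_config);
      auto it = entries.find(name);
      if (it != entries.end() && it->second.blockUpdate(new_hash)) {
        return false; // Same config as before: block the update.
      }
      entries[name] = ConfigEntry{new_hash};
      return true;
    }

    int main() {
      std::unordered_map<std::string, ConfigEntry> entries;
      std::cout << addOrUpdate(entries, "backend", "hosts: a,b") << "\n";   // 1: added
      std::cout << addOrUpdate(entries, "backend", "hosts: a,b") << "\n";   // 0: blocked
      std::cout << addOrUpdate(entries, "backend", "hosts: a,b,c") << "\n"; // 1: updated
    }

The real implementation additionally distinguishes warming from active entries and, per the issue referenced in the comment above, never blocks an update that differs from an in-flight warming cluster.
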
     835          28 : void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) {
     836          28 :   auto warming_it = warming_clusters_.find(cluster_name);
     837          28 :   ASSERT(warming_it != warming_clusters_.end());
     838             : 
     839             :   // If the cluster is being updated, we need to cancel any pending merged updates.
     840             :   // Otherwise, applyUpdates() will fire with a dangling cluster reference.
     841          28 :   updates_map_.erase(cluster_name);
     842             : 
     843          28 :   active_clusters_[cluster_name] = std::move(warming_it->second);
     844          28 :   warming_clusters_.erase(warming_it);
     845          28 : }
     846             : 
     847          40 : bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
     848          40 :   bool removed = false;
     849          40 :   auto existing_active_cluster = active_clusters_.find(cluster_name);
     850          40 :   if (existing_active_cluster != active_clusters_.end() &&
     851          40 :       existing_active_cluster->second->added_via_api_) {
     852           0 :     removed = true;
     853           0 :     init_helper_.removeCluster(*existing_active_cluster->second);
     854           0 :     active_clusters_.erase(existing_active_cluster);
     855             : 
     856           0 :     ENVOY_LOG(debug, "removing cluster {}", cluster_name);
     857           0 :     tls_.runOnAllThreads([cluster_name](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
     858           0 :       ASSERT(cluster_manager->thread_local_clusters_.contains(cluster_name) ||
     859           0 :              cluster_manager->thread_local_deferred_clusters_.contains(cluster_name));
     860           0 :       ENVOY_LOG(debug, "removing TLS cluster {}", cluster_name);
     861           0 :       for (auto cb_it = cluster_manager->update_callbacks_.begin();
     862           0 :            cb_it != cluster_manager->update_callbacks_.end();) {
     863             :         // The current callback may remove itself from the list, so a handle for
     864             :         // the next item is fetched before calling the callback.
     865           0 :         auto curr_cb_it = cb_it;
     866           0 :         ++cb_it;
     867           0 :         (*curr_cb_it)->onClusterRemoval(cluster_name);
     868           0 :       }
     869           0 :       cluster_manager->thread_local_clusters_.erase(cluster_name);
     870           0 :       cluster_manager->thread_local_deferred_clusters_.erase(cluster_name);
     871           0 :       cluster_manager->local_stats_.clusters_inflated_.set(
     872           0 :           cluster_manager->thread_local_clusters_.size());
     873           0 :     });
     874           0 :     cluster_initialization_map_.erase(cluster_name);
     875           0 :   }
     876             : 
     877          40 :   auto existing_warming_cluster = warming_clusters_.find(cluster_name);
     878          40 :   if (existing_warming_cluster != warming_clusters_.end() &&
     879          40 :       existing_warming_cluster->second->added_via_api_) {
     880           0 :     removed = true;
     881           0 :     init_helper_.removeCluster(*existing_warming_cluster->second);
     882           0 :     warming_clusters_.erase(existing_warming_cluster);
     883           0 :     ENVOY_LOG(info, "removing warming cluster {}", cluster_name);
     884           0 :   }
     885             : 
     886          40 :   if (removed) {
     887           0 :     cm_stats_.cluster_removed_.inc();
     888           0 :     updateClusterCounts();
     889             :     // Cancel any pending merged updates.
     890           0 :     updates_map_.erase(cluster_name);
     891           0 :   }
     892             : 
     893          40 :   return removed;
     894          40 : }
     895             : 
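
removeCluster() iterates update_callbacks_ on each worker with a "fetch the next iterator before invoking" pattern, because a callback is allowed to unregister itself while it runs. A minimal standalone sketch of that pattern, using std::list and std::function rather than the Envoy callback types:

    #include <functional>
    #include <iostream>
    #include <list>

    int main() {
      // std::list is used so erasing one element leaves iterators to the
      // remaining elements valid, mirroring the pattern in removeCluster().
      std::list<std::function<void()>> callbacks;

      auto first = callbacks.emplace(callbacks.end());
      auto second = callbacks.emplace(callbacks.end());
      *first = [&callbacks, first] {
        std::cout << "first callback removes itself\n";
        callbacks.erase(first); // Self-removal is what makes naive iteration unsafe.
      };
      *second = [] { std::cout << "second callback still runs\n"; };

      // Advance to the next element *before* invoking the current callback, so a
      // self-removing callback cannot invalidate the iterator the loop still needs.
      for (auto it = callbacks.begin(); it != callbacks.end();) {
        auto current = it;
        ++it;
        (*current)();
      }
    }

The only iterator that must be protected is the one the loop itself is holding, which is why it is advanced before the callback is invoked.
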
     896             : absl::StatusOr<ClusterManagerImpl::ClusterDataPtr>
     897             : ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
     898             :                                 const uint64_t cluster_hash, const std::string& version_info,
     899             :                                 bool added_via_api, const bool required_for_ads,
     900         159 :                                 ClusterMap& cluster_map) {
     901         159 :   absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
     902         159 :       new_cluster_pair_or_error =
     903         159 :           factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api);
     904             : 
     905         159 :   if (!new_cluster_pair_or_error.ok()) {
     906           0 :     return absl::InvalidArgumentError(std::string(new_cluster_pair_or_error.status().message()));
     907           0 :   }
     908         159 :   auto& new_cluster = new_cluster_pair_or_error->first;
     909         159 :   auto& lb = new_cluster_pair_or_error->second;
     910         159 :   Cluster& cluster_reference = *new_cluster;
     911             : 
     912         159 :   const auto cluster_info = cluster_reference.info();
     913             : 
     914         159 :   if (!added_via_api) {
     915         131 :     if (cluster_map.find(cluster_info->name()) != cluster_map.end()) {
     916           0 :       return absl::InvalidArgumentError(
     917           0 :           fmt::format("cluster manager: duplicate cluster '{}'", cluster_info->name()));
     918           0 :     }
     919         131 :   }
     920             : 
     921             :   // Check whether the cluster-provided load balancing policy is used. We need to handle it as a
     922             :   // special case.
     923         159 :   bool cluster_provided_lb = cluster_info->lbType() == LoadBalancerType::ClusterProvided;
     924         159 :   if (cluster_info->lbType() == LoadBalancerType::LoadBalancingPolicyConfig) {
     925         159 :     TypedLoadBalancerFactory* typed_lb_factory = cluster_info->loadBalancerFactory();
     926         159 :     RELEASE_ASSERT(typed_lb_factory != nullptr, "ClusterInfo should contain a valid factory");
     927         159 :     cluster_provided_lb =
     928         159 :         typed_lb_factory->name() == "envoy.load_balancing_policies.cluster_provided";
     929         159 :   }
     930             : 
     931         159 :   if (cluster_provided_lb && lb == nullptr) {
     932           0 :     return absl::InvalidArgumentError(
     933           0 :         fmt::format("cluster manager: cluster provided LB specified but cluster "
     934           0 :                     "'{}' did not provide one. Check cluster documentation.",
     935           0 :                     cluster_info->name()));
     936           0 :   }
     937         159 :   if (!cluster_provided_lb && lb != nullptr) {
     938           0 :     return absl::InvalidArgumentError(
     939           0 :         fmt::format("cluster manager: cluster provided LB not specified but cluster "
     940           0 :                     "'{}' provided one. Check cluster documentation.",
     941           0 :                     cluster_info->name()));
     942           0 :   }
     943             : 
     944         159 :   if (new_cluster->healthChecker() != nullptr) {
     945           0 :     new_cluster->healthChecker()->addHostCheckCompleteCb(
     946           0 :         [this](HostSharedPtr host, HealthTransition changed_state) {
     947           0 :           if (changed_state == HealthTransition::Changed &&
     948           0 :               host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
     949           0 :             postThreadLocalHealthFailure(host);
     950           0 :           }
     951           0 :         });
     952           0 :   }
     953             : 
     954         159 :   if (new_cluster->outlierDetector() != nullptr) {
     955           0 :     new_cluster->outlierDetector()->addChangedStateCb([this](HostSharedPtr host) {
     956           0 :       if (host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
     957           0 :         ENVOY_LOG_EVENT(debug, "outlier_detection_ejection",
     958           0 :                         "host {} in cluster {} was ejected by the outlier detector",
     959           0 :                         host->address()->asStringView(), host->cluster().name());
     960           0 :         postThreadLocalHealthFailure(host);
     961           0 :       }
     962           0 :     });
     963           0 :   }
     964         159 :   ClusterDataPtr result;
     965         159 :   auto cluster_entry_it = cluster_map.find(cluster_info->name());
     966         159 :   if (cluster_entry_it != cluster_map.end()) {
     967           0 :     result = std::exchange(cluster_entry_it->second,
     968           0 :                            std::make_unique<ClusterData>(cluster, cluster_hash, version_info,
     969           0 :                                                          added_via_api, required_for_ads,
     970           0 :                                                          std::move(new_cluster), time_source_));
     971         159 :   } else {
     972         159 :     bool inserted = false;
     973         159 :     std::tie(cluster_entry_it, inserted) = cluster_map.emplace(
     974         159 :         cluster_info->name(),
     975         159 :         std::make_unique<ClusterData>(cluster, cluster_hash, version_info, added_via_api,
     976         159 :                                       required_for_ads, std::move(new_cluster), time_source_));
     977         159 :     ASSERT(inserted);
     978         159 :   }
     979             :   // If an LB is thread aware, create it here. The LB is not initialized until cluster pre-init
     980             :   // finishes. For RingHash/Maglev, don't create the LB here if subset balancing is enabled,
     981             :   // because the thread_aware_lb_ field takes precedence over the subset LB.
     982         159 :   if (cluster_info->lbType() == LoadBalancerType::RingHash) {
     983           0 :     if (!cluster_info->lbSubsetInfo().isEnabled()) {
     984           0 :       auto& factory = Config::Utility::getAndCheckFactoryByName<TypedLoadBalancerFactory>(
     985           0 :           "envoy.load_balancing_policies.ring_hash");
     986           0 :       cluster_entry_it->second->thread_aware_lb_ = factory.create(
     987           0 :           {}, *cluster_info, cluster_reference.prioritySet(), runtime_, random_, time_source_);
     988           0 :     }
     989         159 :   } else if (cluster_info->lbType() == LoadBalancerType::Maglev) {
     990           0 :     if (!cluster_info->lbSubsetInfo().isEnabled()) {
     991           0 :       auto& factory = Config::Utility::getAndCheckFactoryByName<TypedLoadBalancerFactory>(
     992           0 :           "envoy.load_balancing_policies.maglev");
     993           0 :       cluster_entry_it->second->thread_aware_lb_ = factory.create(
     994           0 :           {}, *cluster_info, cluster_reference.prioritySet(), runtime_, random_, time_source_);
     995           0 :     }
     996         159 :   } else if (cluster_provided_lb) {
     997           0 :     cluster_entry_it->second->thread_aware_lb_ = std::move(lb);
     998         159 :   } else if (cluster_info->lbType() == LoadBalancerType::LoadBalancingPolicyConfig) {
     999         159 :     TypedLoadBalancerFactory* typed_lb_factory = cluster_info->loadBalancerFactory();
    1000         159 :     RELEASE_ASSERT(typed_lb_factory != nullptr, "ClusterInfo should contain a valid factory");
    1001         159 :     cluster_entry_it->second->thread_aware_lb_ =
    1002         159 :         typed_lb_factory->create(cluster_info->loadBalancerConfig(), *cluster_info,
    1003         159 :                                  cluster_reference.prioritySet(), runtime_, random_, time_source_);
    1004         159 :   }
    1005             : 
    1006         159 :   updateClusterCounts();
    1007         159 :   return result;
    1008         159 : }
    1009             : 
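
When loadCluster() finds an existing entry of the same name, it uses std::exchange so the new ClusterData is installed while the previous one stays alive in the returned pointer until the caller releases it (see the "preserve the previous cluster data" comment in addOrUpdateCluster()). A small standalone sketch of that idiom, using std::string in place of ClusterData:

    #include <iostream>
    #include <map>
    #include <memory>
    #include <string>
    #include <utility>

    int main() {
      std::map<std::string, std::unique_ptr<std::string>> clusters;
      clusters["backend"] = std::make_unique<std::string>("config v1");

      // Swap in the new entry while keeping the previous one alive in `previous`.
      // The old object is only destroyed when `previous` goes out of scope, after
      // the caller has finished wiring up the replacement.
      std::unique_ptr<std::string> previous =
          std::exchange(clusters["backend"], std::make_unique<std::string>("config v2"));

      std::cout << "active: " << *clusters["backend"] << ", previous: " << *previous << "\n";
    }
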
    1010         445 : void ClusterManagerImpl::updateClusterCounts() {
    1011             :   // This if/else block implements a control flow mechanism that can be used by an ADS
    1012             :   // implementation to properly sequence CDS and RDS updates. It is not mandatory for ADS; ADS can
    1013             :   // use it to detect when a previously sent cluster becomes warm before sending routes that depend
    1014             :   // on it. This can reduce the incidence of HTTP 503 responses from Envoy when a route is used
    1015             :   // before its supporting cluster is ready (a standalone sketch of this pattern follows below).
    1016             :   //
    1017             :   // We achieve that by leaving CDS in the paused state as long as there is at least
    1018             :   // one cluster in the warming state. This prevents CDS ACK from being sent to ADS.
    1019             :   // Once the cluster is warmed up, CDS is resumed and an ACK is sent to ADS, providing a
    1020             :   // signal to ADS to proceed with RDS updates.
    1021             :   // If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant.
    1022         445 :   const bool all_clusters_initialized =
    1023         445 :       init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
    1024         445 :   if (all_clusters_initialized && ads_mux_) {
    1025           0 :     const auto type_url = Config::getTypeUrl<envoy::config::cluster::v3::Cluster>();
    1026           0 :     if (last_recorded_warming_clusters_count_ == 0 && !warming_clusters_.empty()) {
    1027           0 :       resume_cds_ = ads_mux_->pause(type_url);
    1028           0 :     } else if (last_recorded_warming_clusters_count_ > 0 && warming_clusters_.empty()) {
    1029           0 :       ASSERT(resume_cds_ != nullptr);
    1030           0 :       resume_cds_.reset();
    1031           0 :     }
    1032           0 :   }
    1033         445 :   cm_stats_.active_clusters_.set(active_clusters_.size());
    1034         445 :   cm_stats_.warming_clusters_.set(warming_clusters_.size());
    1035         445 :   last_recorded_warming_clusters_count_ = warming_clusters_.size();
    1036         445 : }
    1037             : 
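
The pause/resume handshake in updateClusterCounts() relies on holding a scoped resume handle for as long as any cluster is warming; dropping the handle is what lets the CDS ACK go out. The sketch below models that with a hypothetical PauseGuard RAII type and plain booleans; it is not the handle type returned by ads_mux_->pause():

    #include <cstddef>
    #include <iostream>
    #include <memory>

    // Hypothetical pause handle: the channel counts as paused for as long as the
    // handle is alive, and resumes when it is destroyed.
    class PauseGuard {
    public:
      explicit PauseGuard(bool& paused) : paused_(paused) { paused_ = true; }
      ~PauseGuard() { paused_ = false; }

    private:
      bool& paused_;
    };

    int main() {
      bool cds_paused = false;
      std::unique_ptr<PauseGuard> resume_cds;
      std::size_t last_recorded_warming = 0;

      auto on_update = [&](std::size_t warming_now) {
        // Pause when the first cluster starts warming; resume (by dropping the
        // handle) once the warming set drains back to empty.
        if (last_recorded_warming == 0 && warming_now > 0) {
          resume_cds = std::make_unique<PauseGuard>(cds_paused);
        } else if (last_recorded_warming > 0 && warming_now == 0) {
          resume_cds.reset();
        }
        last_recorded_warming = warming_now;
        std::cout << "warming=" << warming_now << " cds_paused=" << cds_paused << "\n";
      };

      on_update(1); // a cluster starts warming -> paused, no ACK yet
      on_update(2); // more clusters warming    -> still paused
      on_update(0); // everything warmed        -> resumed, ACK can flow
    }
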
    1038         504 : ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view cluster) {
    1039         504 :   ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
    1040             : 
    1041         504 :   auto entry = cluster_manager.thread_local_clusters_.find(cluster);
    1042         504 :   if (entry != cluster_manager.thread_local_clusters_.end()) {
    1043         504 :     return entry->second.get();
    1044         504 :   } else {
    1045           0 :     return cluster_manager.initializeClusterInlineIfExists(cluster);
    1046           0 :   }
    1047         504 : }
    1048             : 
    1049             : void ClusterManagerImpl::maybePreconnect(
    1050             :     ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry,
    1051             :     const ClusterConnectivityState& state,
    1052         251 :     std::function<ConnectionPool::Instance*()> pick_preconnect_pool) {
    1053         251 :   auto peekahead_ratio = cluster_entry.info()->peekaheadRatio();
    1054         251 :   if (peekahead_ratio <= 1.0) {
    1055         251 :     return;
    1056         251 :   }
    1057             : 
    1058             :   // 3 here is arbitrary. Just as in ConnPoolImplBase::tryCreateNewConnections
    1059             :   // we want to limit the work which can be done on any given preconnect attempt.
    1060           0 :   for (int i = 0; i < 3; ++i) {
    1061             :     // See if adding this one new connection
    1062             :     // would put the cluster over desired capacity. If so, stop preconnecting.
    1063             :     //
    1064             :     // We anticipate the incoming stream here, because maybePreconnect is called
    1065             :     // before a new stream is established.
    1066           0 :     if (!ConnectionPool::ConnPoolImplBase::shouldConnect(
    1067           0 :             state.pending_streams_, state.active_streams_,
    1068           0 :             state.connecting_and_connected_stream_capacity_, peekahead_ratio, true)) {
    1069           0 :       return;
    1070           0 :     }
    1071           0 :     ConnectionPool::Instance* preconnect_pool = pick_preconnect_pool();
    1072           0 :     if (!preconnect_pool || !preconnect_pool->maybePreconnect(peekahead_ratio)) {
    1073             :       // Given that the next preconnect pick may be entirely different, we could
    1074             :       // opt to try again even if the first preconnect fails. Err on the side of
    1075             :       // caution and wait for the next attempt.
    1076           0 :       return;
    1077           0 :     }
    1078           0 :   }
    1079           0 : }
    1080             : 
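
maybePreconnect() returns immediately for peekahead ratios at or below 1.0 and otherwise caps itself at three attempts per new stream, stopping as soon as the connecting/connected capacity covers the anticipated demand. The helper below is a simplified, hypothetical version of that capacity test; the real check is ConnectionPool::ConnPoolImplBase::shouldConnect(), whose exact semantics may differ from this sketch:

    #include <cstdint>
    #include <iostream>

    // Simplified capacity test: keep preconnecting while the capacity that is
    // already connecting or connected does not cover the anticipated demand,
    // i.e. the streams in flight plus the one about to be established, scaled
    // by the preconnect (peekahead) ratio.
    bool shouldPreconnect(uint32_t pending_streams, uint32_t active_streams,
                          uint32_t connecting_and_connected_capacity,
                          float peekahead_ratio) {
      const float anticipated_demand =
          static_cast<float>(pending_streams + active_streams + 1) * peekahead_ratio;
      return static_cast<float>(connecting_and_connected_capacity) < anticipated_demand;
    }

    int main() {
      // With a ratio of 1.5 and 4 streams in flight, the anticipated demand is
      // (4 + 1) * 1.5 = 7.5 streams worth of capacity.
      std::cout << shouldPreconnect(1, 3, 6, 1.5f) << "\n"; // 1: 6 < 7.5, preconnect
      std::cout << shouldPreconnect(1, 3, 8, 1.5f) << "\n"; // 0: 8 >= 7.5, enough capacity
    }
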
    1081             : absl::optional<HttpPoolData>
    1082             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool(
    1083             :     ResourcePriority priority, absl::optional<Http::Protocol> protocol,
    1084         251 :     LoadBalancerContext* context) {
    1085             :   // Select a host and create a connection pool for it if it does not already exist.
    1086         251 :   auto pool = httpConnPoolImpl(priority, protocol, context, false);
    1087         251 :   if (pool == nullptr) {
    1088           0 :     return absl::nullopt;
    1089           0 :   }
    1090             : 
    1091         251 :   HttpPoolData data(
    1092         251 :       [this, priority, protocol, context]() -> void {
    1093             :         // Now that a new stream is being established, attempt to preconnect.
    1094         251 :         maybePreconnect(*this, parent_.cluster_manager_state_,
    1095         251 :                         [this, &priority, &protocol, &context]() {
    1096           0 :                           return httpConnPoolImpl(priority, protocol, context, true);
    1097           0 :                         });
    1098         251 :       },
    1099         251 :       pool);
    1100         251 :   return data;
    1101         251 : }
    1102             : 
    1103             : absl::optional<TcpPoolData>
    1104             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
    1105           0 :     ResourcePriority priority, LoadBalancerContext* context) {
    1106             :   // Select a host and create a connection pool for it if it does not already exist.
    1107           0 :   auto pool = tcpConnPoolImpl(priority, context, false);
    1108           0 :   if (pool == nullptr) {
    1109           0 :     return absl::nullopt;
    1110           0 :   }
    1111             : 
    1112           0 :   TcpPoolData data(
    1113           0 :       [this, priority, context]() -> void {
    1114           0 :         maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &context]() {
    1115           0 :           return tcpConnPoolImpl(priority, context, true);
    1116           0 :         });
    1117           0 :       },
    1118           0 :       pool);
    1119           0 :   return data;
    1120           0 : }
    1121             : 
    1122             : void ClusterManagerImpl::drainConnections(const std::string& cluster,
    1123           0 :                                           DrainConnectionsHostPredicate predicate) {
    1124           0 :   ENVOY_LOG_EVENT(debug, "drain_connections_call", "drainConnections called for cluster {}",
    1125           0 :                   cluster);
    1126           0 :   tls_.runOnAllThreads([cluster, predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    1127           0 :     auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster);
    1128           0 :     if (cluster_entry != cluster_manager->thread_local_clusters_.end()) {
    1129           0 :       cluster_entry->second->drainConnPools(
    1130           0 :           predicate, ConnectionPool::DrainBehavior::DrainExistingConnections);
    1131           0 :     }
    1132           0 :   });
    1133           0 : }
    1134             : 
    1135           0 : void ClusterManagerImpl::drainConnections(DrainConnectionsHostPredicate predicate) {
    1136           0 :   ENVOY_LOG_EVENT(debug, "drain_connections_call_for_all_clusters",
    1137           0 :                   "drainConnections called for all clusters");
    1138           0 :   tls_.runOnAllThreads([predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    1139           0 :     for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) {
    1140           0 :       cluster_entry.second->drainConnPools(predicate,
    1141           0 :                                            ConnectionPool::DrainBehavior::DrainExistingConnections);
    1142           0 :     }
    1143           0 :   });
    1144           0 : }
    1145             : 
    1146          28 : absl::Status ClusterManagerImpl::checkActiveStaticCluster(const std::string& cluster) {
    1147          28 :   const auto& it = active_clusters_.find(cluster);
    1148          28 :   if (it == active_clusters_.end()) {
    1149           0 :     return absl::InvalidArgumentError(fmt::format("Unknown gRPC client cluster '{}'", cluster));
    1150           0 :   }
    1151          28 :   if (it->second->added_via_api_) {
    1152           0 :     return absl::InvalidArgumentError(
    1153           0 :         fmt::format("gRPC client cluster '{}' is not static", cluster));
    1154           0 :   }
    1155          28 :   return absl::OkStatus();
    1156          28 : }
    1157             : 
    1158             : void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster,
    1159           0 :                                                     const HostVector& hosts_removed) {
    1160             :   // Drain the connection pools for the given hosts. This only applies to thread local clusters
    1161             :   // that have already been created; deferred clusters that were never inflated have no pools.
    1162           0 :   tls_.runOnAllThreads([name = cluster.info()->name(),
    1163           0 :                         hosts_removed](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    1164           0 :     cluster_manager->removeHosts(name, hosts_removed);
    1165           0 :   });
    1166           0 : }
    1167             : 
    1168             : bool ClusterManagerImpl::deferralIsSupportedForCluster(
    1169         159 :     const ClusterInfoConstSharedPtr& info) const {
    1170         159 :   if (!deferred_cluster_creation_) {
    1171         159 :     return false;
    1172         159 :   }
    1173             : 
    1174             :   // Certain cluster types are unsupported for deferred initialization.
    1175             :   // We need to check `clusterType()` (preferred), falling back to
    1176             :   // `type()`, because custom clusters were added by leveraging an Any
    1177             :   // config.
    1178           0 :   if (auto custom_cluster_type = info->clusterType(); custom_cluster_type.has_value()) {
    1179             :     // TODO(kbaichoo): make it configurable what custom types are supported?
    1180           0 :     static const std::array<std::string, 4> supported_well_known_cluster_types = {
    1181           0 :         "envoy.clusters.aggregate", "envoy.cluster.eds", "envoy.clusters.redis",
    1182           0 :         "envoy.cluster.static"};
    1183           0 :     if (std::find(supported_well_known_cluster_types.begin(),
    1184           0 :                   supported_well_known_cluster_types.end(),
    1185           0 :                   custom_cluster_type->name()) == supported_well_known_cluster_types.end()) {
    1186           0 :       return false;
    1187           0 :     }
    1188             : 
    1189           0 :   } else {
    1190             :     // Check DiscoveryType instead.
    1191           0 :     static constexpr std::array<envoy::config::cluster::v3::Cluster::DiscoveryType, 2>
    1192           0 :         supported_cluster_types = {envoy::config::cluster::v3::Cluster::EDS,
    1193           0 :                                    envoy::config::cluster::v3::Cluster::STATIC};
    1194           0 :     if (std::find(supported_cluster_types.begin(), supported_cluster_types.end(), info->type()) ==
    1195           0 :         supported_cluster_types.end()) {
    1196           0 :       return false;
    1197           0 :     }
    1198           0 :   }
    1199             : 
    1200           0 :   return true;
    1201           0 : }
    1202             : 
    1203             : void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
    1204         159 :                                                       ThreadLocalClusterUpdateParams&& params) {
    1205         159 :   bool add_or_update_cluster = false;
    1206         159 :   if (!cm_cluster.addedOrUpdated()) {
    1207         159 :     add_or_update_cluster = true;
    1208         159 :     cm_cluster.setAddedOrUpdated();
    1209         159 :   }
    1210             : 
    1211         159 :   LoadBalancerFactorySharedPtr load_balancer_factory;
    1212         159 :   if (add_or_update_cluster) {
    1213         159 :     load_balancer_factory = cm_cluster.loadBalancerFactory();
    1214         159 :   }
    1215             : 
    1216         159 :   for (auto& per_priority : params.per_priority_update_params_) {
    1217         159 :     const auto& host_set =
    1218         159 :         cm_cluster.cluster().prioritySet().hostSetsPerPriority()[per_priority.priority_];
    1219         159 :     per_priority.update_hosts_params_ = HostSetImpl::updateHostsParams(*host_set);
    1220         159 :     per_priority.locality_weights_ = host_set->localityWeights();
    1221         159 :     per_priority.weighted_priority_health_ = host_set->weightedPriorityHealth();
    1222         159 :     per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
    1223         159 :   }
    1224             : 
    1225         159 :   HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap();
    1226             : 
    1227         159 :   pending_cluster_creations_.erase(cm_cluster.cluster().info()->name());
    1228             : 
    1229         159 :   const UnitFloat drop_overload = cm_cluster.cluster().dropOverload();
    1230             :   // Populate the cluster initialization object based on this update.
    1231         159 :   ClusterInitializationObjectConstSharedPtr cluster_initialization_object =
    1232         159 :       addOrUpdateClusterInitializationObjectIfSupported(
    1233         159 :           params, cm_cluster.cluster().info(), load_balancer_factory, host_map, drop_overload);
    1234             : 
    1235         159 :   tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
    1236         159 :                         add_or_update_cluster, load_balancer_factory, map = std::move(host_map),
    1237         159 :                         cluster_initialization_object = std::move(cluster_initialization_object),
    1238         306 :                         drop_overload](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    1239         306 :     ASSERT(cluster_manager.has_value(),
    1240         306 :            "Expected the ThreadLocalClusterManager to be set during ClusterManagerImpl creation.");
    1241             : 
    1242             :     // The cluster manager here is the one provided by this particular worker thread; it supplies
    1243             :     // the thread local state needed to apply the relevant change.
    1244         306 :     if (const bool defer_unused_clusters =
    1245         306 :             cluster_initialization_object != nullptr &&
    1246         306 :             !cluster_manager->thread_local_clusters_.contains(info->name()) &&
    1247         306 :             !Envoy::Thread::MainThread::isMainThread();
    1248         306 :         defer_unused_clusters) {
    1249             :       // Save the cluster initialization object.
    1250           0 :       ENVOY_LOG(debug, "Deferring add or update for TLS cluster {}", info->name());
    1251           0 :       cluster_manager->thread_local_deferred_clusters_[info->name()] =
    1252           0 :           cluster_initialization_object;
    1253             : 
    1254             :       // Invoke similar logic of onClusterAddOrUpdate.
    1255           0 :       ThreadLocalClusterCommand command = [&cluster_manager,
    1256           0 :                                            cluster_name = info->name()]() -> ThreadLocalCluster& {
    1257           0 :         // If we have multiple callbacks, only the first one needs to use the
    1258             :         // command to initialize the cluster.
    1259           0 :         auto existing_cluster_entry = cluster_manager->thread_local_clusters_.find(cluster_name);
    1260           0 :         if (existing_cluster_entry != cluster_manager->thread_local_clusters_.end()) {
    1261           0 :           return *existing_cluster_entry->second;
    1262           0 :         }
    1263             : 
    1264           0 :         auto* cluster_entry = cluster_manager->initializeClusterInlineIfExists(cluster_name);
    1265           0 :         ASSERT(cluster_entry != nullptr, "Deferred cluster initialization should not fail.");
    1266           0 :         return *cluster_entry;
    1267           0 :       };
    1268           0 :       for (auto cb_it = cluster_manager->update_callbacks_.begin();
    1269           0 :            cb_it != cluster_manager->update_callbacks_.end();) {
    1270             :         // The current callback may remove itself from the list, so a handle for
    1271             :         // the next item is fetched before calling the callback.
    1272           0 :         auto curr_cb_it = cb_it;
    1273           0 :         ++cb_it;
    1274           0 :         (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
    1275           0 :       }
    1276             : 
    1277         306 :     } else {
    1278             :       // Broadcast
    1279         306 :       ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
    1280         306 :       if (add_or_update_cluster) {
    1281         306 :         if (cluster_manager->thread_local_clusters_.contains(info->name())) {
    1282           0 :           ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
    1283         306 :         } else {
    1284         306 :           ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
    1285         306 :         }
    1286             : 
    1287         306 :         new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
    1288         306 :                                                                       load_balancer_factory);
    1289         306 :         cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
    1290         306 :         cluster_manager->local_stats_.clusters_inflated_.set(
    1291         306 :             cluster_manager->thread_local_clusters_.size());
    1292         306 :       }
    1293             : 
    1294         306 :       if (cluster_manager->thread_local_clusters_[info->name()]) {
    1295         306 :         cluster_manager->thread_local_clusters_[info->name()]->setDropOverload(drop_overload);
    1296         306 :       }
    1297         306 :       for (const auto& per_priority : params.per_priority_update_params_) {
    1298         306 :         cluster_manager->updateClusterMembership(
    1299         306 :             info->name(), per_priority.priority_, per_priority.update_hosts_params_,
    1300         306 :             per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_,
    1301         306 :             per_priority.weighted_priority_health_, per_priority.overprovisioning_factor_, map);
    1302         306 :       }
    1303             : 
    1304         306 :       if (new_cluster != nullptr) {
    1305         306 :         ThreadLocalClusterCommand command = [&new_cluster]() -> ThreadLocalCluster& {
    1306           0 :           return *new_cluster;
    1307           0 :         };
    1308         306 :         for (auto cb_it = cluster_manager->update_callbacks_.begin();
    1309         612 :              cb_it != cluster_manager->update_callbacks_.end();) {
    1310             :           // The current callback may remove itself from the list, so a handle for
    1311             :           // the next item is fetched before calling the callback.
    1312         306 :           auto curr_cb_it = cb_it;
    1313         306 :           ++cb_it;
    1314         306 :           (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
    1315         306 :         }
    1316         306 :       }
    1317         306 :     }
    1318         306 :   });
    1319             : 
    1320             :   // By this time, the main thread has received the cluster initialization update, so we can start
    1321             :   // the ADS mux if the ADS mux is dependent on this cluster's initialization.
    1322         159 :   if (cm_cluster.requiredForAds() && !ads_mux_initialized_) {
    1323          28 :     ads_mux_->start();
    1324          28 :     ads_mux_initialized_ = true;
    1325          28 :   }
    1326         159 : }
    1327             : 
    1328             : ClusterManagerImpl::ClusterInitializationObjectConstSharedPtr
    1329             : ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported(
    1330             :     const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
    1331             :     LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
    1332         159 :     UnitFloat drop_overload) {
    1333         159 :   if (!deferralIsSupportedForCluster(cluster_info)) {
    1334         159 :     return nullptr;
    1335         159 :   }
    1336             : 
    1337           0 :   const std::string& cluster_name = cluster_info->name();
    1338           0 :   auto entry = cluster_initialization_map_.find(cluster_name);
    1339             :   // TODO(kbaichoo): if EDS can be configured via cluster_type() then modify the
    1340             :   // merging logic below.
    1341             :   //
    1342             :   // This method may be called multiple times to create multiple ClusterInitializationObject
    1343             :   // instances for the same cluster. Before the thread local clusters are actually initialized,
    1344             :   // the new instances override the old instances on the worker threads. But part of the data is
    1345             :   // created only once, such as the load balancer factory, so we always merge the new instance
    1346             :   // with the old one to ensure the latest instance has all the necessary data.
    1347             :   //
    1348             :   // More specifically, this will happen in the following scenarios for now:
    1349             :   // 1. EDS clusters: the ClusterLoadAssignment of an EDS cluster may be updated multiple times
    1350             :   //   before the thread local cluster is initialized.
    1351             :   // 2. Clusters in unit tests: the cluster may be updated multiple times before the thread local
    1352             :   //   cluster is initialized by calling 'updateHosts' manually.
    1353           0 :   const bool should_merge_with_prior_cluster =
    1354           0 :       entry != cluster_initialization_map_.end() && entry->second->cluster_info_ == cluster_info;
    1355             : 
    1356           0 :   if (should_merge_with_prior_cluster) {
    1357             :     // We need to copy from an existing Cluster Initialization Object. In
    1358             :     // particular, only update the params with changed priority.
    1359           0 :     auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
    1360           0 :         entry->second->per_priority_state_, params, std::move(cluster_info),
    1361           0 :         load_balancer_factory == nullptr ? entry->second->load_balancer_factory_
    1362           0 :                                          : load_balancer_factory,
    1363           0 :         map, drop_overload);
    1364           0 :     cluster_initialization_map_[cluster_name] = new_initialization_object;
    1365           0 :     return new_initialization_object;
    1366           0 :   } else {
    1367             :     // We need to create a fresh Cluster Initialization Object.
    1368           0 :     auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
    1369           0 :         params, std::move(cluster_info), load_balancer_factory, map, drop_overload);
    1370           0 :     cluster_initialization_map_[cluster_name] = new_initialization_object;
    1371           0 :     return new_initialization_object;
    1372           0 :   }
    1373           0 : }
    1374             : 
    1375             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry*
    1376             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExists(
    1377           0 :     absl::string_view cluster) {
    1378           0 :   auto entry = thread_local_deferred_clusters_.find(cluster);
    1379           0 :   if (entry == thread_local_deferred_clusters_.end()) {
    1380             :     // Unknown cluster.
    1381           0 :     return nullptr;
    1382           0 :   }
    1383             : 
    1384             :   // Create the cluster inline.
    1385           0 :   const ClusterInitializationObjectConstSharedPtr& initialization_object = entry->second;
    1386           0 :   ENVOY_LOG(debug, "initializing TLS cluster {} inline", cluster);
    1387           0 :   auto cluster_entry = std::make_unique<ClusterEntry>(
    1388           0 :       *this, initialization_object->cluster_info_, initialization_object->load_balancer_factory_);
    1389           0 :   ClusterEntry* cluster_entry_ptr = cluster_entry.get();
    1390             : 
    1391           0 :   thread_local_clusters_[cluster] = std::move(cluster_entry);
    1392           0 :   local_stats_.clusters_inflated_.set(thread_local_clusters_.size());
    1393             : 
    1394           0 :   for (const auto& [_, per_priority] : initialization_object->per_priority_state_) {
    1395           0 :     updateClusterMembership(initialization_object->cluster_info_->name(), per_priority.priority_,
    1396           0 :                             per_priority.update_hosts_params_, per_priority.locality_weights_,
    1397           0 :                             per_priority.hosts_added_, per_priority.hosts_removed_,
    1398           0 :                             per_priority.weighted_priority_health_,
    1399           0 :                             per_priority.overprovisioning_factor_,
    1400           0 :                             initialization_object->cross_priority_host_map_);
    1401           0 :   }
    1402           0 :   thread_local_clusters_[cluster]->setDropOverload(initialization_object->drop_overload_);
    1403             : 
    1404             :   // Remove the CIO as we've initialized the cluster.
    1405           0 :   thread_local_deferred_clusters_.erase(entry);
    1406             : 
    1407           0 :   return cluster_entry_ptr;
    1408           0 : }
    1409             : 
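
initializeClusterInlineIfExists() is the "inflate on first use" half of deferred cluster creation: each worker keeps a cheap initialization object per deferred cluster and only builds the full thread local entry when something actually asks for it. A standalone sketch of that pattern with hypothetical Descriptor and LiveCluster types, not the Envoy classes:

    #include <iostream>
    #include <memory>
    #include <string>
    #include <unordered_map>

    // Hypothetical deferred-initialization pattern: a cheap descriptor is stored
    // per cluster and the heavier per-thread object is only built on first use.
    struct Descriptor {
      std::string config;
    };

    struct LiveCluster {
      explicit LiveCluster(const Descriptor& d) : config(d.config) {}
      std::string config;
    };

    LiveCluster* initializeInlineIfExists(
        std::unordered_map<std::string, Descriptor>& deferred,
        std::unordered_map<std::string, std::unique_ptr<LiveCluster>>& live,
        const std::string& name) {
      auto it = deferred.find(name);
      if (it == deferred.end()) {
        return nullptr; // Unknown cluster: nothing to inflate.
      }
      auto cluster = std::make_unique<LiveCluster>(it->second);
      LiveCluster* raw = cluster.get();
      live[name] = std::move(cluster);
      deferred.erase(it); // The descriptor is no longer needed once inflated.
      return raw;
    }

    int main() {
      std::unordered_map<std::string, Descriptor> deferred{{"backend", {"hosts: a,b"}}};
      std::unordered_map<std::string, std::unique_ptr<LiveCluster>> live;

      LiveCluster* cluster = initializeInlineIfExists(deferred, live, "backend");
      std::cout << (cluster ? cluster->config : "not found") << "\n"; // inflated on first use
      std::cout << (initializeInlineIfExists(deferred, live, "backend") != nullptr)
                << "\n"; // 0: already inflated, now served from `live`
    }
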
    1410             : ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
    1411             :     const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
    1412             :     LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
    1413             :     UnitFloat drop_overload)
    1414             :     : cluster_info_(std::move(cluster_info)), load_balancer_factory_(load_balancer_factory),
    1415           0 :       cross_priority_host_map_(map), drop_overload_(drop_overload) {
    1416             :   // Copy the update since the map is empty.
    1417           0 :   for (const auto& update : params.per_priority_update_params_) {
    1418           0 :     per_priority_state_.emplace(update.priority_, update);
    1419           0 :   }
    1420           0 : }
    1421             : 
    1422             : ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
    1423             :     const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>& per_priority_state,
    1424             :     const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
    1425             :     LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
    1426             :     UnitFloat drop_overload)
    1427             :     : per_priority_state_(per_priority_state), cluster_info_(std::move(cluster_info)),
    1428             :       load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map),
    1429           0 :       drop_overload_(drop_overload) {
    1430             : 
    1431             :   // Because EDS clusters receive the entire ClusterLoadAssignment but only
    1432             :   // provide the delta, we must process hosts_added and hosts_removed rather
    1433             :   // than simply overwrite with the hosts added (a standalone sketch of this merge follows below).
    1434           0 :   for (const auto& update : update_params.per_priority_update_params_) {
    1435           0 :     auto it = per_priority_state_.find(update.priority_);
    1436           0 :     if (it != per_priority_state_.end()) {
    1437           0 :       auto& priority_state = it->second;
    1438             :       // Merge the two per_priorities.
    1439           0 :       priority_state.update_hosts_params_ = update.update_hosts_params_;
    1440           0 :       priority_state.locality_weights_ = update.locality_weights_;
    1441           0 :       priority_state.weighted_priority_health_ = update.weighted_priority_health_;
    1442           0 :       priority_state.overprovisioning_factor_ = update.overprovisioning_factor_;
    1443             : 
    1444             :       // Merge the hosts vectors so that only hosts_added_ remains populated.
    1445             :       // Assumes that the old hosts_added_ is disjoint from the new hosts_added_ and that
    1446             :       // the new hosts_removed_ only refers to hosts in the old hosts_added_.
    1447           0 :       ASSERT(priority_state.hosts_removed_.empty(),
    1448           0 :              "Cluster Initialization Object should apply hosts "
    1449           0 :              "removed updates to hosts_added vector!");
    1450             : 
    1451             :       // TODO(kbaichoo): replace with a more efficient algorithm. For example
    1452             :       // if the EDS cluster exposed the LoadAssignment we could just merge by
    1453             :       // overwriting hosts_added.
    1454           0 :       if (!update.hosts_removed_.empty()) {
    1455             :         // Remove all hosts to be removed from the old host_added.
    1456           0 :         auto& host_added = priority_state.hosts_added_;
    1457           0 :         auto removed_section = std::remove_if(
    1458           0 :             host_added.begin(), host_added.end(),
    1459           0 :             [hosts_removed = std::cref(update.hosts_removed_)](const HostSharedPtr& ptr) {
    1460           0 :               return std::find(hosts_removed.get().begin(), hosts_removed.get().end(), ptr) !=
    1461           0 :                      hosts_removed.get().end();
    1462           0 :             });
    1463           0 :         priority_state.hosts_added_.erase(removed_section, priority_state.hosts_added_.end());
    1464           0 :       }
    1465             : 
    1466             :       // Add updated host_added.
    1467           0 :       priority_state.hosts_added_.reserve(priority_state.hosts_added_.size() +
    1468           0 :                                           update.hosts_added_.size());
    1469           0 :       std::copy(update.hosts_added_.begin(), update.hosts_added_.end(),
    1470           0 :                 std::back_inserter(priority_state.hosts_added_));
    1471             : 
    1472           0 :     } else {
    1473             :       // Just copy the new priority.
    1474           0 :       per_priority_state_.emplace(update.priority_, update);
    1475           0 :     }
    1476           0 :   }
    1477           0 : }
    1478             : 
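
The merge constructor above folds an EDS-style delta (hosts_added / hosts_removed) into the hosts_added list accumulated so far: anything the delta removes is erased from the accumulated list, and the newly added hosts are appended. A standalone sketch using plain strings as hosts; the real code operates on vectors of HostSharedPtr:

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    // Fold a delta update into an accumulated "hosts added" list: drop any
    // previously added host that the delta removes, then append the new hosts.
    void mergeDelta(std::vector<std::string>& accumulated_added,
                    const std::vector<std::string>& hosts_added,
                    const std::vector<std::string>& hosts_removed) {
      accumulated_added.erase(
          std::remove_if(accumulated_added.begin(), accumulated_added.end(),
                         [&hosts_removed](const std::string& host) {
                           return std::find(hosts_removed.begin(), hosts_removed.end(), host) !=
                                  hosts_removed.end();
                         }),
          accumulated_added.end());
      accumulated_added.insert(accumulated_added.end(), hosts_added.begin(), hosts_added.end());
    }

    int main() {
      std::vector<std::string> accumulated{"10.0.0.1", "10.0.0.2"};
      // Delta update: 10.0.0.2 goes away, 10.0.0.3 shows up.
      mergeDelta(accumulated, /*hosts_added=*/{"10.0.0.3"}, /*hosts_removed=*/{"10.0.0.2"});
      for (const auto& host : accumulated) {
        std::cout << host << "\n"; // 10.0.0.1, then 10.0.0.3
      }
    }
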
    1479           0 : void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host) {
    1480           0 :   tls_.runOnAllThreads([host](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    1481           0 :     cluster_manager->onHostHealthFailure(host);
    1482           0 :   });
    1483           0 : }
    1484             : 
    1485             : Host::CreateConnectionData ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConn(
    1486           0 :     LoadBalancerContext* context) {
    1487           0 :   HostConstSharedPtr logical_host = chooseHost(context);
    1488           0 :   if (logical_host) {
    1489           0 :     auto conn_info = logical_host->createConnection(
    1490           0 :         parent_.thread_local_dispatcher_, nullptr,
    1491           0 :         context == nullptr ? nullptr : context->upstreamTransportSocketOptions());
    1492           0 :     if ((cluster_info_->features() &
    1493           0 :          ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) &&
    1494           0 :         conn_info.connection_ != nullptr) {
    1495           0 :       auto conn_map_iter = parent_.host_tcp_conn_map_.find(logical_host);
    1496           0 :       if (conn_map_iter == parent_.host_tcp_conn_map_.end()) {
    1497           0 :         conn_map_iter =
    1498           0 :             parent_.host_tcp_conn_map_.try_emplace(logical_host, logical_host->acquireHandle())
    1499           0 :                 .first;
    1500           0 :       }
    1501           0 :       auto& conn_map = conn_map_iter->second;
    1502           0 :       conn_map.connections_.emplace(
    1503           0 :           conn_info.connection_.get(),
    1504           0 :           std::make_unique<ThreadLocalClusterManagerImpl::TcpConnContainer>(
    1505           0 :               parent_, logical_host, *conn_info.connection_));
    1506           0 :     }
    1507           0 :     return conn_info;
    1508           0 :   } else {
    1509           0 :     cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
    1510           0 :     return {nullptr, nullptr};
    1511           0 :   }
    1512           0 : }
    1513             : 
    1514             : Http::AsyncClient&
    1515          68 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpAsyncClient() {
    1516          68 :   if (lazy_http_async_client_ == nullptr) {
    1517          33 :     lazy_http_async_client_ = std::make_unique<Http::AsyncClientImpl>(
    1518          33 :         cluster_info_, parent_.parent_.stats_, parent_.thread_local_dispatcher_,
    1519          33 :         parent_.parent_.local_info_, parent_.parent_, parent_.parent_.runtime_,
    1520          33 :         parent_.parent_.random_,
    1521          33 :         Router::ShadowWriterPtr{new Router::ShadowWriterImpl(parent_.parent_)},
    1522          33 :         parent_.parent_.http_context_, parent_.parent_.router_context_);
    1523          33 :   }
    1524          68 :   return *lazy_http_async_client_;
    1525          68 : }
    1526             : 
    1527             : Tcp::AsyncTcpClientPtr
    1528             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpAsyncClient(
    1529           0 :     LoadBalancerContext* context, Tcp::AsyncTcpClientOptionsConstSharedPtr options) {
    1530           0 :   return std::make_unique<Tcp::AsyncTcpClientImpl>(parent_.thread_local_dispatcher_, *this, context,
    1531           0 :                                                    options->enable_half_close);
    1532           0 : }
    1533             : 
    1534             : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHosts(
    1535             :     const std::string& name, uint32_t priority,
    1536             :     PrioritySet::UpdateHostsParams&& update_hosts_params,
    1537             :     LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
    1538             :     const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health,
    1539             :     absl::optional<uint32_t> overprovisioning_factor,
    1540         306 :     HostMapConstSharedPtr cross_priority_host_map) {
    1541         306 :   ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name,
    1542         306 :             hosts_added.size(), hosts_removed.size());
    1543         306 :   priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights),
    1544         306 :                             hosts_added, hosts_removed, weighted_priority_health,
    1545         306 :                             overprovisioning_factor, std::move(cross_priority_host_map));
    1546             :   // If an LB is thread aware, create a new worker local LB on membership changes.
    1547         306 :   if (lb_factory_ != nullptr && lb_factory_->recreateOnHostChange()) {
    1548           0 :     ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
    1549           0 :     lb_ = lb_factory_->create({priority_set_, parent_.local_priority_set_});
    1550           0 :   }
    1551         306 : }
    1552             : 
    1553             : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(
    1554         306 :     const HostVector& hosts_removed) {
    1555         306 :   for (const auto& host : hosts_removed) {
    1556         306 :     parent_.drainOrCloseConnPools(host, ConnectionPool::DrainBehavior::DrainAndDelete);
    1557         306 :   }
    1558         306 : }
    1559             : 
    1560         306 : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools() {
    1561         306 :   for (auto& host_set : priority_set_.hostSetsPerPriority()) {
    1562         306 :     drainConnPools(host_set->hosts());
    1563         306 :   }
    1564         306 : }
    1565             : 
    1566             : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(
    1567           0 :     DrainConnectionsHostPredicate predicate, ConnectionPool::DrainBehavior behavior) {
    1568           0 :   for (auto& host_set : priority_set_.hostSetsPerPriority()) {
    1569           0 :     for (const auto& host : host_set->hosts()) {
    1570           0 :       if (predicate != nullptr && !predicate(*host)) {
    1571           0 :         continue;
    1572           0 :       }
    1573             : 
    1574           0 :       parent_.drainOrCloseConnPools(host, behavior);
    1575           0 :     }
    1576           0 :   }
    1577           0 : }
    1578             : 
    1579             : ClusterUpdateCallbacksHandlePtr
    1580           0 : ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) {
    1581           0 :   ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
    1582           0 :   return cluster_manager.addClusterUpdateCallbacks(cb);
    1583           0 : }
    1584             : 
    1585             : OdCdsApiHandlePtr
    1586             : ClusterManagerImpl::allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config,
    1587             :                                      OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
    1588           0 :                                      ProtobufMessage::ValidationVisitor& validation_visitor) {
    1589             :   // TODO(krnowak): Instead of creating a new handle every time, store the handles internally and
    1590             :   // return an already existing one if the config or locator matches. Note that this may need a
    1591             :   // way to clean up the unused handles, so we can close the unnecessary connections.
    1592           0 :   auto odcds = OdCdsApiImpl::create(odcds_config, odcds_resources_locator, *this, *this,
    1593           0 :                                     *stats_.rootScope(), validation_visitor);
    1594           0 :   return OdCdsApiHandleImpl::create(*this, std::move(odcds));
    1595           0 : }
    1596             : 
    1597             : ClusterDiscoveryCallbackHandlePtr
    1598             : ClusterManagerImpl::requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std::string name,
    1599             :                                                     ClusterDiscoveryCallbackPtr callback,
    1600           0 :                                                     std::chrono::milliseconds timeout) {
    1601           0 :   ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
    1602             : 
    1603           0 :   auto [handle, discovery_in_progress, invoker] =
    1604           0 :       cluster_manager.cdm_.addCallback(name, std::move(callback));
    1605             :   // This check will catch requests for discoveries from this thread only. If another thread
    1606             :   // requested the same discovery, we will detect it in the main thread later.
    1607           0 :   if (discovery_in_progress) {
    1608           0 :     ENVOY_LOG(debug,
    1609           0 :               "cm odcds: on-demand discovery for cluster {} is already in progress, something else "
    1610           0 :               "in thread {} has already requested it",
    1611           0 :               name, cluster_manager.thread_local_dispatcher_.name());
    1612             :     // This worker thread has already requested a discovery of a cluster with this name, so
    1613             :     // there is nothing more to do here.
    1614             :     //
    1615             :     // We can't "just" return handle here, because handle is a part of the structured binding done
    1616             :     // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like
    1617             :     // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here
    1618             :     // - it needs to be moved.
    1619           0 :     return std::move(handle);
    1620           0 :   }
    1621           0 :   ENVOY_LOG(
    1622           0 :       debug,
    1623           0 :       "cm odcds: forwarding the on-demand discovery request for cluster {} to the main thread",
    1624           0 :       name);
    1625             :   // This seems to be the first request for discovery of this cluster in this worker thread. Rest
    1626             :   // of the process may only happen in the main thread.
    1627           0 :   dispatcher_.post([this, odcds = std::move(odcds), timeout, name = std::move(name),
    1628           0 :                     invoker = std::move(invoker),
    1629           0 :                     &thread_local_dispatcher = cluster_manager.thread_local_dispatcher_] {
    1630             :     // Check for the cluster here too. It might have been added between the time when this closure
    1631             :     // was posted and when it is being executed.
    1632           0 :     if (getThreadLocalCluster(name) != nullptr) {
    1633           0 :       ENVOY_LOG(
    1634           0 :           debug,
    1635           0 :           "cm odcds: the requested cluster {} is already known, posting the callback back to {}",
    1636           0 :           name, thread_local_dispatcher.name());
    1637           0 :       thread_local_dispatcher.post([invoker = std::move(invoker)] {
    1638           0 :         invoker.invokeCallback(ClusterDiscoveryStatus::Available);
    1639           0 :       });
    1640           0 :       return;
    1641           0 :     }
    1642             : 
    1643           0 :     if (auto it = pending_cluster_creations_.find(name); it != pending_cluster_creations_.end()) {
    1644           0 :       ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} is already in progress", name);
    1645             :       // We already began the discovery process for this cluster, so there is nothing to do. If
    1646             :       // we got here, it means that another worker thread requested the discovery.
    1647           0 :       return;
    1648           0 :     }
    1649             :     // Start the discovery. If the cluster gets discovered, the cluster manager will warm it up
    1650             :     // and invoke the cluster lifecycle callbacks, which will in turn invoke our callback.
    1651           0 :     odcds->updateOnDemand(name);
    1652             :     // Set up the discovery timeout timer to avoid keeping callbacks indefinitely.
    1653           0 :     auto timer = dispatcher_.createTimer([this, name] { notifyExpiredDiscovery(name); });
    1654           0 :     timer->enableTimer(timeout);
    1655             :     // Keep odcds handle alive for the duration of the discovery process.
    1656           0 :     pending_cluster_creations_.insert(
    1657           0 :         {std::move(name), ClusterCreation{std::move(odcds), std::move(timer)}});
    1658           0 :   });
    1659             : 
    1660             :   // We can't "just" return handle here, because handle is a part of the structured binding done
    1661             :   // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like
    1662             :   // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here -
    1663             :   // it needs to be moved.
    1664           0 :   return std::move(handle);
    1665           0 : }
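
The comment about the structured binding above describes a general C++ rule that is easy to trip over. Below is a minimal standalone sketch, assuming a hypothetical HandlePtr alias and makeHandle() helper in place of the real ClusterDiscoveryCallbackHandlePtr plumbing, of why the explicit std::move is required when returning a name introduced by a structured binding.

    #include <memory>
    #include <tuple>
    #include <utility>

    // Hypothetical stand-in for ClusterDiscoveryCallbackHandlePtr.
    using HandlePtr = std::unique_ptr<int>;

    std::tuple<HandlePtr, bool> makeHandle() { return {std::make_unique<int>(1), false}; }

    HandlePtr request() {
      auto [handle, in_progress] = makeHandle();
      // `handle` names a member of the hidden tuple object rather than a local
      // variable, so it is not implicitly moved on return; a move-only type must
      // be moved explicitly, exactly as the comment in the function above says.
      return std::move(handle);
    }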
    1666             : 
    1667           0 : void ClusterManagerImpl::notifyMissingCluster(absl::string_view name) {
    1668           0 :   ENVOY_LOG(debug, "cm odcds: cluster {} not found during on-demand discovery", name);
    1669           0 :   notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Missing);
    1670           0 : }
    1671             : 
    1672           0 : void ClusterManagerImpl::notifyExpiredDiscovery(absl::string_view name) {
    1673           0 :   ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} timed out", name);
    1674           0 :   notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Timeout);
    1675           0 : }
    1676             : 
    1677             : void ClusterManagerImpl::notifyClusterDiscoveryStatus(absl::string_view name,
    1678           0 :                                                       ClusterDiscoveryStatus status) {
    1679           0 :   auto map_node_handle = pending_cluster_creations_.extract(name);
    1680           0 :   if (map_node_handle.empty()) {
    1681             :     // Not a cluster we are interested in. This may happen when ODCDS
    1682             :     // receives some cluster name in the removed resources field and
    1683             :     // notifies the cluster manager about it.
    1684           0 :     return;
    1685           0 :   }
    1686             :   // Let all the worker threads know the resulting discovery status (missing or timed out).
    1687           0 :   tls_.runOnAllThreads(
    1688           0 :       [name = std::string(name), status](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    1689           0 :         ENVOY_LOG(
    1690           0 :             trace,
    1691           0 :             "cm cdm: starting processing cluster name {} (status {}) from the expired timer in {}",
    1692           0 :             name, enumToInt(status), cluster_manager->thread_local_dispatcher_.name());
    1693           0 :         cluster_manager->cdm_.processClusterName(name, status);
    1694           0 :       });
    1695           0 : }
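
notifyClusterDiscoveryStatus() above relies on map node extraction to test and remove the pending entry in a single lookup. A minimal sketch of that pattern, assuming a hypothetical `pending` map and a plain int status in place of pending_cluster_creations_ and ClusterDiscoveryStatus:

    #include <iostream>
    #include <string>
    #include <unordered_map>

    // Hypothetical pending-discovery bookkeeping.
    std::unordered_map<std::string, int> pending;

    void notifyStatus(const std::string& name, int status) {
      auto node = pending.extract(name);
      if (node.empty()) {
        return; // Not a discovery we started; ignore the notification.
      }
      // In the real code this is the point where the status is fanned out to
      // every worker thread's ClusterDiscoveryManager via runOnAllThreads().
      std::cout << "cluster " << name << " finished with status " << status << "\n";
    }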
    1696             : 
    1697           0 : Config::EdsResourcesCacheOptRef ClusterManagerImpl::edsResourcesCache() {
    1698             :   // EDS caching is only supported for ADS.
    1699           0 :   if (ads_mux_) {
    1700           0 :     return ads_mux_->edsResourcesCache();
    1701           0 :   }
    1702           0 :   return {};
    1703           0 : }
    1704             : 
    1705             : ClusterDiscoveryManager
    1706           0 : ClusterManagerImpl::createAndSwapClusterDiscoveryManager(std::string thread_name) {
    1707           0 :   ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
    1708           0 :   ClusterDiscoveryManager cdm(std::move(thread_name), cluster_manager);
    1709             : 
    1710           0 :   cluster_manager.cdm_.swap(cdm);
    1711             : 
    1712           0 :   return cdm;
    1713           0 : }
    1714             : 
    1715             : ProtobufTypes::MessagePtr
    1716          98 : ClusterManagerImpl::dumpClusterConfigs(const Matchers::StringMatcher& name_matcher) {
    1717          98 :   auto config_dump = std::make_unique<envoy::admin::v3::ClustersConfigDump>();
    1718          98 :   config_dump->set_version_info(cds_api_ != nullptr ? cds_api_->versionInfo() : "");
    1719         131 :   for (const auto& active_cluster_pair : active_clusters_) {
    1720         131 :     const auto& cluster = *active_cluster_pair.second;
    1721         131 :     if (!name_matcher.match(cluster.cluster_config_.name())) {
    1722           0 :       continue;
    1723           0 :     }
    1724         131 :     if (!cluster.added_via_api_) {
    1725         131 :       auto& static_cluster = *config_dump->mutable_static_clusters()->Add();
    1726         131 :       static_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
    1727         131 :       TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
    1728         131 :                                             *(static_cluster.mutable_last_updated()));
    1729         131 :     } else {
    1730           0 :       auto& dynamic_cluster = *config_dump->mutable_dynamic_active_clusters()->Add();
    1731           0 :       dynamic_cluster.set_version_info(cluster.version_info_);
    1732           0 :       dynamic_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
    1733           0 :       TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
    1734           0 :                                             *(dynamic_cluster.mutable_last_updated()));
    1735           0 :     }
    1736         131 :   }
    1737             : 
    1738          98 :   for (const auto& warming_cluster_pair : warming_clusters_) {
    1739           0 :     const auto& cluster = *warming_cluster_pair.second;
    1740           0 :     if (!name_matcher.match(cluster.cluster_config_.name())) {
    1741           0 :       continue;
    1742           0 :     }
    1743           0 :     auto& dynamic_cluster = *config_dump->mutable_dynamic_warming_clusters()->Add();
    1744           0 :     dynamic_cluster.set_version_info(cluster.version_info_);
    1745           0 :     dynamic_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
    1746           0 :     TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
    1747           0 :                                           *(dynamic_cluster.mutable_last_updated()));
    1748           0 :   }
    1749             : 
    1750          98 :   return config_dump;
    1751          98 : }
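
dumpClusterConfigs() above buckets clusters three ways: statically configured clusters go to static_clusters, API-added ones to dynamic_active_clusters, and not-yet-warmed ones to dynamic_warming_clusters, all filtered by the name matcher. A simplified sketch of the filter-and-partition shape, using an illustrative ClusterRecord struct and a std::function matcher rather than the real proto and matcher types:

    #include <functional>
    #include <string>
    #include <vector>

    // Illustrative record; the real code packs cluster protos into a config dump.
    struct ClusterRecord {
      std::string name;
      bool added_via_api;
    };

    void partition(const std::vector<ClusterRecord>& active,
                   const std::function<bool(const std::string&)>& name_matcher,
                   std::vector<ClusterRecord>& static_out,
                   std::vector<ClusterRecord>& dynamic_out) {
      for (const auto& cluster : active) {
        if (!name_matcher(cluster.name)) {
          continue; // Filtered out by the admin endpoint's name matcher.
        }
        (cluster.added_via_api ? dynamic_out : static_out).push_back(cluster);
      }
    }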
    1752             : 
    1753             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl(
    1754             :     ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
    1755             :     const absl::optional<LocalClusterParams>& local_cluster_params)
    1756             :     : parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(dispatcher.name(), *this),
    1757         223 :       local_stats_(generateStats(*parent.stats_.rootScope(), dispatcher.name())) {
    1758             :   // If a local cluster is defined then we need to initialize it first.
    1759         223 :   if (local_cluster_params.has_value()) {
    1760           0 :     const auto& local_cluster_name = local_cluster_params->info_->name();
    1761           0 :     ENVOY_LOG(debug, "adding TLS local cluster {}", local_cluster_name);
    1762           0 :     thread_local_clusters_[local_cluster_name] = std::make_unique<ClusterEntry>(
    1763           0 :         *this, local_cluster_params->info_, local_cluster_params->load_balancer_factory_);
    1764           0 :     local_priority_set_ = &thread_local_clusters_[local_cluster_name]->prioritySet();
    1765           0 :     local_stats_.clusters_inflated_.set(thread_local_clusters_.size());
    1766           0 :   }
    1767         223 : }
    1768             : 
    1769         223 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImpl() {
    1770             :   // Clear out connection pools as well as the thread local cluster map so that we release all
    1771             :   // cluster pointers. Currently we have to free all non-local clusters before we free
    1772             :   // the local cluster. This is because non-local clusters with a zone aware load balancer have a
    1773             :   // member update callback registered with the local cluster.
    1774         223 :   ENVOY_LOG(debug, "shutting down thread local cluster manager");
    1775         223 :   destroying_ = true;
    1776         223 :   host_http_conn_pool_map_.clear();
    1777         223 :   host_tcp_conn_pool_map_.clear();
    1778         223 :   ASSERT(host_tcp_conn_map_.empty());
    1779         337 :   for (auto& cluster : thread_local_clusters_) {
    1780         306 :     if (&cluster.second->prioritySet() != local_priority_set_) {
    1781         306 :       cluster.second.reset();
    1782         306 :     }
    1783         306 :   }
    1784         223 :   thread_local_clusters_.clear();
    1785             : 
    1786             :   // Ensure that all pools are completely destructed.
    1787         223 :   thread_local_dispatcher_.clearDeferredDeleteList();
    1788         223 : }
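
A small sketch of the destruction-order constraint described in the destructor above: every non-local entry is reset first, and only then is the map cleared, so clusters that registered member-update callbacks with the local cluster are gone before the local cluster itself is destroyed. Entry, shutdown() and the `local` pointer are illustrative names, not the real types:

    #include <map>
    #include <memory>
    #include <string>

    struct Entry {}; // Stand-in for a thread local ClusterEntry.

    void shutdown(std::map<std::string, std::unique_ptr<Entry>>& clusters, const Entry* local) {
      for (auto& [name, entry] : clusters) {
        if (entry.get() != local) {
          entry.reset(); // Non-local clusters are freed first.
        }
      }
      clusters.clear(); // Finally destroys the local cluster, if there is one.
    }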
    1789             : 
    1790             : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeTcpConn(
    1791           0 :     const HostConstSharedPtr& host, Network::ClientConnection& connection) {
    1792           0 :   auto host_tcp_conn_map_it = host_tcp_conn_map_.find(host);
    1793           0 :   ASSERT(host_tcp_conn_map_it != host_tcp_conn_map_.end());
    1794           0 :   auto& connections_map = host_tcp_conn_map_it->second.connections_;
    1795           0 :   auto it = connections_map.find(&connection);
    1796           0 :   ASSERT(it != connections_map.end());
    1797           0 :   connection.dispatcher().deferredDelete(std::move(it->second));
    1798           0 :   connections_map.erase(it);
    1799           0 :   if (connections_map.empty()) {
    1800           0 :     host_tcp_conn_map_.erase(host_tcp_conn_map_it);
    1801           0 :   }
    1802           0 : }
    1803             : 
    1804             : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts(
    1805           0 :     const std::string& name, const HostVector& hosts_removed) {
    1806           0 :   auto entry = thread_local_clusters_.find(name);
    1807             :   // Missing the entry should only be possible if deferred cluster creation is enabled.
    1808           0 :   if (entry == thread_local_clusters_.end()) {
    1809           0 :     ASSERT(
    1810           0 :         parent_.deferred_cluster_creation_,
    1811           0 :         fmt::format("Cannot find ThreadLocalCluster {}, but deferred cluster creation is disabled.",
    1812           0 :                     name));
    1813           0 :     ASSERT(thread_local_deferred_clusters_.find(name) != thread_local_deferred_clusters_.end(),
    1814           0 :            "Cluster with removed host is neither deferred nor inflated!");
    1815           0 :     return;
    1816           0 :   }
    1817           0 :   const auto& cluster_entry = entry->second;
    1818           0 :   ENVOY_LOG(debug, "removing hosts for TLS cluster {} removed {}", name, hosts_removed.size());
    1819             : 
    1820             :   // We need to go through and purge any connection pools for hosts that got deleted.
    1821             :   // Even if two hosts actually point to the same address this will be safe, since if a
    1822             :   // host is re-added it will be a different physical HostSharedPtr.
    1823           0 :   cluster_entry->drainConnPools(hosts_removed);
    1824           0 : }
    1825             : 
    1826             : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
    1827             :     const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams update_hosts_params,
    1828             :     LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
    1829             :     const HostVector& hosts_removed, bool weighted_priority_health,
    1830         306 :     uint64_t overprovisioning_factor, HostMapConstSharedPtr cross_priority_host_map) {
    1831         306 :   ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end());
    1832         306 :   const auto& cluster_entry = thread_local_clusters_[name];
    1833         306 :   cluster_entry->updateHosts(name, priority, std::move(update_hosts_params),
    1834         306 :                              std::move(locality_weights), hosts_added, hosts_removed,
    1835         306 :                              weighted_priority_health, overprovisioning_factor,
    1836         306 :                              std::move(cross_priority_host_map));
    1837         306 : }
    1838             : 
    1839             : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
    1840           0 :     const HostSharedPtr& host) {
    1841           0 :   if (host->cluster().features() &
    1842           0 :       ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
    1843           0 :     drainOrCloseConnPools(host, absl::nullopt);
    1844             : 
    1845             :     // Close non connection pool TCP connections obtained from tcpConn()
    1846             :     //
    1847             :     // TODO(jono): The only remaining user of the non-pooled connections seems to be the statsd
    1848             :     // TCP client. Perhaps it could be rewritten to use a connection pool, and this code deleted.
    1849             :     //
    1850             :     // Each connection will remove itself from the TcpConnectionsMap when it closes, via its
    1851             :     // Network::ConnectionCallbacks. The last removed tcp conn will remove the TcpConnectionsMap
    1852             :     // from host_tcp_conn_map_, so do not cache it between iterations.
    1853             :     //
    1854             :     // TODO(ggreenway) PERF: If there are a large number of connections, this could take a long
    1855             :     // time and halt other useful work. Consider breaking up this work. Note that this behavior is
    1856             :     // noted in the configuration documentation for the cluster setting
    1857             :     // "close_connections_on_host_health_failure". Update the docs if this changes.
    1858           0 :     while (true) {
    1859           0 :       const auto& it = host_tcp_conn_map_.find(host);
    1860           0 :       if (it == host_tcp_conn_map_.end()) {
    1861           0 :         break;
    1862           0 :       }
    1863           0 :       TcpConnectionsMap& container = it->second;
    1864           0 :       container.connections_.begin()->first->close(
    1865           0 :           Network::ConnectionCloseType::NoFlush,
    1866           0 :           StreamInfo::LocalCloseReasons::get().NonPooledTcpConnectionHostHealthFailure);
    1867           0 :     }
    1868           0 :   } else {
    1869           0 :     drainOrCloseConnPools(host, ConnectionPool::DrainBehavior::DrainExistingConnections);
    1870           0 :   }
    1871           0 : }
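
A self-contained sketch of the "close until the container is empty" idiom used by the loop above: closing a connection removes it from the tracking container through its own callback, so the loop re-checks the container on every iteration instead of caching an iterator. Conn and the two containers are illustrative, not the real Envoy types:

    #include <functional>
    #include <list>
    #include <memory>
    #include <vector>

    struct Conn {
      std::function<void()> on_close;
      void close() { on_close(); } // The callback mutates the tracking container.
    };

    int main() {
      std::vector<std::unique_ptr<Conn>> owned; // Owns the connections.
      std::list<Conn*> tracked;                 // Container mutated by close callbacks.
      for (int i = 0; i < 3; ++i) {
        owned.push_back(std::make_unique<Conn>());
        Conn* conn = owned.back().get();
        conn->on_close = [&tracked, conn] { tracked.remove(conn); };
        tracked.push_back(conn);
      }
      while (!tracked.empty()) {
        tracked.front()->close(); // Each close erases that connection from `tracked`.
      }
    }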
    1872             : 
    1873             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ConnPoolsContainer*
    1874             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::getHttpConnPoolsContainer(
    1875         730 :     const HostConstSharedPtr& host, bool allocate) {
    1876         730 :   auto container_iter = host_http_conn_pool_map_.find(host);
    1877         730 :   if (container_iter == host_http_conn_pool_map_.end()) {
    1878         479 :     if (!allocate) {
    1879         306 :       return nullptr;
    1880         306 :     }
    1881         173 :     container_iter =
    1882         173 :         host_http_conn_pool_map_.try_emplace(host, thread_local_dispatcher_, host).first;
    1883         173 :   }
    1884             : 
    1885         424 :   return &container_iter->second;
    1886         730 : }
    1887             : 
    1888             : ClusterUpdateCallbacksHandlePtr
    1889             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::addClusterUpdateCallbacks(
    1890         223 :     ClusterUpdateCallbacks& cb) {
    1891         223 :   return std::make_unique<ClusterUpdateCallbacksHandleImpl>(cb, update_callbacks_);
    1892         223 : }
    1893             : 
    1894             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
    1895             :     ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster,
    1896             :     const LoadBalancerFactorySharedPtr& lb_factory)
    1897             :     : parent_(parent), cluster_info_(cluster), lb_factory_(lb_factory),
    1898         306 :       override_host_statuses_(HostUtility::createOverrideHostStatus(cluster_info_->lbConfig())) {
    1899         306 :   priority_set_.getOrCreateHostSet(0);
    1900             : 
    1901             :   // TODO(mattklein123): Consider converting other LBs over to thread local. All of them could
    1902             :   // benefit given the healthy panic, locality, and priority calculations that take place.
    1903         306 :   if (cluster->lbSubsetInfo().isEnabled()) {
    1904           0 :     auto& factory = Config::Utility::getAndCheckFactoryByName<NonThreadAwareLoadBalancerFactory>(
    1905           0 :         "envoy.load_balancing_policies.subset");
    1906           0 :     lb_ = factory.create(*cluster, priority_set_, parent_.local_priority_set_,
    1907           0 :                          parent.parent_.runtime_, parent.parent_.random_,
    1908           0 :                          parent_.thread_local_dispatcher_.timeSource());
    1909         306 :   } else {
    1910         306 :     switch (cluster->lbType()) {
    1911           0 :     case LoadBalancerType::LeastRequest: {
    1912           0 :       ASSERT(lb_factory_ == nullptr);
    1913           0 :       lb_ = std::make_unique<LeastRequestLoadBalancer>(
    1914           0 :           priority_set_, parent_.local_priority_set_, cluster->lbStats(), parent.parent_.runtime_,
    1915           0 :           parent.parent_.random_, cluster->lbConfig(), cluster->lbLeastRequestConfig(),
    1916           0 :           parent.thread_local_dispatcher_.timeSource());
    1917           0 :       break;
    1918           0 :     }
    1919           0 :     case LoadBalancerType::Random: {
    1920           0 :       ASSERT(lb_factory_ == nullptr);
    1921           0 :       lb_ = std::make_unique<RandomLoadBalancer>(priority_set_, parent_.local_priority_set_,
    1922           0 :                                                  cluster->lbStats(), parent.parent_.runtime_,
    1923           0 :                                                  parent.parent_.random_, cluster->lbConfig());
    1924           0 :       break;
    1925           0 :     }
    1926           0 :     case LoadBalancerType::RoundRobin: {
    1927           0 :       ASSERT(lb_factory_ == nullptr);
    1928           0 :       lb_ = std::make_unique<RoundRobinLoadBalancer>(
    1929           0 :           priority_set_, parent_.local_priority_set_, cluster->lbStats(), parent.parent_.runtime_,
    1930           0 :           parent.parent_.random_, cluster->lbConfig(), cluster->lbRoundRobinConfig(),
    1931           0 :           parent.thread_local_dispatcher_.timeSource());
    1932           0 :       break;
    1933           0 :     }
    1934           0 :     case LoadBalancerType::ClusterProvided:
    1935         306 :     case LoadBalancerType::LoadBalancingPolicyConfig:
    1936         306 :     case LoadBalancerType::RingHash:
    1937         306 :     case LoadBalancerType::Maglev:
    1938         306 :     case LoadBalancerType::OriginalDst: {
    1939         306 :       ASSERT(lb_factory_ != nullptr);
    1940         306 :       lb_ = lb_factory_->create({priority_set_, parent_.local_priority_set_});
    1941         306 :       break;
    1942         306 :     }
    1943         306 :     }
    1944         306 :   }
    1945         306 : }
    1946             : 
    1947             : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainOrCloseConnPools(
    1948         306 :     const HostSharedPtr& host, absl::optional<ConnectionPool::DrainBehavior> drain_behavior) {
    1949             :   // Drain or close any HTTP connection pool for the host.
    1950         306 :   {
    1951         306 :     const auto container = getHttpConnPoolsContainer(host);
    1952         306 :     if (container != nullptr) {
    1953           0 :       container->do_not_delete_ = true;
    1954           0 :       if (drain_behavior.has_value()) {
    1955           0 :         container->pools_->drainConnections(drain_behavior.value());
    1956           0 :       } else {
    1957             :         // TODO(wbpcode): 'CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE' and 'closeConnections'
    1958             :         // is only supported for TCP connection pools for now. Use 'DrainExistingConnections'
    1959             :         // drain here as alternative.
    1960           0 :         container->pools_->drainConnections(
    1961           0 :             ConnectionPool::DrainBehavior::DrainExistingConnections);
    1962           0 :       }
    1963           0 :       container->do_not_delete_ = false;
    1964             : 
    1965           0 :       if (container->pools_->empty()) {
    1966           0 :         host_http_conn_pool_map_.erase(host);
    1967           0 :       }
    1968           0 :     }
    1969         306 :   }
    1970             :   // Drain or close any TCP connection pool for the host.
    1971         306 :   {
    1972         306 :     const auto container = host_tcp_conn_pool_map_.find(host);
    1973         306 :     if (container != host_tcp_conn_pool_map_.end()) {
    1974             :       // Draining pools or closing connections can cause pool deletion if it becomes
    1975             :       // idle. Copy `pools_` so that we aren't iterating through a container that
    1976             :       // gets mutated by callbacks deleting from it.
    1977           0 :       std::vector<Tcp::ConnectionPool::Instance*> pools;
    1978           0 :       for (const auto& pair : container->second.pools_) {
    1979           0 :         pools.push_back(pair.second.get());
    1980           0 :       }
    1981             : 
    1982           0 :       for (auto* pool : pools) {
    1983           0 :         if (drain_behavior.has_value()) {
    1984           0 :           pool->drainConnections(drain_behavior.value());
    1985           0 :         } else {
    1986           0 :           pool->closeConnections();
    1987           0 :         }
    1988           0 :       }
    1989           0 :     }
    1990         306 :   }
    1991         306 : }
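
A sketch of the copy-before-iterate pattern applied to the TCP pools above: draining can make a pool idle and erase its map entry, so raw pointers are snapshotted first and the snapshot, not the live map, is what gets iterated. Pool and drainAll() are illustrative names:

    #include <functional>
    #include <map>
    #include <memory>
    #include <vector>

    struct Pool {
      // In Envoy, becoming idle erases the pool's map entry (the pool object itself
      // is deleted via deferredDelete, so it outlives this call).
      std::function<void()> on_idle;
      void drainConnections() { on_idle(); }
    };

    void drainAll(std::map<int, std::unique_ptr<Pool>>& pools) {
      std::vector<Pool*> snapshot;
      snapshot.reserve(pools.size());
      for (const auto& [key, pool] : pools) {
        snapshot.push_back(pool.get());
      }
      for (Pool* pool : snapshot) {
        // Iterating the snapshot is not invalidated if the callback erases entries
        // from `pools`.
        pool->drainConnections();
      }
    }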
    1992             : 
    1993         306 : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() {
    1994             :   // We need to drain all connection pools for the cluster being removed. Then we can remove the
    1995             :   // cluster.
    1996             :   //
    1997             :   // TODO(mattklein123): Optimally, we would just fire member changed callbacks and remove all of
    1998             :   // the hosts inside of the HostImpl destructor. That is a change with wide implications, so we
    1999             :   // are going with a more targeted approach for now.
    2000         306 :   drainConnPools();
    2001         306 : }
    2002             : 
    2003             : Http::ConnectionPool::Instance*
    2004             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImpl(
    2005             :     ResourcePriority priority, absl::optional<Http::Protocol> downstream_protocol,
    2006         251 :     LoadBalancerContext* context, bool peek) {
    2007         251 :   HostConstSharedPtr host = (peek ? peekAnotherHost(context) : chooseHost(context));
    2008         251 :   if (!host) {
    2009           0 :     if (!peek) {
    2010           0 :       ENVOY_LOG(debug, "no healthy host for HTTP connection pool");
    2011           0 :       cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
    2012           0 :     }
    2013           0 :     return nullptr;
    2014           0 :   }
    2015             : 
    2016             :   // Right now, HTTP/1.1, HTTP/2 and ALPN pools are considered separate.
    2017             :   // We could do better here, and always use the ALPN pool and simply make sure
    2018             :   // we end up on a connection of the correct protocol, but for now we're
    2019             :   // starting with something simpler.
    2020         251 :   auto upstream_protocols = host->cluster().upstreamHttpProtocol(downstream_protocol);
    2021         251 :   std::vector<uint8_t> hash_key;
    2022         251 :   hash_key.reserve(upstream_protocols.size());
    2023         251 :   for (auto protocol : upstream_protocols) {
    2024         251 :     hash_key.push_back(uint8_t(protocol));
    2025         251 :   }
    2026             : 
    2027         251 :   absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>
    2028         251 :       alternate_protocol_options = host->cluster().alternateProtocolsCacheOptions();
    2029         251 :   Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
    2030         251 :   if (context) {
    2031             :     // Inherit socket options from downstream connection, if set.
    2032         251 :     if (context->downstreamConnection()) {
    2033         183 :       addOptionsIfNotNull(upstream_options, context->downstreamConnection()->socketOptions());
    2034         183 :     }
    2035         251 :     addOptionsIfNotNull(upstream_options, context->upstreamSocketOptions());
    2036         251 :   }
    2037             : 
    2038             :   // Use the socket options for computing connection pool hash key, if any.
    2039             :   // This allows socket options to control connection pooling so that connections with
    2040             :   // different options are not pooled together.
    2041         251 :   for (const auto& option : *upstream_options) {
    2042           0 :     option->hashKey(hash_key);
    2043           0 :   }
    2044             : 
    2045         251 :   bool have_transport_socket_options = false;
    2046         251 :   if (context && context->upstreamTransportSocketOptions()) {
    2047         183 :     host->transportSocketFactory().hashKey(hash_key, context->upstreamTransportSocketOptions());
    2048         183 :     have_transport_socket_options = true;
    2049         183 :   }
    2050             : 
    2051             :   // If configured, use the downstream connection ID in the pool hash key.
    2052         251 :   if (cluster_info_->connectionPoolPerDownstreamConnection() && context &&
    2053         251 :       context->downstreamConnection()) {
    2054           0 :     context->downstreamConnection()->hashKey(hash_key);
    2055           0 :   }
    2056             : 
    2057         251 :   ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true);
    2058             : 
    2059             :   // Note: to simplify this, we assume that the factory is only called in the scope of this
    2060             :   // function. Otherwise, we'd need to capture a few of these variables by value.
    2061         251 :   ConnPoolsContainer::ConnPools::PoolOptRef pool =
    2062         251 :       container.pools_->getPool(priority, hash_key, [&]() {
    2063         173 :         auto pool = parent_.parent_.factory_.allocateConnPool(
    2064         173 :             parent_.thread_local_dispatcher_, host, priority, upstream_protocols,
    2065         173 :             alternate_protocol_options, !upstream_options->empty() ? upstream_options : nullptr,
    2066         173 :             have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr,
    2067         173 :             parent_.parent_.time_source_, parent_.cluster_manager_state_, quic_info_);
    2068             : 
    2069         173 :         pool->addIdleCallback([&parent = parent_, host, priority, hash_key]() {
    2070         173 :           parent.httpConnPoolIsIdle(host, priority, hash_key);
    2071         173 :         });
    2072             : 
    2073         173 :         return pool;
    2074         173 :       });
    2075             : 
    2076         251 :   if (pool.has_value()) {
    2077         251 :     return &(pool.value().get());
    2078         251 :   } else {
    2079           0 :     return nullptr;
    2080           0 :   }
    2081         251 : }
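
A condensed sketch of how the connection pool hash key above is assembled: every attribute that must not share a pool contributes bytes to the key, i.e. the selected upstream protocol(s), socket options, transport socket options, and optionally the downstream connection. The parameter names below are illustrative, not the real Envoy interfaces:

    #include <cstdint>
    #include <string>
    #include <vector>

    std::vector<uint8_t> buildPoolHashKey(const std::vector<uint8_t>& protocols,
                                          const std::vector<std::string>& socket_option_keys,
                                          const std::string& transport_socket_key) {
      std::vector<uint8_t> hash_key;
      // 1. Upstream protocol(s) selected for this request.
      hash_key.insert(hash_key.end(), protocols.begin(), protocols.end());
      // 2. Socket options, so connections with different options are never pooled together.
      for (const auto& key : socket_option_keys) {
        hash_key.insert(hash_key.end(), key.begin(), key.end());
      }
      // 3. Transport socket options (e.g. an SNI override), when present.
      hash_key.insert(hash_key.end(), transport_socket_key.begin(), transport_socket_key.end());
      return hash_key;
    }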
    2082             : 
    2083             : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::httpConnPoolIsIdle(
    2084         173 :     HostConstSharedPtr host, ResourcePriority priority, const std::vector<uint8_t>& hash_key) {
    2085         173 :   if (destroying_) {
    2086             :     // If the Cluster is being destroyed, this pool will be cleaned up by that
    2087             :     // process.
    2088           0 :     return;
    2089           0 :   }
    2090             : 
    2091         173 :   ConnPoolsContainer* container = getHttpConnPoolsContainer(host);
    2092         173 :   if (container == nullptr) {
    2093             :     // This could happen if we have cleaned out the host before iterating through every
    2094             :     // connection pool. Handle it by just continuing.
    2095           0 :     return;
    2096           0 :   }
    2097             : 
    2098         173 :   ENVOY_LOG(trace, "Erasing idle pool for host {}", *host);
    2099         173 :   container->pools_->erasePool(priority, hash_key);
    2100             : 
    2101             :   // Guard deletion of the container with `do_not_delete_` to avoid deletion while
    2102             :   // iterating through the container in `container->pools_->startDrain()`. See
    2103             :   // comment in `ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools`.
    2104         173 :   if (!container->do_not_delete_ && container->pools_->empty()) {
    2105         173 :     ENVOY_LOG(trace, "Pool container empty for host {}, erasing host entry", *host);
    2106         173 :     host_http_conn_pool_map_.erase(
    2107         173 :         host); // NOTE: `container` is erased after this point in the lambda.
    2108         173 :         host); // NOTE: `container` is erased after this point.
    2109         173 : }
    2110             : 
    2111             : HostConstSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::chooseHost(
    2112         251 :     LoadBalancerContext* context) {
    2113         251 :   auto cross_priority_host_map = priority_set_.crossPriorityHostMap();
    2114         251 :   HostConstSharedPtr host = HostUtility::selectOverrideHost(cross_priority_host_map.get(),
    2115         251 :                                                             override_host_statuses_, context);
    2116         251 :   if (host != nullptr) {
    2117           0 :     return host;
    2118           0 :   }
    2119         251 :   if (!HostUtility::allowLBChooseHost(context)) {
    2120           0 :     return nullptr;
    2121           0 :   }
    2122         251 :   return lb_->chooseHost(context);
    2123         251 : }
    2124             : 
    2125             : HostConstSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::peekAnotherHost(
    2126           0 :     LoadBalancerContext* context) {
    2127           0 :   auto cross_priority_host_map = priority_set_.crossPriorityHostMap();
    2128           0 :   HostConstSharedPtr host = HostUtility::selectOverrideHost(cross_priority_host_map.get(),
    2129           0 :                                                             override_host_statuses_, context);
    2130           0 :   if (host != nullptr) {
    2131           0 :     return host;
    2132           0 :   }
    2133           0 :   return lb_->peekAnotherHost(context);
    2134           0 : }
    2135             : 
    2136             : Tcp::ConnectionPool::Instance*
    2137             : ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPoolImpl(
    2138           0 :     ResourcePriority priority, LoadBalancerContext* context, bool peek) {
    2139             : 
    2140           0 :   HostConstSharedPtr host = (peek ? peekAnotherHost(context) : chooseHost(context));
    2141           0 :   if (!host) {
    2142           0 :     if (!peek) {
    2143           0 :       ENVOY_LOG(debug, "no healthy host for TCP connection pool");
    2144           0 :       cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
    2145           0 :     }
    2146           0 :     return nullptr;
    2147           0 :   }
    2148             : 
    2149             :   // Inherit socket options from downstream connection, if set.
    2150           0 :   std::vector<uint8_t> hash_key = {uint8_t(priority)};
    2151             : 
    2152             :   // Use downstream connection socket options for computing connection pool hash key, if any.
    2153             :   // This allows socket options to control connection pooling so that connections with
    2154             :   // different options are not pooled together.
    2155           0 :   Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
    2156           0 :   if (context) {
    2157           0 :     if (context->downstreamConnection()) {
    2158           0 :       addOptionsIfNotNull(upstream_options, context->downstreamConnection()->socketOptions());
    2159           0 :     }
    2160           0 :     addOptionsIfNotNull(upstream_options, context->upstreamSocketOptions());
    2161           0 :   }
    2162             : 
    2163           0 :   for (const auto& option : *upstream_options) {
    2164           0 :     option->hashKey(hash_key);
    2165           0 :   }
    2166             : 
    2167           0 :   bool have_transport_socket_options = false;
    2168           0 :   if (context != nullptr && context->upstreamTransportSocketOptions() != nullptr) {
    2169           0 :     have_transport_socket_options = true;
    2170           0 :     host->transportSocketFactory().hashKey(hash_key, context->upstreamTransportSocketOptions());
    2171           0 :   }
    2172             : 
    2173           0 :   auto container_iter = parent_.host_tcp_conn_pool_map_.find(host);
    2174           0 :   if (container_iter == parent_.host_tcp_conn_pool_map_.end()) {
    2175           0 :     container_iter = parent_.host_tcp_conn_pool_map_.try_emplace(host, host->acquireHandle()).first;
    2176           0 :   }
    2177           0 :   TcpConnPoolsContainer& container = container_iter->second;
    2178           0 :   auto pool_iter = container.pools_.find(hash_key);
    2179           0 :   if (pool_iter == container.pools_.end()) {
    2180           0 :     bool inserted;
    2181           0 :     std::tie(pool_iter, inserted) = container.pools_.emplace(
    2182           0 :         hash_key,
    2183           0 :         parent_.parent_.factory_.allocateTcpConnPool(
    2184           0 :             parent_.thread_local_dispatcher_, host, priority,
    2185           0 :             !upstream_options->empty() ? upstream_options : nullptr,
    2186           0 :             have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr,
    2187           0 :             parent_.cluster_manager_state_, cluster_info_->tcpPoolIdleTimeout()));
    2188           0 :     ASSERT(inserted);
    2189           0 :     pool_iter->second->addIdleCallback(
    2190           0 :         [&parent = parent_, host, hash_key]() { parent.tcpConnPoolIsIdle(host, hash_key); });
    2191           0 :   }
    2192             : 
    2193           0 :   return pool_iter->second.get();
    2194           0 : }
    2195             : 
    2196             : void ClusterManagerImpl::ThreadLocalClusterManagerImpl::tcpConnPoolIsIdle(
    2197           0 :     HostConstSharedPtr host, const std::vector<uint8_t>& hash_key) {
    2198           0 :   if (destroying_) {
    2199             :     // If the Cluster is being destroyed, this pool will be cleaned up by that process.
    2200           0 :     return;
    2201           0 :   }
    2202             : 
    2203           0 :   auto it = host_tcp_conn_pool_map_.find(host);
    2204           0 :   if (it != host_tcp_conn_pool_map_.end()) {
    2205           0 :     TcpConnPoolsContainer& container = it->second;
    2206             : 
    2207           0 :     auto erase_iter = container.pools_.find(hash_key);
    2208           0 :     if (erase_iter != container.pools_.end()) {
    2209           0 :       ENVOY_LOG(trace, "Idle pool, erasing pool for host {}", *host);
    2210           0 :       thread_local_dispatcher_.deferredDelete(std::move(erase_iter->second));
    2211           0 :       container.pools_.erase(erase_iter);
    2212           0 :     }
    2213             : 
    2214           0 :     if (container.pools_.empty()) {
    2215           0 :       host_tcp_conn_pool_map_.erase(
    2216           0 :           host); // NOTE: `container` is erased after this point in the lambda.
    2217           0 :     }
    2218           0 :   }
    2219           0 : }
    2220             : 
    2221             : ClusterManagerPtr ProdClusterManagerFactory::clusterManagerFromProto(
    2222         131 :     const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
    2223         131 :   auto cluster_manager_impl = std::unique_ptr<ClusterManagerImpl>{new ClusterManagerImpl(
    2224         131 :       bootstrap, *this, stats_, tls_, context_.runtime(), context_.localInfo(),
    2225         131 :       context_.accessLogManager(), context_.mainThreadDispatcher(), context_.admin(),
    2226         131 :       context_.messageValidationContext(), context_.api(), http_context_, context_.grpcContext(),
    2227         131 :       context_.routerContext(), server_)};
    2228         131 :   THROW_IF_NOT_OK(cluster_manager_impl->init(bootstrap));
    2229         131 :   return cluster_manager_impl;
    2230         131 : }
    2231             : 
    2232             : Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
    2233             :     Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority,
    2234             :     std::vector<Http::Protocol>& protocols,
    2235             :     const absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>&
    2236             :         alternate_protocol_options,
    2237             :     const Network::ConnectionSocket::OptionsSharedPtr& options,
    2238             :     const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
    2239         173 :     TimeSource& source, ClusterConnectivityState& state, Http::PersistentQuicInfoPtr& quic_info) {
    2240             : 
    2241         173 :   Http::HttpServerPropertiesCacheSharedPtr alternate_protocols_cache;
    2242         173 :   if (alternate_protocol_options.has_value()) {
    2243             :     // If there is configuration for an alternate protocols cache, always create one.
    2244           0 :     alternate_protocols_cache = alternate_protocols_cache_manager_->getCache(
    2245           0 :         alternate_protocol_options.value(), dispatcher);
    2246         173 :   } else if (!alternate_protocol_options.has_value() &&
    2247         173 :              (protocols.size() == 2 ||
    2248         173 :               (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2))) {
    2249             :     // If there is no configuration for an alternate protocols cache, still
    2250             :     // create one if there's an HTTP/2 upstream (either explicitly, or for mixed
    2251             :     // HTTP/1.1 and HTTP/2 pools) to track the max concurrent streams across
    2252             :     // connections.
    2253         105 :     envoy::config::core::v3::AlternateProtocolsCacheOptions default_options;
    2254         105 :     default_options.set_name(host->cluster().name());
    2255         105 :     alternate_protocols_cache =
    2256         105 :         alternate_protocols_cache_manager_->getCache(default_options, dispatcher);
    2257         105 :   }
    2258             : 
    2259         173 :   absl::optional<Http::HttpServerPropertiesCache::Origin> origin =
    2260         173 :       getOrigin(transport_socket_options, host);
    2261         173 :   if (protocols.size() == 3 &&
    2262         173 :       context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100) &&
    2263         173 :       !transport_socket_options->http11ProxyInfo()) {
    2264           0 :     ASSERT(contains(protocols,
    2265           0 :                     {Http::Protocol::Http11, Http::Protocol::Http2, Http::Protocol::Http3}));
    2266           0 :     ASSERT(alternate_protocol_options.has_value());
    2267           0 :     ASSERT(alternate_protocols_cache);
    2268           0 : #ifdef ENVOY_ENABLE_QUIC
    2269           0 :     Envoy::Http::ConnectivityGrid::ConnectivityOptions coptions{protocols};
    2270           0 :     if (quic_info == nullptr) {
    2271           0 :       quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster());
    2272           0 :     }
    2273           0 :     return std::make_unique<Http::ConnectivityGrid>(
    2274           0 :         dispatcher, context_.api().randomGenerator(), host, priority, options,
    2275           0 :         transport_socket_options, state, source, alternate_protocols_cache, coptions,
    2276           0 :         quic_stat_names_, *stats_.rootScope(), *quic_info);
    2277             : #else
    2278             :     (void)quic_info;
    2279             :     // Should be blocked by configuration checking at an earlier point.
    2280             :     PANIC("unexpected");
    2281             : #endif
    2282           0 :   }
    2283         173 :   if (protocols.size() >= 2) {
    2284           0 :     if (origin.has_value()) {
    2285           0 :       envoy::config::core::v3::AlternateProtocolsCacheOptions default_options;
    2286           0 :       default_options.set_name(host->cluster().name());
    2287           0 :       alternate_protocols_cache =
    2288           0 :           alternate_protocols_cache_manager_->getCache(default_options, dispatcher);
    2289           0 :     }
    2290             : 
    2291           0 :     ASSERT(contains(protocols, {Http::Protocol::Http11, Http::Protocol::Http2}));
    2292           0 :     return std::make_unique<Http::HttpConnPoolImplMixed>(
    2293           0 :         dispatcher, context_.api().randomGenerator(), host, priority, options,
    2294           0 :         transport_socket_options, state, origin, alternate_protocols_cache);
    2295           0 :   }
    2296         173 :   if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2 &&
    2297         173 :       context_.runtime().snapshot().featureEnabled("upstream.use_http2", 100)) {
    2298         105 :     return Http::Http2::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
    2299         105 :                                          priority, options, transport_socket_options, state, origin,
    2300         105 :                                          alternate_protocols_cache);
    2301         105 :   }
    2302          68 :   if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http3 &&
    2303          68 :       context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100)) {
    2304           0 : #ifdef ENVOY_ENABLE_QUIC
    2305           0 :     if (quic_info == nullptr) {
    2306           0 :       quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster());
    2307           0 :     }
    2308           0 :     return Http::Http3::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
    2309           0 :                                          priority, options, transport_socket_options, state,
    2310           0 :                                          quic_stat_names_, {}, *stats_.rootScope(), {}, *quic_info);
    2311             : #else
    2312             :     UNREFERENCED_PARAMETER(source);
    2313             :     // Should be blocked by configuration checking at an earlier point.
    2314             :     PANIC("unexpected");
    2315             : #endif
    2316           0 :   }
    2317          68 :   ASSERT(protocols.size() == 1 && protocols[0] == Http::Protocol::Http11);
    2318          68 :   return Http::Http1::allocateConnPool(dispatcher, context_.api().randomGenerator(), host, priority,
    2319          68 :                                        options, transport_socket_options, state);
    2320          68 : }
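
A condensed sketch of the pool-selection ladder implemented by allocateConnPool() above, ignoring the runtime feature flags, the alternate-protocols cache, and the QUIC build guards; the negotiated protocol list picks the pool flavor:

    #include <vector>

    enum class Protocol { Http11, Http2, Http3 };
    enum class PoolKind { ConnectivityGrid, Mixed, Http2, Http3, Http1 };

    // Assumes a non-empty protocol list, as the real code does.
    PoolKind selectPool(const std::vector<Protocol>& protocols) {
      if (protocols.size() == 3) {
        return PoolKind::ConnectivityGrid; // H1 + H2 + H3: HTTP/3 connectivity grid.
      }
      if (protocols.size() == 2) {
        return PoolKind::Mixed; // H1 + H2: ALPN-based mixed pool.
      }
      if (protocols.front() == Protocol::Http2) {
        return PoolKind::Http2;
      }
      if (protocols.front() == Protocol::Http3) {
        return PoolKind::Http3;
      }
      return PoolKind::Http1;
    }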
    2321             : 
    2322             : Tcp::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateTcpConnPool(
    2323             :     Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority,
    2324             :     const Network::ConnectionSocket::OptionsSharedPtr& options,
    2325             :     Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
    2326             :     ClusterConnectivityState& state,
    2327           0 :     absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout) {
    2328           0 :   ENVOY_LOG_MISC(debug, "Allocating TCP conn pool");
    2329           0 :   return std::make_unique<Tcp::ConnPoolImpl>(
    2330           0 :       dispatcher, host, priority, options, transport_socket_options, state, tcp_pool_idle_timeout);
    2331           0 : }
    2332             : 
    2333             : absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
    2334             : ProdClusterManagerFactory::clusterFromProto(const envoy::config::cluster::v3::Cluster& cluster,
    2335             :                                             ClusterManager& cm,
    2336             :                                             Outlier::EventLoggerSharedPtr outlier_event_logger,
    2337         159 :                                             bool added_via_api) {
    2338         159 :   return ClusterFactoryImplBase::create(cluster, context_, cm, dns_resolver_fn_,
    2339         159 :                                         ssl_context_manager_, outlier_event_logger, added_via_api);
    2340         159 : }
    2341             : 
    2342             : CdsApiPtr
    2343             : ProdClusterManagerFactory::createCds(const envoy::config::core::v3::ConfigSource& cds_config,
    2344             :                                      const xds::core::v3::ResourceLocator* cds_resources_locator,
    2345          28 :                                      ClusterManager& cm) {
    2346             :   // TODO(htuch): Differentiate static vs. dynamic validation visitors.
    2347          28 :   return CdsApiImpl::create(cds_config, cds_resources_locator, cm, *stats_.rootScope(),
    2348          28 :                             context_.messageValidationContext().dynamicValidationVisitor());
    2349          28 : }
    2350             : 
    2351             : } // namespace Upstream
    2352             : } // namespace Envoy

Generated by: LCOV version 1.15