1
#include "source/common/upstream/cluster_manager_impl.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8
#include <vector>
9

            
10
#include "envoy/admin/v3/config_dump.pb.h"
11
#include "envoy/config/bootstrap/v3/bootstrap.pb.h"
12
#include "envoy/config/cluster/v3/cluster.pb.h"
13
#include "envoy/config/core/v3/config_source.pb.h"
14
#include "envoy/config/core/v3/protocol.pb.h"
15
#include "envoy/event/dispatcher.h"
16
#include "envoy/grpc/async_client.h"
17
#include "envoy/network/dns.h"
18
#include "envoy/runtime/runtime.h"
19
#include "envoy/stats/scope.h"
20
#include "envoy/tcp/async_tcp_client.h"
21
#include "envoy/upstream/load_balancer.h"
22
#include "envoy/upstream/upstream.h"
23

            
24
#include "source/common/common/assert.h"
25
#include "source/common/common/enum_to_int.h"
26
#include "source/common/common/fmt.h"
27
#include "source/common/common/utility.h"
28
#include "source/common/config/null_grpc_mux_impl.h"
29
#include "source/common/config/utility.h"
30
#include "source/common/config/xds_resource.h"
31
#include "source/common/grpc/async_client_manager_impl.h"
32
#include "source/common/http/async_client_impl.h"
33
#include "source/common/http/http1/conn_pool.h"
34
#include "source/common/http/http2/conn_pool.h"
35
#include "source/common/http/mixed_conn_pool.h"
36
#include "source/common/network/utility.h"
37
#include "source/common/protobuf/utility.h"
38
#include "source/common/router/shadow_writer_impl.h"
39
#include "source/common/runtime/runtime_features.h"
40
#include "source/common/tcp/conn_pool.h"
41
#include "source/common/upstream/cds_api_impl.h"
42
#include "source/common/upstream/cluster_factory_impl.h"
43
#include "source/common/upstream/load_balancer_context_base.h"
44
#include "source/common/upstream/load_stats_reporter_impl.h"
45
#include "source/common/upstream/priority_conn_pool_map_impl.h"
46

            
47
#include "absl/hash/hash.h"
48
#include "absl/status/status.h"
49

            
50
#ifdef ENVOY_ENABLE_QUIC
51
#include "source/common/http/conn_pool_grid.h"
52
#include "source/common/http/http3/conn_pool.h"
53
#include "source/common/quic/client_connection_factory_impl.h"
54
#endif
55

            
56
namespace Envoy {
57
namespace Upstream {
58
namespace {
59

            
60
void addOptionsIfNotNull(Network::Socket::OptionsSharedPtr& options,
61
96150
                         const Network::Socket::OptionsSharedPtr& to_add) {
62
96150
  if (to_add != nullptr) {
63
43831
    Network::Socket::appendOptions(options, to_add);
64
43831
  }
65
96150
}
66

            
67
// Helper function to make sure each protocol in expected_protocols is present
68
// in protocols (only used for an ASSERT in debug builds)
69
bool contains(const std::vector<Http::Protocol>& protocols,
70
              const std::vector<Http::Protocol>& expected_protocols) {
71
  for (auto protocol : expected_protocols) {
72
    if (std::find(protocols.begin(), protocols.end(), protocol) == protocols.end()) {
73
      return false;
74
    }
75
  }
76
  return true;
77
}
78

            
79
// Derives the HTTPS origin (scheme, SNI, port) for `host`, preferring an explicit SNI
// override from the transport socket options over the socket factory's default.
// Returns nullopt when there is no usable SNI or the host has no IP address.
absl::optional<Http::HttpServerPropertiesCache::Origin>
getOrigin(const Network::TransportSocketOptionsConstSharedPtr& options, HostConstSharedPtr host) {
  std::string sni;
  if (options != nullptr && options->serverNameOverride().has_value()) {
    sni = options->serverNameOverride().value();
  } else {
    sni = std::string(host->transportSocketFactory().defaultServerNameIndication());
  }
  const auto& address = host->address();
  if (sni.empty() || address == nullptr || address->ip() == nullptr) {
    return absl::nullopt;
  }
  return {{"https", sni, address->ip()->port()}};
}
90

            
91
// Returns true if `cluster_name` backs the ADS config (primary, or failover when the
// xds_failover_support runtime feature is enabled), meaning ADS mux initialization must
// wait for this cluster to initialize first.
bool isBlockingAdsCluster(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
                          absl::string_view cluster_name) {
  if (!bootstrap.dynamic_resources().has_ads_config()) {
    return false;
  }
  const auto& ads_config_source = bootstrap.dynamic_resources().ads_config();
  // We only care about EnvoyGrpc, not GoogleGrpc, because we only need to delay ADS mux
  // initialization if it uses an Envoy cluster that needs to be initialized first. We don't
  // depend on the same cluster initialization when opening a gRPC stream for GoogleGrpc.
  const auto names_cluster_at = [&ads_config_source, cluster_name](int index) {
    return ads_config_source.grpc_services(index).has_envoy_grpc() &&
           ads_config_source.grpc_services(index).envoy_grpc().cluster_name() == cluster_name;
  };
  bool blocking_ads_cluster =
      ads_config_source.grpc_services_size() > 0 && names_cluster_at(0);
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
    // Validate the failover server if there is one.
    blocking_ads_cluster =
        blocking_ads_cluster ||
        (ads_config_source.grpc_services_size() == 2 && names_cluster_at(1));
  }
  return blocking_ads_cluster;
}
113

            
114
} // namespace
115

            
116
16609
// Registers a cluster with the init helper, marks it as warming, and kicks off its
// initialization (immediately for primary clusters; for secondary clusters only once
// secondary initialization has started).
void ClusterManagerInitHelper::addCluster(ClusterManagerCluster& cm_cluster) {
  // See comments in ClusterManagerImpl::addOrUpdateCluster() for why this is only called during
  // server initialization.
  ASSERT(state_ != State::AllClustersInitialized);

  Cluster& cluster = cm_cluster.cluster();
  const auto on_initialized = [&cm_cluster, this] {
    RETURN_IF_NOT_OK(onClusterInit(cm_cluster));
    // Clear the warming gauge now that initialization has completed.
    cm_cluster.cluster().info()->configUpdateStats().warming_state_.set(0);
    return absl::OkStatus();
  };

  cluster.info()->configUpdateStats().warming_state_.set(1);
  const bool is_primary = cluster.initializePhase() == Cluster::InitializePhase::Primary;
  if (is_primary) {
    // Remove the previous cluster before the cluster object is destroyed.
    primary_init_clusters_.insert_or_assign(cluster.info()->name(), &cm_cluster);
    cluster.initialize(on_initialized);
  } else {
    ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
    // Remove the previous cluster before the cluster object is destroyed.
    secondary_init_clusters_.insert_or_assign(cluster.info()->name(), &cm_cluster);
    if (started_secondary_initialize_) {
      // This can happen if we get a second CDS update that adds new clusters after we have
      // already started secondary init. In this case, just immediately initialize.
      cluster.initialize(on_initialized);
    }
  }

  ENVOY_LOG(debug, "cm init: adding: cluster={} primary={} secondary={}", cluster.info()->name(),
            primary_init_clusters_.size(), secondary_init_clusters_.size());
}
147

            
148
16432
// Runs the per-cluster init hook for a finished cluster; only deregisters the cluster
// from the pending maps when the hook succeeds.
absl::Status ClusterManagerInitHelper::onClusterInit(ClusterManagerCluster& cluster) {
  ASSERT(state_ != State::AllClustersInitialized);
  const absl::Status callback_status = per_cluster_init_callback_(cluster);
  if (!callback_status.ok()) {
    return callback_status;
  }
  removeCluster(cluster);
  return absl::OkStatus();
}
154

            
155
17106
// Deregisters `cluster` from the appropriate pending-init map (if still present) and
// advances the init state machine.
void ClusterManagerInitHelper::removeCluster(ClusterManagerCluster& cluster) {
  if (state_ == State::AllClustersInitialized) {
    return;
  }

  // There is a remote edge case where we can remove a cluster via CDS that has not yet been
  // initialized. When called via the remove cluster API this code catches that case.
  const bool is_primary =
      cluster.cluster().initializePhase() == Cluster::InitializePhase::Primary;
  ASSERT(is_primary ||
         cluster.cluster().initializePhase() == Cluster::InitializePhase::Secondary);
  absl::flat_hash_map<std::string, ClusterManagerCluster*>& cluster_map =
      is_primary ? primary_init_clusters_ : secondary_init_clusters_;

  // It is possible that the cluster we are removing has already been initialized, and is not
  // present in the initializer map. If so, this is fine as a CDS update may happen for a
  // cluster with the same name. See the case "UpdateAlreadyInitialized" of the
  // target //test/common/upstream:cluster_manager_impl_test.
  const auto iter = cluster_map.find(cluster.cluster().info()->name());
  if (iter != cluster_map.end() && iter->second == &cluster) {
    cluster_map.erase(iter);
  }
  ENVOY_LOG(debug, "cm init: init complete: cluster={} primary={} secondary={}",
            cluster.cluster().info()->name(), primary_init_clusters_.size(),
            secondary_init_clusters_.size());
  maybeFinishInitialize();
}
183

            
184
311
void ClusterManagerInitHelper::initializeSecondaryClusters() {
185
311
  started_secondary_initialize_ = true;
186
  // Cluster::initialize() method can modify the map of secondary_init_clusters_ to remove
187
  // the item currently being initialized, so we eschew range-based-for and do this complicated
188
  // dance to increment the iterator before calling initialize.
189
646
  for (auto iter = secondary_init_clusters_.begin(); iter != secondary_init_clusters_.end();) {
190
335
    ClusterManagerCluster* cluster = iter->second;
191
335
    ENVOY_LOG(debug, "initializing secondary cluster {}", iter->first);
192
335
    ++iter;
193
335
    cluster->cluster().initialize([cluster, this] { return onClusterInit(*cluster); });
194
335
  }
195
311
}
196

            
197
38953
// Advances the cluster-manager init state machine. Called whenever a cluster finishes
// initializing or a state transition occurs; decides, in order, whether to: wait for
// primary clusters, fire the primary-clusters-ready callback, kick off secondary-cluster
// init, start CDS init, or declare all clusters initialized.
void ClusterManagerInitHelper::maybeFinishInitialize() {
  // Do not do anything if we are still doing the initial static load or if we are waiting for
  // CDS initialize.
  ENVOY_LOG(debug, "maybe finish initialize state: {}", enumToInt(state_));
  if (state_ == State::Loading || state_ == State::WaitingToStartCdsInitialization) {
    return;
  }

  ASSERT(state_ == State::WaitingToStartSecondaryInitialization ||
         state_ == State::CdsInitialized ||
         state_ == State::WaitingForPrimaryInitializationToComplete);
  ENVOY_LOG(debug, "maybe finish initialize primary init clusters empty: {}",
            primary_init_clusters_.empty());
  // If we are still waiting for primary clusters to initialize, do nothing.
  if (!primary_init_clusters_.empty()) {
    return;
  } else if (state_ == State::WaitingForPrimaryInitializationToComplete) {
    // All primary clusters are done: transition and notify (once) whoever registered
    // interest in primary-cluster readiness.
    state_ = State::WaitingToStartSecondaryInitialization;
    if (primary_clusters_initialized_callback_) {
      primary_clusters_initialized_callback_();
    }
    return;
  }

  // If we are still waiting for secondary clusters to initialize, see if we need to first call
  // initialize on them. This is only done once.
  ENVOY_LOG(debug, "maybe finish initialize secondary init clusters empty: {}",
            secondary_init_clusters_.empty());
  if (!secondary_init_clusters_.empty()) {
    if (!started_secondary_initialize_) {
      ENVOY_LOG(info, "cm init: initializing secondary clusters");
      // If the first CDS response doesn't have any primary cluster, ClusterLoadAssignment
      // should be already paused by CdsApiImpl::onConfigUpdate(). Need to check that to
      // avoid double pause ClusterLoadAssignment.
      const std::vector<std::string> paused_xds_types{
          Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(),
          Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>(),
          Config::getTypeUrl<envoy::extensions::transport_sockets::tls::v3::Secret>()};
      // NOTE(review): the ScopedResume name suggests discovery for these types resumes when
      // this object goes out of scope after initializeSecondaryClusters() returns — confirm.
      Config::ScopedResume resume_eds_leds_sds = xds_manager_.pause(paused_xds_types);
      initializeSecondaryClusters();
    }
    return;
  }

  // At this point, if we are doing static init, and we have CDS, start CDS init. Otherwise, move
  // directly to initialized.
  started_secondary_initialize_ = false;
  ENVOY_LOG(debug, "maybe finish initialize cds api ready: {}", cds_ != nullptr);
  if (state_ == State::WaitingToStartSecondaryInitialization && cds_) {
    ENVOY_LOG(info, "cm init: initializing cds");
    state_ = State::WaitingToStartCdsInitialization;
    cds_->initialize();
  } else {
    ENVOY_LOG(info, "cm init: all clusters initialized");
    state_ = State::AllClustersInitialized;
    if (initialized_callback_) {
      initialized_callback_();
    }
  }
}
257

            
258
11000
// Called once all static bootstrap clusters have been added to the helper; moves the
// state machine from Loading to waiting on primary-cluster initialization, then checks
// whether it can advance further immediately (e.g. all clusters are static).
void ClusterManagerInitHelper::onStaticLoadComplete() {
  ASSERT(state_ == State::Loading);
  // After initialization of primary clusters has completed, transition to
  // waiting for signal to initialize secondary clusters and then CDS.
  state_ = State::WaitingForPrimaryInitializationToComplete;
  maybeFinishInitialize();
}
265

            
266
10822
// External signal that secondary-cluster initialization may proceed. Only valid once all
// primary clusters are done (WaitingToStartSecondaryInitialization); delegates the actual
// work to maybeFinishInitialize().
void ClusterManagerInitHelper::startInitializingSecondaryClusters() {
  ASSERT(state_ == State::WaitingToStartSecondaryInitialization);
  ENVOY_LOG(debug, "continue initializing secondary clusters");
  maybeFinishInitialize();
}
271

            
272
10992
// Records the CDS API handle (may be null when CDS is not configured) and, when present,
// wires its initialized callback into the init state machine.
void ClusterManagerInitHelper::setCds(CdsApi* cds) {
  ASSERT(state_ == State::Loading);
  cds_ = cds;
  if (cds_ == nullptr) {
    return;
  }
  cds_->setInitializedCb([this]() -> void {
    ASSERT(state_ == State::WaitingToStartCdsInitialization);
    state_ = State::CdsInitialized;
    maybeFinishInitialize();
  });
}
283

            
284
void ClusterManagerInitHelper::setInitializedCb(
285
10681
    ClusterManager::InitializationCompleteCallback callback) {
286
10681
  if (state_ == State::AllClustersInitialized) {
287
9942
    callback();
288
10507
  } else {
289
739
    initialized_callback_ = callback;
290
739
  }
291
10681
}
292

            
293
void ClusterManagerInitHelper::setPrimaryClustersInitializedCb(
294
10827
    ClusterManager::PrimaryClustersReadyCallback callback) {
295
  // The callback must be set before or at the `WaitingToStartSecondaryInitialization` state.
296
10827
  ASSERT(state_ == State::WaitingToStartSecondaryInitialization ||
297
10827
         state_ == State::WaitingForPrimaryInitializationToComplete || state_ == State::Loading);
298
10827
  if (state_ == State::WaitingToStartSecondaryInitialization) {
299
    // This is the case where all clusters are STATIC and without health checking.
300
10698
    callback();
301
10740
  } else {
302
129
    primary_clusters_initialized_callback_ = callback;
303
129
  }
304
10827
}
305

            
306
// Constructs the cluster manager from bootstrap config. Any failure is reported through
// `creation_status` (via SET_AND_RETURN_IF_NOT_OK) rather than by throwing. Sets up the
// admin "clusters" config dump, the gRPC async-client manager, optional outlier-detection
// event logging, the local-cluster name, and finally initializes the xDS manager.
ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
                                       ClusterManagerFactory& factory,
                                       Server::Configuration::ServerFactoryContext& context,
                                       absl::Status& creation_status)
    : context_(context), factory_(factory), runtime_(context.runtime()),
      stats_(context.serverScope().store()), tls_(context.threadLocal()),
      xds_manager_(context.xdsManager()), random_(context.api().randomGenerator()),
      deferred_cluster_creation_(bootstrap.cluster_manager().enable_deferred_cluster_creation()),
      bind_config_(bootstrap.cluster_manager().has_upstream_bind_config()
                       ? absl::make_optional(bootstrap.cluster_manager().upstream_bind_config())
                       : absl::nullopt),
      local_info_(context.localInfo()), cm_stats_(generateStats(*stats_.rootScope())),
      // The init helper calls back into this object whenever a cluster finishes initializing.
      init_helper_(xds_manager_,
                   [this](ClusterManagerCluster& cluster) { return onClusterInit(cluster); }),
      time_source_(context.timeSource()), dispatcher_(context.mainThreadDispatcher()),
      http_context_(context.httpContext()), router_context_(context.routerContext()),
      cluster_stat_names_(stats_.symbolTable()),
      cluster_config_update_stat_names_(stats_.symbolTable()),
      cluster_lb_stat_names_(stats_.symbolTable()),
      cluster_endpoint_stat_names_(stats_.symbolTable()),
      cluster_load_report_stat_names_(stats_.symbolTable()),
      cluster_circuit_breakers_stat_names_(stats_.symbolTable()),
      cluster_request_response_size_stat_names_(stats_.symbolTable()),
      cluster_timeout_budget_stat_names_(stats_.symbolTable()),
      common_lb_config_pool_(
          std::make_shared<SharedPool::ObjectSharedPool<
              const envoy::config::cluster::v3::Cluster::CommonLbConfig, MessageUtil, MessageUtil>>(
              dispatcher_)),
      shutdown_(false) {
  // Register the "clusters" config-dump handler when an admin interface exists.
  if (auto admin = context.admin(); admin.has_value()) {
    config_tracker_entry_ = admin->getConfigTracker().add(
        "clusters", [this](const Matchers::StringMatcher& name_matcher) {
          return dumpClusterConfigs(name_matcher);
        });
  }
  async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
      bootstrap.grpc_async_client_manager_config(), context, context.grpcContext().statNames());
  const auto& cm_config = bootstrap.cluster_manager();
  // Optional outlier-detection event log; a create failure aborts construction.
  if (cm_config.has_outlier_detection()) {
    const std::string event_log_file_path = cm_config.outlier_detection().event_log_path();
    if (!event_log_file_path.empty()) {
      auto outlier_or_error = Outlier::EventLoggerImpl::create(context.accessLogManager(),
                                                               event_log_file_path, time_source_);
      SET_AND_RETURN_IF_NOT_OK(outlier_or_error.status(), creation_status);
      outlier_event_logger_ = std::move(*outlier_or_error);
    }
  }

  // We need to know whether we're zone aware early on, so make sure we do this lookup
  // before we load any clusters.
  if (!cm_config.local_cluster_name().empty()) {
    local_cluster_name_ = cm_config.local_cluster_name();
  }

  // Now that the async-client manager is set, the xDS-Manager can be initialized.
  SET_AND_RETURN_IF_NOT_OK(xds_manager_.initialize(bootstrap, this), creation_status);
}
363

            
364
// One-time bootstrap of all static clusters, in two phases (primary, then secondary/EDS),
// followed by thread-local cluster-manager setup, optional CDS creation, init-manager
// registration, and starting the ADS / xDS-TP muxes. Returns the first error encountered.
absl::Status
ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
  ASSERT(!initialized_);
  initialized_ = true;

  // Cluster loading happens in two phases: first all the primary clusters are loaded, and then all
  // the secondary clusters are loaded. As it currently stands all non-EDS clusters and EDS which
  // load endpoint definition from file are primary and
  // (REST,GRPC,DELTA_GRPC) EDS clusters are secondary. This two phase
  // loading is done because in v2 configuration each EDS cluster individually sets up a
  // subscription. When this subscription is an API source the cluster will depend on a non-EDS
  // cluster, so the non-EDS clusters must be loaded first.
  auto is_primary_cluster = [](const envoy::config::cluster::v3::Cluster& cluster) -> bool {
    return cluster.type() != envoy::config::cluster::v3::Cluster::EDS ||
           (cluster.type() == envoy::config::cluster::v3::Cluster::EDS &&
            Config::SubscriptionFactory::isPathBasedConfigSource(
                cluster.eds_cluster_config().eds_config().config_source_specifier_case()));
  };
  // Build book-keeping for which clusters are primary. This is useful when we
  // invoke loadCluster() below and it needs the complete set of primaries.
  for (const auto& cluster : bootstrap.static_resources().clusters()) {
    if (is_primary_cluster(cluster)) {
      primary_clusters_.insert(cluster.name());
    }
  }

  bool has_ads_cluster = false;
  // Load all the primary clusters.
  for (const auto& cluster : bootstrap.static_resources().clusters()) {
    if (is_primary_cluster(cluster)) {
      const bool required_for_ads = isBlockingAdsCluster(bootstrap, cluster.name());
      has_ads_cluster |= required_for_ads;
      // TODO(abeyad): Consider passing a lambda for a "post-cluster-init" callback, which would
      // include a conditional ads_mux_->start() call, if other uses cases for "post-cluster-init"
      // functionality pops up.
      auto status_or_cluster =
          loadCluster(cluster, MessageUtil::hash(cluster), "", /*added_via_api=*/false,
                      required_for_ads, active_clusters_);
      RETURN_IF_NOT_OK_REF(status_or_cluster.status());
    }
  }

  // Now setup ADS if needed, this might rely on a primary cluster.
  RETURN_IF_NOT_OK(xds_manager_.initializeAdsConnections(bootstrap));

  // After ADS is initialized, load EDS static clusters as EDS config may potentially need ADS.
  for (const auto& cluster : bootstrap.static_resources().clusters()) {
    // Now load all the secondary clusters.
    if (cluster.type() == envoy::config::cluster::v3::Cluster::EDS &&
        !Config::SubscriptionFactory::isPathBasedConfigSource(
            cluster.eds_cluster_config().eds_config().config_source_specifier_case())) {
      ASSERT(!isBlockingAdsCluster(bootstrap, cluster.name()));
      // Passing "false" for required_for_ads because an ADS cluster cannot be
      // defined using EDS (or non-primary cluster).
      auto status_or_cluster =
          loadCluster(cluster, MessageUtil::hash(cluster), "", /*added_via_api=*/false,
                      /*required_for_ads=*/false, active_clusters_);
      if (!status_or_cluster.status().ok()) {
        return status_or_cluster.status();
      }
    }
  }

  cm_stats_.cluster_added_.add(bootstrap.static_resources().clusters().size());
  updateClusterCounts();

  // Resolve the optional local (zone-aware) cluster; it must exist among the static clusters.
  absl::optional<ThreadLocalClusterManagerImpl::LocalClusterParams> local_cluster_params;
  if (local_cluster_name_) {
    auto local_cluster = active_clusters_.find(local_cluster_name_.value());
    if (local_cluster == active_clusters_.end()) {
      return absl::InvalidArgumentError(
          fmt::format("local cluster '{}' must be defined", local_cluster_name_.value()));
    }
    local_cluster_params.emplace();
    local_cluster_params->info_ = local_cluster->second->cluster().info();
    local_cluster_params->load_balancer_factory_ = local_cluster->second->loadBalancerFactory();
    local_cluster->second->setAddedOrUpdated();
  }

  // Once the initial set of static bootstrap clusters are created (including the local cluster),
  // we can instantiate the thread local cluster manager.
  tls_.set([this, local_cluster_params](Event::Dispatcher& dispatcher) {
    return std::make_shared<ThreadLocalClusterManagerImpl>(*this, dispatcher, local_cluster_params);
  });

  const auto& dyn_resources = bootstrap.dynamic_resources();
  // We can now potentially create the CDS API once the backing cluster exists.
  if (dyn_resources.has_cds_config() || !dyn_resources.cds_resources_locator().empty()) {
    std::unique_ptr<xds::core::v3::ResourceLocator> cds_resources_locator;
    if (!dyn_resources.cds_resources_locator().empty()) {
      auto url_or_error =
          Config::XdsResourceIdentifier::decodeUrl(dyn_resources.cds_resources_locator());
      RETURN_IF_NOT_OK_REF(url_or_error.status());
      cds_resources_locator =
          std::make_unique<xds::core::v3::ResourceLocator>(std::move(url_or_error.value()));
    }
    // In case cds_config is configured and the new xDS-TP configs are used,
    // then the CdsApi will need to track the resources, as the xDS-TP configs
    // may be used for OD-CDS. If this is not set, the SotW update may override
    // the OD-CDS resources.
    const bool support_multi_ads_sources =
        bootstrap.has_default_config_source() || !bootstrap.config_sources().empty();
    auto cds_or_error = factory_.createCds(dyn_resources.cds_config(), cds_resources_locator.get(),
                                           *this, support_multi_ads_sources);
    RETURN_IF_NOT_OK_REF(cds_or_error.status())
    cds_api_ = std::move(*cds_or_error);
    init_helper_.setCds(cds_api_.get());
  } else {
    init_helper_.setCds(nullptr);
  }

  // Proceed to add all static bootstrap clusters to the init manager. This will immediately
  // initialize any primary clusters. Post-init processing further initializes any thread
  // aware load balancer and sets up the per-worker host set updates.
  for (auto& cluster : active_clusters_) {
    init_helper_.addCluster(*cluster.second);
  }

  // Potentially move to secondary initialization on the static bootstrap clusters if all primary
  // clusters have already initialized. (E.g., if all static).
  init_helper_.onStaticLoadComplete();

  // Initialize the ADS and xDS-TP config based connections.
  if (!has_ads_cluster) {
    // There is no ADS cluster, so we won't be starting the ADS mux after a cluster has finished
    // initializing, so we must start ADS here.
    xds_manager_.adsMux()->start();
  }
  // TODO(adisuissa): to ensure parity with the non-xdstp-config-based ADS
  // we need to change this to only be invoked for Envoy-based clusters when
  // they are ready (this is needed to avoid early connection attempts in the
  // DNS based clusters).
  xds_manager_.startXdstpAdsMuxes();
  return absl::OkStatus();
}
499

            
500
// Kicks off secondary-cluster initialization via the init helper and, if load reporting
// (LRS) is configured in the bootstrap, creates the gRPC client and LoadStatsReporter.
// Returns the first error from transport-version validation or client creation.
absl::Status ClusterManagerImpl::initializeSecondaryClusters(
    const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
  init_helper_.startInitializingSecondaryClusters();

  const auto& cm_config = bootstrap.cluster_manager();
  if (cm_config.has_load_stats_config()) {
    const auto& load_stats_config = cm_config.load_stats_config();

    absl::Status status = Config::Utility::checkTransportVersion(load_stats_config);
    RETURN_IF_NOT_OK(status);
    absl::StatusOr<Grpc::RawAsyncClientSharedPtr> client_or_error;
    // Behind this runtime flag the client is fetched from (or inserted into) the async client
    // manager's cache; otherwise an uncached client is created per call.
    if (Runtime::runtimeFeatureEnabled("envoy.restart_features.use_cached_grpc_client_for_xds")) {
      absl::StatusOr<Envoy::OptRef<const envoy::config::core::v3::GrpcService>> maybe_grpc_service =
          Envoy::Config::Utility::getGrpcConfigFromApiConfigSource(load_stats_config,
                                                                   /*grpc_service_idx*/ 0,
                                                                   /*xdstp_config_source*/ false);
      RETURN_IF_NOT_OK_REF(maybe_grpc_service.status());
      if (maybe_grpc_service.value().has_value()) {
        client_or_error = async_client_manager_->getOrCreateRawAsyncClientWithHashKey(
            Grpc::GrpcServiceConfigWithHashKey(*maybe_grpc_service.value()), *stats_.rootScope(),
            /*skip_cluster_check*/ false);
      } else {
        return absl::InvalidArgumentError("Invalid grpc service.");
      }
    } else {
      auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
          *async_client_manager_, load_stats_config, *stats_.rootScope(), false, 0, false);
      RETURN_IF_NOT_OK_REF(factory_or_error.status());
      client_or_error = factory_or_error.value()->createUncachedRawAsyncClient();
    }
    RETURN_IF_NOT_OK_REF(client_or_error.status());
    load_stats_reporter_ = std::make_unique<LoadStatsReporterImpl>(
        local_info_, *this, *stats_.rootScope(), std::move(client_or_error.value()), dispatcher_);
  }
  return absl::OkStatus();
}
536

            
537
11012
// Instantiates the cluster-manager stats, all scoped under the "cluster_manager." prefix.
ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) {
  const std::string prefix = "cluster_manager.";
  return {ALL_CLUSTER_MANAGER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
                                    POOL_GAUGE_PREFIX(scope, prefix))};
}
542

            
543
// Instantiates per-worker stats, prefixed with the owning thread's name
// ("thread_local_cluster_manager.<thread_name>").
ThreadLocalClusterManagerStats
ClusterManagerImpl::ThreadLocalClusterManagerImpl::generateStats(Stats::Scope& scope,
                                                                 const std::string& thread_name) {
  const std::string prefix = absl::StrCat("thread_local_cluster_manager.", thread_name);
  return {ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(POOL_GAUGE_PREFIX(scope, prefix))};
}
549

            
550
17323
// Called when a cluster has finished initializing (either immediately for
// static/primary clusters, or after warming completes for dynamic ones).
// Promotes the cluster from warming to active if needed, initializes its
// thread-aware load balancer, wires up cross-thread update callbacks, and
// finally pushes the current host sets to all worker threads.
absl::Status ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) {
  // This routine is called when a cluster has finished initializing. The cluster has not yet
  // been setup for cross-thread updates to avoid needless updates during initialization. The order
  // of operations here is important. We start by initializing the thread aware load balancer if
  // needed. This must happen first so cluster updates are heard first by the load balancer.
  // Also, it assures that all of clusters which this function is called should be always active.
  auto& cluster = cm_cluster.cluster();
  auto cluster_data = warming_clusters_.find(cluster.info()->name());
  // We have a situation that clusters will be immediately active, such as static and primary
  // cluster. So we must have this prevention logic here.
  if (cluster_data != warming_clusters_.end()) {
    clusterWarmingToActive(cluster.info()->name());
    updateClusterCounts();
  }
  // After the promotion above the cluster must be in the active map; no end()
  // check is performed because this function is only invoked for clusters that
  // are (now) active — see the comment at the top.
  cluster_data = active_clusters_.find(cluster.info()->name());

  if (cluster_data->second->thread_aware_lb_ != nullptr) {
    RETURN_IF_NOT_OK(cluster_data->second->thread_aware_lb_->initialize());
  }

  // Now setup for cross-thread updates.
  // This is used by cluster types such as EDS clusters to drain the connection pools of removed
  // hosts.
  cluster_data->second->member_update_cb_ = cluster.prioritySet().addMemberUpdateCb(
      [&cluster, this](const HostVector&, const HostVector& hosts_removed) {
        if (cluster.info()->lbConfig().close_connections_on_host_set_change()) {
          for (const auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
            // This will drain all tcp and http connection pools.
            postThreadLocalRemoveHosts(cluster, host_set->hosts());
          }
        } else {
          // TODO(snowp): Should this be subject to merge windows?

          // Whenever hosts are removed from the cluster, we make each TLS cluster drain it's
          // connection pools for the removed hosts. If `close_connections_on_host_set_change` is
          // enabled, this case will be covered by first `if` statement, where all
          // connection pools are drained.
          if (!hosts_removed.empty()) {
            postThreadLocalRemoveHosts(cluster, hosts_removed);
          }
        }
      });

  // This is used by cluster types such as EDS clusters to update the cluster
  // without draining the cluster.
  cluster_data->second->priority_update_cb_ = cluster.prioritySet().addPriorityUpdateCb(
      [&cm_cluster, this](uint32_t priority, const HostVector& hosts_added,
                          const HostVector& hosts_removed) {
        // This fires when a cluster is about to have an updated member set. We need to send this
        // out to all of the thread local configurations.

        // Should we save this update and merge it with other updates?
        //
        // Note that we can only _safely_ merge updates that have no added/removed hosts. That is,
        // only those updates that signal a change in host healthcheck state, weight or metadata.
        //
        // We've discussed merging updates related to hosts being added/removed, but it's really
        // tricky to merge those given that downstream consumers of these updates expect to see the
        // full list of updates, not a condensed one. This is because they use the broadcasted
        // HostSharedPtrs within internal maps to track hosts. If we fail to broadcast the entire
        // list of removals, these maps will leak those HostSharedPtrs.
        //
        // See https://github.com/envoyproxy/envoy/pull/3941 for more context.
        bool scheduled = false;
        const auto merge_timeout = PROTOBUF_GET_MS_OR_DEFAULT(
            cm_cluster.cluster().info()->lbConfig(), update_merge_window, 1000);
        // Remember: we only merge updates with no adds/removes — just hc/weight/metadata changes.
        const bool is_mergeable = hosts_added.empty() && hosts_removed.empty();

        if (merge_timeout > 0) {
          // If this is not mergeable, we should cancel any scheduled updates since
          // we'll deliver it immediately.
          scheduled = scheduleUpdate(cm_cluster, priority, is_mergeable, merge_timeout);
        }

        // If an update was not scheduled for later, deliver it immediately.
        if (!scheduled) {
          cm_stats_.cluster_updated_.inc();
          postThreadLocalClusterUpdate(
              cm_cluster, ThreadLocalClusterUpdateParams(priority, hosts_added, hosts_removed));
        }
      });

  // Finally, post updates cross-thread so the per-thread load balancers are ready. First we
  // populate any update information that may be available after cluster init.
  ThreadLocalClusterUpdateParams params;
  for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
    if (host_set->hosts().empty()) {
      continue;
    }
    // Initial sync: all current hosts are "added"; nothing is removed yet.
    params.per_priority_update_params_.emplace_back(host_set->priority(), host_set->hosts(),
                                                    HostVector{});
  }
  // NOTE: In all cases *other* than the local cluster, this is when a cluster is added/updated
  // The local cluster must currently be statically defined and must exist prior to other
  // clusters being added/updated. We could gate the below update on hosts being available on
  // the cluster or the cluster not already existing, but the special logic is not worth it.
  postThreadLocalClusterUpdate(cm_cluster, std::move(params));
  return absl::OkStatus();
}
650

            
651
// Decides whether an update for `cluster`/`priority` should be absorbed into a
// merge window. Returns true when the update was scheduled for later (merged)
// delivery via a timer, false when the caller should deliver it immediately
// (non-mergeable update, or the merge window has already elapsed).
bool ClusterManagerImpl::scheduleUpdate(ClusterManagerCluster& cluster, uint32_t priority,
                                        bool mergeable, const uint64_t timeout) {
  // Find pending updates for this cluster.
  auto& updates_by_prio = updates_map_[cluster.cluster().info()->name()];
  if (!updates_by_prio) {
    updates_by_prio = std::make_unique<PendingUpdatesByPriorityMap>();
  }

  // Find pending updates for this priority.
  auto& updates = (*updates_by_prio)[priority];
  if (!updates) {
    updates = std::make_unique<PendingUpdates>();
  }

  // Has an update_merge_window gone by since the last update? If so, don't schedule
  // the update so it can be applied immediately. Ditto if this is not a mergeable update.
  const auto delta = time_source_.monotonicTime() - updates->last_updated_;
  const uint64_t delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count();
  const bool out_of_merge_window = delta_ms > timeout;
  if (out_of_merge_window || !mergeable) {
    // If there was a pending update, we cancel the pending merged update.
    //
    // Note: it's possible that even though we are outside of a merge window (delta_ms > timeout),
    // a timer is enabled. This race condition is fine, since we'll disable the timer here and
    // deliver the update immediately.

    // Why wasn't the update scheduled for later delivery? We keep some stats that are helpful
    // to understand why merging did not happen. There's 2 things we are tracking here:

    // 1) Was this update out of a merge window?
    if (mergeable && out_of_merge_window) {
      cm_stats_.update_out_of_merge_window_.inc();
    }

    // 2) Were there previous updates that we are cancelling (and delivering immediately)?
    if (updates->disableTimer()) {
      cm_stats_.update_merge_cancelled_.inc();
    }

    updates->last_updated_ = time_source_.monotonicTime();
    return false;
  }

  // If there's no timer, create one.
  // NOTE(review): the timer callback captures `updates` — a reference to the
  // unique_ptr slot inside the per-priority map — and `cluster` by reference.
  // This assumes the map slot and the cluster outlive the timer; the cluster
  // teardown paths erase the `updates_map_` entry (see clusterWarmingToActive
  // and removeCluster), which destroys the timer — confirm all paths do so.
  if (updates->timer_ == nullptr) {
    updates->timer_ = dispatcher_.createTimer([this, &cluster, priority, &updates]() -> void {
      applyUpdates(cluster, priority, *updates);
    });
  }

  // Ensure there's a timer set to deliver these updates.
  if (!updates->timer_->enabled()) {
    updates->enableTimer(timeout);
  }

  return true;
}
708

            
709
// Timer callback for a pending merged update (see scheduleUpdate): broadcasts
// the accumulated health/weight/metadata-only change for `priority` to all
// worker threads and stamps the delivery time.
void ClusterManagerImpl::applyUpdates(ClusterManagerCluster& cluster, uint32_t priority,
                                      PendingUpdates& updates) {
  // Deliver pending updates.

  // Remember that these merged updates are _only_ for updates related to
  // HC/weight/metadata changes. That's why added/removed are empty. All
  // adds/removals were already immediately broadcasted.
  // NOTE(review): these are function-local statics, presumably so the params
  // object may safely reference them beyond this frame — confirm whether
  // ThreadLocalClusterUpdateParams stores references or copies.
  static const HostVector hosts_added;
  static const HostVector hosts_removed;

  postThreadLocalClusterUpdate(
      cluster, ThreadLocalClusterUpdateParams(priority, hosts_added, hosts_removed));

  cm_stats_.cluster_updated_via_merge_.inc();
  updates.last_updated_ = time_source_.monotonicTime();
}
725

            
726
// Adds a new dynamic cluster or updates an existing one (CDS path).
// Returns false when the update is blocked because the config hash matches the
// existing warming/active cluster of the same name (a no-op), true when the
// cluster was loaded into the warming map, or an error status when the config
// could not be loaded.
absl::StatusOr<bool>
ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cluster& cluster,
                                       const std::string& version_info,
                                       const bool avoid_cds_removal) {
  // First we need to see if this new config is new or an update to an existing dynamic cluster.
  // We don't allow updates to statically configured clusters in the main configuration. We check
  // both the warming clusters and the active clusters to see if we need an update or the update
  // should be blocked.
  const std::string& cluster_name = cluster.name();
  const auto existing_active_cluster = active_clusters_.find(cluster_name);
  const auto existing_warming_cluster = warming_clusters_.find(cluster_name);
  const uint64_t new_hash = MessageUtil::hash(cluster);
  if (existing_warming_cluster != warming_clusters_.end()) {
    // If the cluster is the same as the warming cluster of the same name, block the update.
    if (existing_warming_cluster->second->blockUpdate(new_hash)) {
      return false;
    }
    // NB: https://github.com/envoyproxy/envoy/issues/14598
    // Always proceed if the cluster is different from the existing warming cluster.
  } else if (existing_active_cluster != active_clusters_.end() &&
             existing_active_cluster->second->blockUpdate(new_hash)) {
    // If there's no warming cluster of the same name, and if the cluster is the same as the active
    // cluster of the same name, block the update.
    return false;
  }

  if (existing_active_cluster != active_clusters_.end() ||
      existing_warming_cluster != warming_clusters_.end()) {
    if (existing_active_cluster != active_clusters_.end()) {
      // The following init manager remove call is a NOP in the case we are already initialized.
      // It's just kept here to avoid additional logic.
      init_helper_.removeCluster(*existing_active_cluster->second);
    }
    cm_stats_.cluster_modified_.inc();
  } else {
    cm_stats_.cluster_added_.inc();
  }

  // There are two discrete paths here depending on when we are adding/updating a cluster.
  // 1) During initial server load we use the init manager which handles complex logic related to
  //    primary/secondary init, static/CDS init, warming all clusters, etc.
  // 2) After initial server load, we handle warming independently for each cluster in the warming
  //    map.
  // Note: It's likely possible that all warming logic could be centralized in the init manager, but
  //       a decision was made to split the logic given how complex the init manager already is. In
  //       the future we may decide to undergo a refactor to unify the logic but the effort/risk to
  //       do that right now does not seem worth it given that the logic is generally pretty clean
  //       and easy to understand.
  const bool all_clusters_initialized =
      init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
  // Preserve the previous cluster data to avoid early destroy. The same cluster should be added
  // before destroy to avoid early initialization complete.
  auto status_or_cluster =
      loadCluster(cluster, new_hash, version_info, /*added_via_api=*/true,
                  /*required_for_ads=*/false, warming_clusters_, avoid_cds_removal);
  RETURN_IF_NOT_OK_REF(status_or_cluster.status());
  const ClusterDataPtr previous_cluster = std::move(status_or_cluster.value());
  auto& cluster_entry = warming_clusters_.at(cluster_name);
  cluster_entry->cluster_->info()->configUpdateStats().warming_state_.set(1);
  if (!all_clusters_initialized) {
    ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
    init_helper_.addCluster(*cluster_entry);
  } else {
    ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name);
    // Warming completion re-resolves the map entry by name rather than
    // capturing the entry, since the entry may have been replaced meanwhile.
    cluster_entry->cluster_->initialize([this, cluster_name] {
      ENVOY_LOG(debug, "warming cluster {} complete", cluster_name);
      auto state_changed_cluster_entry = warming_clusters_.find(cluster_name);
      state_changed_cluster_entry->second->cluster_->info()->configUpdateStats().warming_state_.set(
          0);
      return onClusterInit(*state_changed_cluster_entry->second);
    });
  }

  return true;
}
801

            
802
1550
void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) {
803
1550
  auto warming_it = warming_clusters_.find(cluster_name);
804
1550
  ASSERT(warming_it != warming_clusters_.end());
805

            
806
  // If the cluster is being updated, we need to cancel any pending merged updates.
807
  // Otherwise, applyUpdates() will fire with a dangling cluster reference.
808
1550
  updates_map_.erase(cluster_name);
809

            
810
1550
  active_clusters_[cluster_name] = std::move(warming_it->second);
811
1550
  warming_clusters_.erase(warming_it);
812
1550
}
813

            
814
1774
// Removes a dynamically added (added_via_api_) cluster by name from both the
// active and warming maps. Clusters flagged avoid_cds_removal_ are only
// removed when `remove_ignored` is true. Returns true if anything was removed.
bool ClusterManagerImpl::removeCluster(const std::string& cluster_name, const bool remove_ignored) {
  bool removed = false;
  auto existing_active_cluster = active_clusters_.find(cluster_name);
  if (existing_active_cluster != active_clusters_.end() &&
      existing_active_cluster->second->added_via_api_ &&
      (!existing_active_cluster->second->avoid_cds_removal_ || remove_ignored)) {
    removed = true;
    init_helper_.removeCluster(*existing_active_cluster->second);
    active_clusters_.erase(existing_active_cluster);

    ENVOY_LOG(debug, "removing cluster {}", cluster_name);
    // Tear down the per-worker state: notify update callbacks, then erase the
    // thread-local (and deferred) entries on every worker thread.
    tls_.runOnAllThreads([cluster_name](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
      ASSERT(cluster_manager->thread_local_clusters_.contains(cluster_name) ||
             cluster_manager->thread_local_deferred_clusters_.contains(cluster_name));
      ENVOY_LOG(debug, "removing TLS cluster {}", cluster_name);
      for (auto cb_it = cluster_manager->update_callbacks_.begin();
           cb_it != cluster_manager->update_callbacks_.end();) {
        // The current callback may remove itself from the list, so a handle for
        // the next item is fetched before calling the callback.
        auto curr_cb_it = cb_it;
        ++cb_it;
        (*curr_cb_it)->onClusterRemoval(cluster_name);
      }
      cluster_manager->thread_local_clusters_.erase(cluster_name);
      cluster_manager->thread_local_deferred_clusters_.erase(cluster_name);
      cluster_manager->local_stats_.clusters_inflated_.set(
          cluster_manager->thread_local_clusters_.size());
    });
    cluster_initialization_map_.erase(cluster_name);
  }

  // The cluster may also (or only) exist in the warming map; apply the same
  // added-via-api / avoid-removal rules there.
  auto existing_warming_cluster = warming_clusters_.find(cluster_name);
  if (existing_warming_cluster != warming_clusters_.end() &&
      existing_warming_cluster->second->added_via_api_ &&
      (!existing_warming_cluster->second->avoid_cds_removal_ || remove_ignored)) {
    removed = true;
    init_helper_.removeCluster(*existing_warming_cluster->second);
    warming_clusters_.erase(existing_warming_cluster);
    ENVOY_LOG(info, "removing warming cluster {}", cluster_name);
  }

  if (removed) {
    cm_stats_.cluster_removed_.inc();
    updateClusterCounts();
    // Cancel any pending merged updates.
    updates_map_.erase(cluster_name);
  }

  return removed;
}
864

            
865
// Builds a Cluster (plus its optional thread-aware load balancer) from the
// proto config and inserts or replaces it in `cluster_map`. On replacement,
// returns the previous ClusterData so the caller controls its destruction
// order; returns nullptr for a brand-new entry. Returns an error status for
// invalid configs (duplicate static cluster, LB policy mismatch, factory
// failure).
absl::StatusOr<ClusterManagerImpl::ClusterDataPtr>
ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
                                const uint64_t cluster_hash, const std::string& version_info,
                                bool added_via_api, const bool required_for_ads,
                                ClusterMap& cluster_map, const bool avoid_cds_removal) {
  absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
      new_cluster_pair_or_error =
          factory_.clusterFromProto(cluster, outlier_event_logger_, added_via_api);

  if (!new_cluster_pair_or_error.ok()) {
    return absl::InvalidArgumentError(std::string(new_cluster_pair_or_error.status().message()));
  }
  auto& new_cluster = new_cluster_pair_or_error->first;
  auto& lb = new_cluster_pair_or_error->second;
  Cluster& cluster_reference = *new_cluster;

  const auto cluster_info = cluster_reference.info();

  // Duplicate names are only rejected for statically configured clusters;
  // API-driven (CDS) loads legitimately replace an existing entry below.
  if (!added_via_api) {
    if (cluster_map.find(cluster_info->name()) != cluster_map.end()) {
      return absl::InvalidArgumentError(
          fmt::format("cluster manager: duplicate cluster '{}'", cluster_info->name()));
    }
  }

  // Check if the cluster provided load balancing policy is used. We need handle it as special
  // case.
  TypedLoadBalancerFactory& typed_lb_factory = cluster_info->loadBalancerFactory();
  const bool cluster_provided_lb =
      typed_lb_factory.name() == "envoy.load_balancing_policies.cluster_provided";

  // The "cluster provided" LB policy and the cluster actually supplying an LB
  // must agree in both directions.
  if (cluster_provided_lb && lb == nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("cluster manager: cluster provided LB specified but cluster "
                    "'{}' did not provide one. Check cluster documentation.",
                    cluster_info->name()));
  }
  if (!cluster_provided_lb && lb != nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("cluster manager: cluster provided LB not specified but cluster "
                    "'{}' provided one. Check cluster documentation.",
                    cluster_info->name()));
  }

  // Propagate active health-check failures to all worker threads so they stop
  // using the failed host's connections.
  if (new_cluster->healthChecker() != nullptr) {
    new_cluster->healthChecker()->addHostCheckCompleteCb(
        [this](HostSharedPtr host, HealthTransition changed_state, HealthState) {
          if (changed_state == HealthTransition::Changed &&
              host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
            postThreadLocalHealthFailure(host);
          }
        });
  }

  // Likewise for outlier-detection ejections.
  if (new_cluster->outlierDetector() != nullptr) {
    new_cluster->outlierDetector()->addChangedStateCb([this](HostSharedPtr host) {
      if (host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
        ENVOY_LOG_EVENT(debug, "outlier_detection_ejection",
                        "host {} in cluster {} was ejected by the outlier detector",
                        host->address()->asStringView(), host->cluster().name());
        postThreadLocalHealthFailure(host);
      }
    });
  }
  ClusterDataPtr result;
  auto cluster_entry_it = cluster_map.find(cluster_info->name());
  if (cluster_entry_it != cluster_map.end()) {
    // Replacement: swap the new ClusterData in and hand the old one back to the
    // caller so it is not destroyed prematurely.
    result = std::exchange(cluster_entry_it->second,
                           std::make_unique<ClusterData>(
                               cluster, cluster_hash, version_info, added_via_api, required_for_ads,
                               std::move(new_cluster), time_source_, avoid_cds_removal));
  } else {
    bool inserted = false;
    std::tie(cluster_entry_it, inserted) = cluster_map.emplace(
        cluster_info->name(),
        std::make_unique<ClusterData>(cluster, cluster_hash, version_info, added_via_api,
                                      required_for_ads, std::move(new_cluster), time_source_,
                                      avoid_cds_removal));
    ASSERT(inserted);
  }

  // Install the thread-aware LB: either the one the cluster provided, or one
  // built from the configured load balancing policy.
  if (cluster_provided_lb) {
    cluster_entry_it->second->thread_aware_lb_ = std::move(lb);
  } else {
    cluster_entry_it->second->thread_aware_lb_ =
        typed_lb_factory.create(cluster_info->loadBalancerConfig(), *cluster_info,
                                cluster_reference.prioritySet(), runtime_, random_, time_source_);
  }

  updateClusterCounts();
  return result;
}
957

            
958
41376
void ClusterManagerImpl::updateClusterCounts() {
959
  // This if/else block implements a control flow mechanism that can be used by an ADS
960
  // implementation to properly sequence CDS and RDS updates. It is not enforcing on ADS. ADS can
961
  // use it to detect when a previously sent cluster becomes warm before sending routes that depend
962
  // on it. This can improve incidence of HTTP 503 responses from Envoy when a route is used before
963
  // it's supporting cluster is ready.
964
  //
965
  // We achieve that by leaving CDS in the paused state as long as there is at least
966
  // one cluster in the warming state. This prevents CDS ACK from being sent to ADS.
967
  // Once cluster is warmed up, CDS is resumed, and ACK is sent to ADS, providing a
968
  // signal to ADS to proceed with RDS updates.
969
  // If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant.
970
41376
  const bool all_clusters_initialized =
971
41376
      init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
972
41376
  if (all_clusters_initialized && xds_manager_.adsMux()) {
973
2431
    const auto type_url = Config::getTypeUrl<envoy::config::cluster::v3::Cluster>();
974
2431
    if (resume_cds_ == nullptr && !warming_clusters_.empty()) {
975
904
      resume_cds_ = xds_manager_.pause(type_url);
976
1535
    } else if (warming_clusters_.empty()) {
977
1429
      resume_cds_.reset();
978
1429
    }
979
2431
  }
980
41376
  cm_stats_.active_clusters_.set(active_clusters_.size());
981
41376
  cm_stats_.warming_clusters_.set(warming_clusters_.size());
982
41376
}
983

            
984
161380
// Looks up the named cluster in this worker's thread-local map, falling back
// to inline initialization (e.g. for a deferred cluster) when not yet
// materialized. Returns nullptr if the cluster does not exist at all.
ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view cluster) {
  ThreadLocalClusterManagerImpl& tls_cm = *tls_;

  // Fast path: the cluster has already been materialized on this thread.
  if (const auto it = tls_cm.thread_local_clusters_.find(cluster);
      it != tls_cm.thread_local_clusters_.end()) {
    return it->second.get();
  }
  // Slow path: build the cluster inline if it exists but is not yet set up on
  // this thread; otherwise this returns nullptr.
  return tls_cm.initializeClusterInlineIfExists(cluster);
}
994

            
995
void ClusterManagerImpl::maybePreconnect(
996
    ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry,
997
    const ClusterConnectivityState& state,
998
49533
    std::function<ConnectionPool::Instance*()> pick_preconnect_pool) {
999
49533
  auto peekahead_ratio = cluster_entry.info()->peekaheadRatio();
49533
  if (peekahead_ratio <= 1.0) {
49450
    return;
49450
  }
  // 3 here is arbitrary. Just as in ConnPoolImplBase::tryCreateNewConnections
  // we want to limit the work which can be done on any given preconnect attempt.
186
  for (int i = 0; i < 3; ++i) {
    // See if adding this one new connection
    // would put the cluster over desired capacity. If so, stop preconnecting.
    //
    // We anticipate the incoming stream here, because maybePreconnect is called
    // before a new stream is established.
183
    if (!ConnectionPool::ConnPoolImplBase::shouldConnect(
183
            state.pending_streams_, state.active_streams_,
183
            state.connecting_and_connected_stream_capacity_, peekahead_ratio, true)) {
53
      return;
53
    }
130
    ConnectionPool::Instance* preconnect_pool = pick_preconnect_pool();
130
    if (!preconnect_pool || !preconnect_pool->maybePreconnect(peekahead_ratio)) {
      // Given that the next preconnect pick may be entirely different, we could
      // opt to try again even if the first preconnect fails. Err on the side of
      // caution and wait for the next attempt.
27
      return;
27
    }
130
  }
83
}
// Returns HTTP pool data for `host` (creating the pool on first use), or
// nullopt when no pool could be obtained. The returned HttpPoolData carries a
// callback that triggers preconnecting whenever a new stream is established.
absl::optional<HttpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool(
    HostConstSharedPtr host, ResourcePriority priority, absl::optional<Http::Protocol> protocol,
    LoadBalancerContext* context) {
  // Select a host and create a connection pool for it if it does not already exist.
  auto pool = httpConnPoolImpl(host, priority, protocol, context);
  if (pool == nullptr) {
    return absl::nullopt;
  }
  HttpPoolData data(
      [this, priority, protocol, context]() -> void {
        // Now that a new stream is being established, attempt to preconnect.
        // The inner lambda captures the outer lambda's by-value captures by
        // reference; this is safe because maybePreconnect invokes it
        // synchronously within this call.
        maybePreconnect(
            *this, parent_.cluster_manager_state_, [this, &priority, &protocol, &context]() {
              HostConstSharedPtr peek_host = peekAnotherHost(context);
              return peek_host ? httpConnPoolImpl(peek_host, priority, protocol, context) : nullptr;
            });
      },
      pool);
  return data;
}
// Returns TCP pool data for `host` (creating the pool on first use), or
// nullopt when no host is given or no pool could be obtained. As with
// httpConnPool, the returned TcpPoolData carries a preconnect callback fired
// when a new connection/stream is established.
absl::optional<TcpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
    HostConstSharedPtr host, ResourcePriority priority, LoadBalancerContext* context) {
  if (!host) {
    return absl::nullopt;
  }
  // Select a host and create a connection pool for it if it does not already exist.
  auto pool = tcpConnPoolImpl(host, priority, context);
  if (pool == nullptr) {
    return absl::nullopt;
  }
  TcpPoolData data(
      [this, priority, context]() -> void {
        // The inner lambda captures the outer lambda's by-value captures by
        // reference; safe because maybePreconnect invokes it synchronously.
        maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &context]() {
          HostConstSharedPtr peek_host = peekAnotherHost(context);
          return peek_host ? tcpConnPoolImpl(peek_host, priority, context) : nullptr;
        });
      },
      pool);
  return data;
}
// Convenience overload: synchronously pick a host via the load balancer, then
// delegate to the host-specific tcpConnPool. Returns nullopt if no host could
// be selected.
absl::optional<TcpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
    ResourcePriority priority, LoadBalancerContext* context) {
  HostConstSharedPtr selected =
      LoadBalancer::onlyAllowSynchronousHostSelection(chooseHost(context));
  if (selected == nullptr) {
    return absl::nullopt;
  }
  return tcpConnPool(selected, priority, context);
}
void ClusterManagerImpl::drainConnections(const std::string& cluster,
35
                                          DrainConnectionsHostPredicate predicate) {
35
  ENVOY_LOG_EVENT(debug, "drain_connections_call", "drainConnections called for cluster {}",
35
                  cluster);
68
  tls_.runOnAllThreads([cluster, predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
68
    auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster);
68
    if (cluster_entry != cluster_manager->thread_local_clusters_.end()) {
68
      cluster_entry->second->drainConnPools(
68
          predicate, ConnectionPool::DrainBehavior::DrainExistingConnections);
68
    }
68
  });
35
}
void ClusterManagerImpl::drainConnections(DrainConnectionsHostPredicate predicate,
11
                                          ConnectionPool::DrainBehavior drain_behavior) {
11
  ENVOY_LOG_EVENT(debug, "drain_connections_call_for_all_clusters",
11
                  "drainConnections called for all clusters");
11
  tls_.runOnAllThreads(
22
      [predicate, drain_behavior](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
44
        for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) {
44
          cluster_entry.second->drainConnPools(predicate, drain_behavior);
44
        }
22
      });
11
}
380
// Validates that `cluster` names an active, statically configured cluster —
// the requirement for gRPC client clusters. Returns InvalidArgumentError
// otherwise.
absl::Status ClusterManagerImpl::checkActiveStaticCluster(const std::string& cluster) {
  const auto entry = active_clusters_.find(cluster);
  // The cluster must be known and active...
  if (entry == active_clusters_.end()) {
    return absl::InvalidArgumentError(fmt::format("Unknown gRPC client cluster '{}'", cluster));
  }
  // ...and must come from the bootstrap config rather than the API.
  if (entry->second->added_via_api_) {
    return absl::InvalidArgumentError(
        fmt::format("gRPC client cluster '{}' is not static", cluster));
  }
  return absl::OkStatus();
}
void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster,
74
                                                    const HostVector& hosts_removed) {
  // Drain the connection pools for the given hosts. For deferred clusters have
  // been created.
74
  tls_.runOnAllThreads([name = cluster.info()->name(),
128
                        hosts_removed](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
128
    cluster_manager->removeHosts(name, hosts_removed);
128
  });
74
}
bool ClusterManagerImpl::deferralIsSupportedForCluster(
17736
    const ClusterInfoConstSharedPtr& info) const {
17736
  if (!deferred_cluster_creation_) {
17203
    return false;
17203
  }
  // Certain cluster types are unsupported for deferred initialization.
  // We need to check both the `clusterType()` (preferred) falling back to
  // the `type()` due to how custom clusters were added leveraging an any
  // config.
533
  if (auto custom_cluster_type = info->clusterType(); custom_cluster_type.has_value()) {
    // TODO(kbaichoo): make it configurable what custom types are supported?
18
    static const std::array<std::string, 4> supported_well_known_cluster_types = {
18
        "envoy.clusters.aggregate", "envoy.cluster.eds", "envoy.clusters.redis",
18
        "envoy.cluster.static"};
18
    if (std::find(supported_well_known_cluster_types.begin(),
18
                  supported_well_known_cluster_types.end(),
18
                  custom_cluster_type->name()) == supported_well_known_cluster_types.end()) {
      return false;
    }
515
  } else {
    // Check DiscoveryType instead.
515
    static constexpr std::array<envoy::config::cluster::v3::Cluster::DiscoveryType, 2>
515
        supported_cluster_types = {envoy::config::cluster::v3::Cluster::EDS,
515
                                   envoy::config::cluster::v3::Cluster::STATIC};
515
    if (std::find(supported_cluster_types.begin(), supported_cluster_types.end(), info->type()) ==
515
        supported_cluster_types.end()) {
13
      return false;
13
    }
515
  }
520
  return true;
533
}
// Publishes a cluster add/update from the main thread to every worker's
// ThreadLocalClusterManagerImpl. On workers where deferred creation applies,
// only the ClusterInitializationObject is stored; otherwise the thread-local
// ClusterEntry is created/updated and membership callbacks fire. Finally,
// starts the ADS mux if it was waiting on this cluster.
void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
                                                      ThreadLocalClusterUpdateParams&& params) {
  // Only the first post for a cluster (re)installs the thread-local entry;
  // later posts are pure membership updates.
  bool add_or_update_cluster = false;
  if (!cm_cluster.addedOrUpdated()) {
    add_or_update_cluster = true;
    cm_cluster.setAddedOrUpdated();
  }
  // The LB factory is only shipped to workers when the entry is (re)created.
  LoadBalancerFactorySharedPtr load_balancer_factory;
  if (add_or_update_cluster) {
    load_balancer_factory = cm_cluster.loadBalancerFactory();
  }
  // Snapshot per-priority host-set state on the main thread so workers can
  // apply it without reading main-thread-owned structures.
  for (auto& per_priority : params.per_priority_update_params_) {
    const auto& host_set =
        cm_cluster.cluster().prioritySet().hostSetsPerPriority()[per_priority.priority_];
    per_priority.update_hosts_params_ = HostSetImpl::updateHostsParams(*host_set);
    per_priority.locality_weights_ = host_set->localityWeights();
    per_priority.weighted_priority_health_ = host_set->weightedPriorityHealth();
    per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
  }
  HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap();
  // The cluster now exists, so any pending on-demand discovery for it is done.
  pending_cluster_creations_.erase(cm_cluster.cluster().info()->name());
  const UnitFloat drop_overload = cm_cluster.cluster().dropOverload();
  const std::string drop_category = cm_cluster.cluster().dropCategory();
  // Populate the cluster initialization object based on this update. Returns
  // nullptr when deferral is unsupported for this cluster.
  ClusterInitializationObjectConstSharedPtr cluster_initialization_object =
      addOrUpdateClusterInitializationObjectIfSupported(params, cm_cluster.cluster().info(),
                                                        load_balancer_factory, host_map,
                                                        drop_overload, drop_category);
  // NOTE(review): `drop_category` is const, so this std::move degrades to a copy.
  tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
                        add_or_update_cluster, load_balancer_factory, map = std::move(host_map),
                        cluster_initialization_object = std::move(cluster_initialization_object),
                        drop_overload, drop_category = std::move(drop_category)](
                           OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    ASSERT(cluster_manager.has_value(),
           "Expected the ThreadLocalClusterManager to be set during ClusterManagerImpl creation.");
    // From here on we operate on the calling worker's thread-local manager.
    // Defer only when a CIO exists, the cluster is not already inflated on
    // this worker, and we are not on the main thread.
    if (const bool defer_unused_clusters =
            cluster_initialization_object != nullptr &&
            !cluster_manager->thread_local_clusters_.contains(info->name()) &&
            !Envoy::Thread::MainThread::isMainThread();
        defer_unused_clusters) {
      // Save the cluster initialization object.
      ENVOY_LOG(debug, "Deferring add or update for TLS cluster {}", info->name());
      cluster_manager->thread_local_deferred_clusters_[info->name()] =
          cluster_initialization_object;
      // Invoke similar logic of onClusterAddOrUpdate.
      ThreadLocalClusterCommand command = [&cluster_manager,
                                           cluster_name = info->name()]() -> ThreadLocalCluster& {
        // If we have multiple callbacks only the first one needs to use the
        // command to initialize the cluster.
        auto existing_cluster_entry = cluster_manager->thread_local_clusters_.find(cluster_name);
        if (existing_cluster_entry != cluster_manager->thread_local_clusters_.end()) {
          return *existing_cluster_entry->second;
        }
        auto* cluster_entry = cluster_manager->initializeClusterInlineIfExists(cluster_name);
        ASSERT(cluster_entry != nullptr, "Deferred clusters initiailization should not fail.");
        return *cluster_entry;
      };
      for (auto cb_it = cluster_manager->update_callbacks_.begin();
           cb_it != cluster_manager->update_callbacks_.end();) {
        // The current callback may remove itself from the list, so a handle for
        // the next item is fetched before calling the callback.
        auto curr_cb_it = cb_it;
        ++cb_it;
        (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
      }
    } else {
      // Broadcast: inflate/update the thread-local cluster entry eagerly.
      ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
      if (add_or_update_cluster) {
        if (cluster_manager->thread_local_clusters_.contains(info->name())) {
          ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
        } else {
          ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
        }
        new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
                                                                      load_balancer_factory);
        cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
        cluster_manager->local_stats_.clusters_inflated_.set(
            cluster_manager->thread_local_clusters_.size());
      }
      if (cluster_manager->thread_local_clusters_[info->name()]) {
        cluster_manager->thread_local_clusters_[info->name()]->setDropOverload(drop_overload);
        cluster_manager->thread_local_clusters_[info->name()]->setDropCategory(drop_category);
      }
      for (const auto& per_priority : params.per_priority_update_params_) {
        cluster_manager->updateClusterMembership(
            info->name(), per_priority.priority_, per_priority.update_hosts_params_,
            per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_,
            per_priority.weighted_priority_health_, per_priority.overprovisioning_factor_, map);
      }
      // Only notify callbacks when a new entry was actually installed above.
      if (new_cluster != nullptr) {
        ThreadLocalClusterCommand command = [&new_cluster]() -> ThreadLocalCluster& {
          return *new_cluster;
        };
        for (auto cb_it = cluster_manager->update_callbacks_.begin();
             cb_it != cluster_manager->update_callbacks_.end();) {
          // The current callback may remove itself from the list, so a handle for
          // the next item is fetched before calling the callback.
          auto curr_cb_it = cb_it;
          ++cb_it;
          (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
        }
      }
    }
  });
  // By this time, the main thread has received the cluster initialization update, so we can start
  // the ADS mux if the ADS mux is dependent on this cluster's initialization.
  if (cm_cluster.requiredForAds() && !ads_mux_initialized_) {
    xds_manager_.adsMux()->start();
    ads_mux_initialized_ = true;
  }
}
// Creates (or merges into) the main-thread ClusterInitializationObject for a
// cluster that supports deferred initialization. Returns nullptr when deferral
// is unsupported, otherwise the object that workers will store until first use.
ClusterManagerImpl::ClusterInitializationObjectConstSharedPtr
ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported(
    const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
    LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
    UnitFloat drop_overload, absl::string_view drop_category) {
  if (!deferralIsSupportedForCluster(cluster_info)) {
    return nullptr;
  }
  // Safe after the std::move below: the string lives inside the ClusterInfo
  // object, whose ownership is transferred, not destroyed.
  const std::string& cluster_name = cluster_info->name();
  auto entry = cluster_initialization_map_.find(cluster_name);
  // TODO(kbaichoo): if EDS can be configured via cluster_type() then modify the
  // merging logic below.
  //
  // This method may be called multiple times to create multiple ClusterInitializationObject
  // instances for the same cluster. And before the thread local clusters are actually initialized,
  // the new instances will override the old instances in the work threads. But part of the data is
  // created only once, such as the load balancer factory. So we should always merge the new
  // instance with the old one to keep the latest instance holding all necessary data.
  //
  // More specifically, this will happen in the following scenarios for now:
  // 1. EDS clusters: the ClusterLoadAssignment of EDS cluster may be updated multiples before
  //   the thread local cluster is initialized.
  // 2. Clusters in the unit tests: the cluster in the unit test may be updated multiples before
  //   the thread local cluster is initialized by calling 'updateHosts' manually.
  //
  // Only merge when the prior entry refers to the exact same ClusterInfo
  // instance; a different instance means the cluster was replaced.
  const bool should_merge_with_prior_cluster =
      entry != cluster_initialization_map_.end() && entry->second->cluster_info_ == cluster_info;
  if (should_merge_with_prior_cluster) {
    // We need to copy from an existing Cluster Initialization Object. In
    // particular, only update the params with changed priority.
    auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
        entry->second->per_priority_state_, params, std::move(cluster_info),
        // Keep the previously captured LB factory when this update did not ship one.
        load_balancer_factory == nullptr ? entry->second->load_balancer_factory_
                                         : load_balancer_factory,
        map, drop_overload, drop_category);
    cluster_initialization_map_[cluster_name] = new_initialization_object;
    return new_initialization_object;
  } else {
    // We need to create a fresh Cluster Initialization Object.
    auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
        params, std::move(cluster_info), load_balancer_factory, map, drop_overload, drop_category);
    cluster_initialization_map_[cluster_name] = new_initialization_object;
    return new_initialization_object;
  }
}
// Inflates a previously deferred cluster on this worker thread, if one exists.
// Returns a pointer to the freshly created ClusterEntry, or nullptr when
// `cluster` has no pending initialization object on this thread.
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExists(
    absl::string_view cluster) {
  auto entry = thread_local_deferred_clusters_.find(cluster);
  if (entry == thread_local_deferred_clusters_.end()) {
    // Unknown cluster.
    return nullptr;
  }
  // Create the cluster inline. Hold the initialization object by value (not a
  // reference into the map) so it is guaranteed to outlive the erase() below.
  const ClusterInitializationObjectConstSharedPtr initialization_object = entry->second;
  ENVOY_LOG(debug, "initializing TLS cluster {} inline", cluster);
  auto cluster_entry = std::make_unique<ClusterEntry>(
      *this, initialization_object->cluster_info_, initialization_object->load_balancer_factory_);
  ClusterEntry* cluster_entry_ptr = cluster_entry.get();
  thread_local_clusters_[cluster] = std::move(cluster_entry);
  local_stats_.clusters_inflated_.set(thread_local_clusters_.size());
  // Replay all buffered per-priority membership updates into the new entry.
  for (const auto& [_, per_priority] : initialization_object->per_priority_state_) {
    updateClusterMembership(initialization_object->cluster_info_->name(), per_priority.priority_,
                            per_priority.update_hosts_params_, per_priority.locality_weights_,
                            per_priority.hosts_added_, per_priority.hosts_removed_,
                            per_priority.weighted_priority_health_,
                            per_priority.overprovisioning_factor_,
                            initialization_object->cross_priority_host_map_);
  }
  // Use the pointer we already hold instead of re-looking the entry up in the
  // map twice (the original code performed two extra hash lookups here).
  cluster_entry_ptr->setDropOverload(initialization_object->drop_overload_);
  cluster_entry_ptr->setDropCategory(initialization_object->drop_category_);
  // Remove the CIO as we've initialized the cluster.
  thread_local_deferred_clusters_.erase(entry);
  return cluster_entry_ptr;
}
// Constructs a fresh initialization object from a single update, with no prior
// state to merge; the per-priority map starts empty and is filled verbatim.
ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
    const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
    LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
    UnitFloat drop_overload, absl::string_view drop_category)
    : cluster_info_(std::move(cluster_info)), load_balancer_factory_(load_balancer_factory),
      cross_priority_host_map_(map), drop_overload_(drop_overload), drop_category_(drop_category) {
  // Copy the update since the map is empty.
  for (const auto& update : params.per_priority_update_params_) {
    per_priority_state_.emplace(update.priority_, update);
  }
}
// Merge constructor: starts from a prior object's per-priority state and folds
// in a newer update, so that the resulting hosts_added_ vectors reflect all
// deltas accumulated while the thread-local cluster remained deferred.
ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
    const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>& per_priority_state,
    const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
    LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
    UnitFloat drop_overload, absl::string_view drop_category)
    : per_priority_state_(per_priority_state), cluster_info_(std::move(cluster_info)),
      load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map),
      drop_overload_(drop_overload), drop_category_(drop_category) {
  // Because EDS Clusters receive the entire ClusterLoadAssignment but only
  // provides the delta we must process the hosts_added and hosts_removed and
  // not simply overwrite with hosts added.
  for (const auto& update : update_params.per_priority_update_params_) {
    auto it = per_priority_state_.find(update.priority_);
    if (it != per_priority_state_.end()) {
      auto& priority_state = it->second;
      // Merge the two per_priorities: scalar fields are simply overwritten by
      // the newer update.
      priority_state.update_hosts_params_ = update.update_hosts_params_;
      priority_state.locality_weights_ = update.locality_weights_;
      priority_state.weighted_priority_health_ = update.weighted_priority_health_;
      priority_state.overprovisioning_factor_ = update.overprovisioning_factor_;
      // Merge the hosts vectors to just have hosts added.
      // Assumes that the old host_added_ is exclusive to new hosts_added_ and
      // new hosts_removed_ only refers to the old hosts_added_.
      ASSERT(priority_state.hosts_removed_.empty(),
             "Cluster Initialization Object should apply hosts "
             "removed updates to hosts_added vector!");
      // TODO(kbaichoo): replace with a more efficient algorithm. For example
      // if the EDS cluster exposed the LoadAssignment we could just merge by
      // overwriting hosts_added.
      if (!update.hosts_removed_.empty()) {
        // Remove all hosts to be removed from the old host_added.
        auto& host_added = priority_state.hosts_added_;
        auto removed_section = std::remove_if(
            host_added.begin(), host_added.end(),
            [hosts_removed = std::cref(update.hosts_removed_)](const HostSharedPtr& ptr) {
              return std::find(hosts_removed.get().begin(), hosts_removed.get().end(), ptr) !=
                     hosts_removed.get().end();
            });
        priority_state.hosts_added_.erase(removed_section, priority_state.hosts_added_.end());
      }
      // Add updated host_added (append the newer delta after the surviving old hosts).
      priority_state.hosts_added_.reserve(priority_state.hosts_added_.size() +
                                          update.hosts_added_.size());
      std::copy(update.hosts_added_.begin(), update.hosts_added_.end(),
                std::back_inserter(priority_state.hosts_added_));
    } else {
      // Just copy the new priority.
      per_priority_state_.emplace(update.priority_, update);
    }
  }
}
67
void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host) {
118
  tls_.runOnAllThreads([host](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
118
    cluster_manager->onHostHealthFailure(host);
118
  });
67
}
// Creates a raw upstream TCP connection to a host picked by this cluster's
// load balancer. Returns {nullptr, nullptr} when no host could be selected.
Host::CreateConnectionData ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConn(
    LoadBalancerContext* context) {
  // Host selection must complete synchronously for this API.
  HostConstSharedPtr logical_host =
      LoadBalancer::onlyAllowSynchronousHostSelection(chooseHost(context));
  if (logical_host) {
    auto conn_info = logical_host->createConnection(
        parent_.thread_local_dispatcher_, nullptr,
        context == nullptr ? nullptr : context->upstreamTransportSocketOptions());
    // If the cluster is configured to close connections when a host fails its
    // health check, track this connection in the per-host map so
    // onHostHealthFailure can find and close it later.
    if ((cluster_info_->features() &
         ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) &&
        conn_info.connection_ != nullptr) {
      auto conn_map_iter = parent_.host_tcp_conn_map_.find(logical_host);
      if (conn_map_iter == parent_.host_tcp_conn_map_.end()) {
        // First tracked connection for this host: create the map entry and
        // acquire a handle to pin the host.
        conn_map_iter =
            parent_.host_tcp_conn_map_.try_emplace(logical_host, logical_host->acquireHandle())
                .first;
      }
      auto& conn_map = conn_map_iter->second;
      conn_map.connections_.emplace(
          conn_info.connection_.get(),
          std::make_unique<ThreadLocalClusterManagerImpl::TcpConnContainer>(
              parent_, logical_host, *conn_info.connection_));
    }
    return conn_info;
  }
  // No host available (e.g. empty cluster or LB rejected the context).
  return {nullptr, nullptr};
}
// Returns this thread-local cluster's lazily constructed HTTP async client.
// The client is created on first use and lives as long as this ClusterEntry.
Http::AsyncClient&
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpAsyncClient() {
  if (lazy_http_async_client_ == nullptr) {
    lazy_http_async_client_ = std::make_unique<Http::AsyncClientImpl>(
        cluster_info_, parent_.parent_.stats_, parent_.thread_local_dispatcher_, parent_.parent_,
        parent_.parent_.context_,
        // Use make_unique instead of a naked `new` wrapped in ShadowWriterPtr;
        // unique_ptr<ShadowWriterImpl> converts implicitly to ShadowWriterPtr.
        std::make_unique<Router::ShadowWriterImpl>(parent_.parent_),
        parent_.parent_.http_context_, parent_.parent_.router_context_);
  }
  return *lazy_http_async_client_;
}
// Builds a fresh asynchronous TCP client bound to this worker's dispatcher.
// Each call returns a new, independently owned client.
Tcp::AsyncTcpClientPtr
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpAsyncClient(
    LoadBalancerContext* context, Tcp::AsyncTcpClientOptionsConstSharedPtr options) {
  auto client = std::make_unique<Tcp::AsyncTcpClientImpl>(
      parent_.thread_local_dispatcher_, *this, context, options->enable_half_close);
  return client;
}
// Applies a per-priority membership update to this worker's copy of the
// cluster and, for thread-aware LBs, rebuilds the worker-local load balancer.
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHosts(
    const std::string& name, uint32_t priority,
    PrioritySet::UpdateHostsParams&& update_hosts_params,
    LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
    const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health,
    absl::optional<uint32_t> overprovisioning_factor,
    HostMapConstSharedPtr cross_priority_host_map) {
  ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name,
            hosts_added.size(), hosts_removed.size());
  priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights),
                            hosts_added, hosts_removed, weighted_priority_health,
                            overprovisioning_factor, std::move(cross_priority_host_map));
  // If an LB is thread aware, create a new worker local LB on membership changes.
  if (lb_factory_ != nullptr && lb_factory_->recreateOnHostChange()) {
    ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
    lb_ = lb_factory_->create({priority_set_, parent_.local_priority_set_});
  }
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(
34141
    const HostVector& hosts_removed) {
34979
  for (const auto& host : hosts_removed) {
34607
    parent_.drainOrCloseConnPools(host, ConnectionPool::DrainBehavior::DrainAndDelete);
34607
  }
34141
}
33816
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools() {
34020
  for (auto& host_set : priority_set_.hostSetsPerPriority()) {
34020
    drainConnPools(host_set->hosts());
34020
  }
33816
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(
112
    DrainConnectionsHostPredicate predicate, ConnectionPool::DrainBehavior behavior) {
112
  for (auto& host_set : priority_set_.hostSetsPerPriority()) {
114
    for (const auto& host : host_set->hosts()) {
114
      if (predicate != nullptr && !predicate(*host)) {
24
        continue;
24
      }
90
      parent_.drainOrCloseConnPools(host, behavior);
90
    }
112
  }
112
}
// Registers cluster add/update/removal callbacks on the calling worker's
// thread-local manager; the returned handle unregisters on destruction.
ClusterUpdateCallbacksHandlePtr
ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) {
  return (*tls_).addClusterUpdateCallbacks(cb);
}
// Creates (or reuses) an on-demand CDS subscription keyed by a hash of its
// config source and optional resource locator, returning a handle to it.
absl::StatusOr<OdCdsApiHandlePtr>
ClusterManagerImpl::allocateOdCdsApi(OdCdsCreationFunction creation_function,
                                     const envoy::config::core::v3::ConfigSource& odcds_config,
                                     OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
                                     ProtobufMessage::ValidationVisitor& validation_visitor) {
  // Generate a unique key based on config and locator. This enables reuse of subscriptions
  // with the same configuration. Note that timeout is intentionally not part of the hash,
  // so different timeout values will share the same subscription.
  // Subscriptions persist for the lifetime of ClusterManagerImpl and are cleaned up when
  // it is destroyed.
  uint64_t config_hash = MessageUtil::hash(odcds_config);
  if (odcds_resources_locator.has_value()) {
    // Fold the locator's hash into the key so the same config with different
    // locators yields distinct subscriptions.
    config_hash = absl::HashOf(config_hash, MessageUtil::hash(*odcds_resources_locator));
  }
  auto it = odcds_subscriptions_.find(config_hash);
  if (it != odcds_subscriptions_.end()) {
    // Reuse the existing subscription; only a new handle is created.
    return OdCdsApiHandleImpl::create(*this, config_hash);
  }
  auto odcds_or_error =
      creation_function(odcds_config, odcds_resources_locator, xds_manager_, *this, *this,
                        *stats_.rootScope(), validation_visitor, context_);
  RETURN_IF_NOT_OK_REF(odcds_or_error.status());
  odcds_subscriptions_.emplace(config_hash, std::move(*odcds_or_error));
  return OdCdsApiHandleImpl::create(*this, config_hash);
}
// Worker-side entry point for on-demand cluster discovery. Registers the
// callback locally, and if this worker is the first requester, posts the
// discovery to the main thread, which deduplicates across workers, kicks the
// ODCDS subscription, and arms a timeout timer.
ClusterDiscoveryCallbackHandlePtr
ClusterManagerImpl::requestOnDemandClusterDiscovery(uint64_t config_source_key, std::string name,
                                                    ClusterDiscoveryCallbackPtr callback,
                                                    std::chrono::milliseconds timeout) {
  ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
  auto [handle, discovery_in_progress, invoker] =
      cluster_manager.cdm_.addCallback(name, std::move(callback));
  // This check will catch requests for discoveries from this thread only. If other thread
  // requested the same discovery, we will detect it in the main thread later.
  if (discovery_in_progress) {
    ENVOY_LOG(debug,
              "cm odcds: on-demand discovery for cluster {} is already in progress, something else "
              "in thread {} has already requested it",
              name, cluster_manager.thread_local_dispatcher_.name());
    // This worker thread has already requested a discovery of a cluster with this name, so
    // nothing more left to do here.
    //
    // We can't "just" return handle here, because handle is a part of the structured binding done
    // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like
    // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here
    // - it needs to be moved.
    return std::move(handle);
  }
  ENVOY_LOG(
      debug,
      "cm odcds: forwarding the on-demand discovery request for cluster {} to the main thread",
      name);
  // This seems to be the first request for discovery of this cluster in this worker thread. Rest
  // of the process may only happen in the main thread.
  Event::Dispatcher& worker_dispatcher = cluster_manager.thread_local_dispatcher_;
  dispatcher_.post([this, config_source_key, timeout, name = std::move(name),
                    invoker = std::move(invoker), &worker_dispatcher] {
    // Runs on the main thread: look up the subscription created by allocateOdCdsApi.
    OdCdsApiSharedPtr odcds = odcds_subscriptions_.at(config_source_key);
    // Check for the cluster here too. It might have been added between the time when this closure
    // was posted and when it is being executed.
    if (getThreadLocalCluster(name) != nullptr) {
      ENVOY_LOG(
          debug,
          "cm odcds: the requested cluster {} is already known, posting the callback back to {}",
          name, worker_dispatcher.name());
      // Bounce the success notification back to the requesting worker.
      worker_dispatcher.post([invoker = std::move(invoker)] {
        invoker.invokeCallback(ClusterDiscoveryStatus::Available);
      });
      return;
    }
    if (pending_cluster_creations_.contains(name)) {
      ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} is already in progress", name);
      // We already began the discovery process for this cluster, nothing to do. If we got here,
      // it means that it was other worker thread that requested the discovery.
      return;
    }
    // Start the discovery. If the cluster gets discovered, cluster manager will warm it up and
    // invoke the cluster lifecycle callbacks, that will in turn invoke our callback.
    odcds->updateOnDemand(name);
    // Setup the discovery timeout timer to avoid keeping callbacks indefinitely.
    auto timer = dispatcher_.createTimer([this, name] { notifyExpiredDiscovery(name); });
    timer->enableTimer(timeout);
    // Keep odcds handle alive for the duration of the discovery process.
    pending_cluster_creations_.insert(
        {std::move(name), ClusterCreation{std::move(odcds), std::move(timer)}});
  });
  // We can't "just" return handle here, because handle is a part of the structured binding done
  // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like
  // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here -
  // it needs to be moved.
  return std::move(handle);
}
18
// Called when ODCDS reports that the requested cluster does not exist; fans
// the Missing status out to waiting workers.
void ClusterManagerImpl::notifyMissingCluster(absl::string_view name) {
  ENVOY_LOG(info, "cm odcds: cluster {} not found during on-demand discovery", name);
  notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Missing);
}
19
// Timeout-timer callback for an on-demand discovery; fans the Timeout status
// out to waiting workers.
void ClusterManagerImpl::notifyExpiredDiscovery(absl::string_view name) {
  ENVOY_LOG(info, "cm odcds: on-demand discovery for cluster {} timed out", name);
  notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Timeout);
}
// Finishes an on-demand discovery: removes the pending-creation entry (which
// releases the ODCDS handle and timer) and tells every worker to run the
// callbacks registered for this cluster name with the given status.
void ClusterManagerImpl::notifyClusterDiscoveryStatus(absl::string_view name,
                                                      ClusterDiscoveryStatus status) {
  // extract() both checks for and removes the entry in a single lookup.
  auto map_node_handle = pending_cluster_creations_.extract(name);
  if (map_node_handle.empty()) {
    // Not a cluster we are interested in. This may happen when ODCDS
    // receives some cluster name in removed resources field and
    // notifies the cluster manager about it.
    return;
  }
  // Let all the worker threads know that the discovery timed out.
  tls_.runOnAllThreads(
      [name = std::string(name), status](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
        ENVOY_LOG(
            trace,
            "cm cdm: starting processing cluster name {} (status {}) from the expired timer in {}",
            name, enumToInt(status), cluster_manager->thread_local_dispatcher_.name());
        cluster_manager->cdm_.processClusterName(name, status);
      });
}
527
// Returns the ADS mux's EDS resources cache, or an empty OptRef when there is
// no ADS mux (EDS caching is only supported for ADS).
Config::EdsResourcesCacheOptRef ClusterManagerImpl::edsResourcesCache() {
  // Fetch the mux once instead of calling adsMux() twice.
  const auto ads_mux = xds_manager_.adsMux();
  if (ads_mux != nullptr) {
    return ads_mux->edsResourcesCache();
  }
  return {};
}
// Installs a QUIC network observer registry on every worker thread. Compiles
// to a no-op (beyond silencing the unused parameter) when QUIC is disabled.
void ClusterManagerImpl::createNetworkObserverRegistries(
    Quic::EnvoyQuicNetworkObserverRegistryFactory& factory) {
#ifdef ENVOY_ENABLE_QUIC
  // The factory is captured by reference: runOnAllThreads completes before this
  // call's caller-owned factory can go away.
  tls_.runOnAllThreads([&factory](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    ENVOY_LOG(trace, "cm: create network observer registry in {}",
              cluster_manager->thread_local_dispatcher_.name());
    cluster_manager->createThreadLocalNetworkObserverRegistry(factory);
  });
#else
  (void)factory;
#endif
}
// Replaces the calling worker's ClusterDiscoveryManager with a fresh one and
// returns the previous manager (its state is handed back via swap).
ClusterDiscoveryManager
ClusterManagerImpl::createAndSwapClusterDiscoveryManager(std::string thread_name) {
  ThreadLocalClusterManagerImpl& tlcm = *tls_;
  ClusterDiscoveryManager fresh_manager(std::move(thread_name), tlcm);
  tlcm.cdm_.swap(fresh_manager);
  // After the swap `fresh_manager` holds the old manager's contents.
  return fresh_manager;
}
// Builds the admin /config_dump view of all active and warming clusters whose
// names match `name_matcher`. API-added clusters are reported as dynamic
// (with version info), bootstrap clusters as static.
ProtobufTypes::MessagePtr
ClusterManagerImpl::dumpClusterConfigs(const Matchers::StringMatcher& name_matcher) {
  auto config_dump = std::make_unique<envoy::admin::v3::ClustersConfigDump>();
  config_dump->set_version_info(cds_api_ != nullptr ? cds_api_->versionInfo() : "");
  for (const auto& [_, cluster_data] : active_clusters_) {
    const auto& cluster = *cluster_data;
    if (!name_matcher.match(cluster.cluster_config_.name())) {
      continue;
    }
    if (cluster.added_via_api_) {
      auto& dynamic_cluster = *config_dump->mutable_dynamic_active_clusters()->Add();
      dynamic_cluster.set_version_info(cluster.version_info_);
      dynamic_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
      TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
                                            *(dynamic_cluster.mutable_last_updated()));
    } else {
      auto& static_cluster = *config_dump->mutable_static_clusters()->Add();
      static_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
      TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
                                            *(static_cluster.mutable_last_updated()));
    }
  }
  for (const auto& [_, warming_data] : warming_clusters_) {
    const auto& cluster = *warming_data;
    if (!name_matcher.match(cluster.cluster_config_.name())) {
      continue;
    }
    auto& dynamic_cluster = *config_dump->mutable_dynamic_warming_clusters()->Add();
    dynamic_cluster.set_version_info(cluster.version_info_);
    dynamic_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
    TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
                                          *(dynamic_cluster.mutable_last_updated()));
  }
  return config_dump;
}
// Per-worker manager constructor. If a local cluster is configured it must be
// inflated first, because other clusters' zone-aware load balancers register
// member-update callbacks against it.
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl(
    ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
    const absl::optional<LocalClusterParams>& local_cluster_params)
    : parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(dispatcher.name(), *this),
      local_stats_(generateStats(*parent.stats_.rootScope(), dispatcher.name())) {
  // If local cluster is defined then we need to initialize it first.
  if (local_cluster_params.has_value()) {
    const auto& local_cluster_name = local_cluster_params->info_->name();
    ENVOY_LOG(debug, "adding TLS local cluster {}", local_cluster_name);
    thread_local_clusters_[local_cluster_name] = std::make_unique<ClusterEntry>(
        *this, local_cluster_params->info_, local_cluster_params->load_balancer_factory_);
    // Remember the local cluster's priority set so its entry can be destroyed
    // last (see the destructor).
    local_priority_set_ = &thread_local_clusters_[local_cluster_name]->prioritySet();
    local_stats_.clusters_inflated_.set(thread_local_clusters_.size());
  }
}
21612
ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImpl() {
  // Clear out connection pools as well as the thread local cluster map so that we release all
  // cluster pointers. Currently we have to free all non-local clusters before we free
  // the local cluster. This is because non-local clusters with a zone aware load balancer have a
  // member update callback registered with the local cluster.
  ENVOY_LOG(debug, "shutting down thread local cluster manager");
  // Flag teardown so other code paths can skip work during shutdown.
  destroying_ = true;
  host_http_conn_pool_map_.clear();
  host_tcp_conn_pool_map_.clear();
  ASSERT(host_tcp_conn_map_.empty());
  // First pass: destroy every non-local cluster entry, leaving the local
  // cluster (identified by its priority set) alive until the map is cleared.
  for (auto& cluster : thread_local_clusters_) {
    if (&cluster.second->prioritySet() != local_priority_set_) {
      cluster.second.reset();
    }
  }
  thread_local_clusters_.clear();
  // Ensure that all pools are completely destructed.
  thread_local_dispatcher_.clearDeferredDeleteList();
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeTcpConn(
8
    const HostConstSharedPtr& host, Network::ClientConnection& connection) {
8
  auto host_tcp_conn_map_it = host_tcp_conn_map_.find(host);
8
  ASSERT(host_tcp_conn_map_it != host_tcp_conn_map_.end());
8
  auto& connections_map = host_tcp_conn_map_it->second.connections_;
8
  auto it = connections_map.find(&connection);
8
  ASSERT(it != connections_map.end());
8
  connection.dispatcher().deferredDelete(std::move(it->second));
8
  connections_map.erase(it);
8
  if (connections_map.empty()) {
6
    host_tcp_conn_map_.erase(host_tcp_conn_map_it);
6
  }
8
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts(
128
    const std::string& name, const HostVector& hosts_removed) {
128
  auto entry = thread_local_clusters_.find(name);
  // The if should only be possible if deferred cluster creation is enabled.
128
  if (entry == thread_local_clusters_.end()) {
7
    ASSERT(
7
        parent_.deferred_cluster_creation_,
7
        fmt::format("Cannot find ThreadLocalCluster {}, but deferred cluster creation is disabled.",
7
                    name));
7
    ASSERT(thread_local_deferred_clusters_.find(name) != thread_local_deferred_clusters_.end(),
7
           "Cluster with removed host is neither deferred or inflated!");
7
    return;
7
  }
121
  const auto& cluster_entry = entry->second;
121
  ENVOY_LOG(debug, "removing hosts for TLS cluster {} removed {}", name, hosts_removed.size());
  // We need to go through and purge any connection pools for hosts that got deleted.
  // Even if two hosts actually point to the same address this will be safe, since if a
  // host is readded it will be a different physical HostSharedPtr.
121
  cluster_entry->drainConnPools(hosts_removed);
121
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
    const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams update_hosts_params,
    LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
    const HostVector& hosts_removed, bool weighted_priority_health,
33852
    uint64_t overprovisioning_factor, HostMapConstSharedPtr cross_priority_host_map) {
33852
  ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end());
33852
  const auto& cluster_entry = thread_local_clusters_[name];
33852
  cluster_entry->updateHosts(name, priority, std::move(update_hosts_params),
33852
                             std::move(locality_weights), hosts_added, hosts_removed,
33852
                             weighted_priority_health, overprovisioning_factor,
33852
                             std::move(cross_priority_host_map));
33852
}
// Reacts to a host becoming unhealthy. Depending on the cluster's
// CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE feature flag, either close connections to the
// host outright or gracefully drain the existing ones.
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
    const HostSharedPtr& host) {
  if (host->cluster().features() &
      ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
    // Passing nullopt requests close rather than drain (see drainOrCloseConnPools; HTTP
    // pools fall back to draining since close is unsupported there).
    drainOrCloseConnPools(host, absl::nullopt);
    // Close non connection pool TCP connections obtained from tcpConn()
    //
    // TODO(jono): The only remaining user of the non-pooled connections seems to be the statsd
    // TCP client. Perhaps it could be rewritten to use a connection pool, and this code deleted.
    //
    // Each connection will remove itself from the TcpConnectionsMap when it closes, via its
    // Network::ConnectionCallbacks. The last removed tcp conn will remove the TcpConnectionsMap
    // from host_tcp_conn_map_, so do not cache it between iterations.
    //
    // TODO(ggreenway) PERF: If there are a large number of connections, this could take a long
    // time and halt other useful work. Consider breaking up this work. Note that this behavior is
    // noted in the configuration documentation in cluster setting
    // "close_connections_on_host_health_failure". Update the docs if this if this changes.
    while (true) {
      // Re-find on every iteration: close() below mutates the map via callbacks.
      const auto& it = host_tcp_conn_map_.find(host);
      if (it == host_tcp_conn_map_.end()) {
        break;
      }
      TcpConnectionsMap& container = it->second;
      container.connections_.begin()->first->close(
          Network::ConnectionCloseType::NoFlush,
          StreamInfo::LocalCloseReasons::get().NonPooledTcpConnectionHostHealthFailure);
    }
  } else {
    // Default behavior: let in-flight work finish while draining existing connections.
    drainOrCloseConnPools(host, ConnectionPool::DrainBehavior::DrainExistingConnections);
  }
}
// Looks up the HTTP connection pool container for `host`. When `allocate` is true a
// missing container is created; otherwise nullptr is returned for a missing entry.
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ConnPoolsContainer*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::getHttpConnPoolsContainer(
    const HostConstSharedPtr& host, bool allocate) {
  auto it = host_http_conn_pool_map_.find(host);
  if (it != host_http_conn_pool_map_.end()) {
    return &it->second;
  }
  if (!allocate) {
    return nullptr;
  }
  const auto emplaced = host_http_conn_pool_map_.try_emplace(host, thread_local_dispatcher_, host);
  return &emplaced.first->second;
}
// Registers `cb` for cluster update notifications delivered on this worker thread. The
// registration is scoped to the returned handle (see ClusterUpdateCallbacksHandleImpl —
// presumably it unregisters from `update_callbacks_` on destruction; confirm there).
ClusterUpdateCallbacksHandlePtr
ClusterManagerImpl::ThreadLocalClusterManagerImpl::addClusterUpdateCallbacks(
    ClusterUpdateCallbacks& cb) {
  return std::make_unique<ClusterUpdateCallbacksHandleImpl>(cb, update_callbacks_);
}
// Worker-local view of a single cluster: owns the thread-local priority set and the
// worker-local load balancer instance created from the shared factory.
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
    ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster,
    const LoadBalancerFactorySharedPtr& lb_factory)
    : parent_(parent), cluster_info_(cluster), lb_factory_(lb_factory),
      override_host_statuses_(HostUtility::createOverrideHostStatus(cluster_info_->lbConfig())) {
  // Make sure the priority-0 host set exists even before the first membership update.
  priority_set_.getOrCreateHostSet(0);
  // TODO(mattklein123): Consider converting other LBs over to thread local. All of them could
  // benefit given the healthy panic, locality, and priority calculations that take place.
  ASSERT(lb_factory_ != nullptr);
  // Create the worker-local LB; the local cluster's priority set may be nullptr when no
  // local cluster is configured (see ThreadLocalClusterManagerImpl's constructor).
  lb_ = lb_factory_->create({priority_set_, parent_.local_priority_set_});
}
// Drains or closes every HTTP and TCP connection pool owned by `host` on this worker.
// A populated `drain_behavior` requests a drain with that behavior; nullopt requests a
// hard close (TCP pools only — HTTP pools fall back to draining, see TODO below).
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainOrCloseConnPools(
    const HostSharedPtr& host, absl::optional<ConnectionPool::DrainBehavior> drain_behavior) {
  // Drain or close any HTTP connection pool for the host.
  {
    const auto container = getHttpConnPoolsContainer(host);
    if (container != nullptr) {
      // Block idle callbacks from erasing `container` while we are still operating on it
      // (see httpConnPoolIsIdle, which honors this flag).
      container->do_not_delete_ = true;
      if (drain_behavior.has_value()) {
        container->pools_->drainConnections(drain_behavior.value());
      } else {
        // TODO(wbpcode): 'CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE' and 'closeConnections'
        // is only supported for TCP connection pools for now. Use 'DrainExistingConnections'
        // drain here as alternative.
        container->pools_->drainConnections(
            ConnectionPool::DrainBehavior::DrainExistingConnections);
      }
      container->do_not_delete_ = false;
      // If the drain emptied the container, drop the host's map entry ourselves since the
      // idle callbacks were prevented from doing so above.
      if (container->pools_->empty()) {
        host_http_conn_pool_map_.erase(host);
      }
    }
  }
  // Drain or close any TCP connection pool for the host.
  {
    const auto container = host_tcp_conn_pool_map_.find(host);
    if (container != host_tcp_conn_pool_map_.end()) {
      // Draining pools or closing connections can cause pool deletion if it becomes
      // idle. Copy `pools_` so that we aren't iterating through a container that
      // gets mutated by callbacks deleting from it.
      std::vector<Tcp::ConnectionPool::Instance*> pools;
      for (const auto& pair : container->second.pools_) {
        pools.push_back(pair.second.get());
      }
      for (auto* pool : pools) {
        if (drain_behavior.has_value()) {
          pool->drainConnections(drain_behavior.value());
        } else {
          pool->closeConnections();
        }
      }
    }
  }
}
// Tears down the worker-local cluster view; all of its connection pools must be drained
// before the underlying cluster objects can be released.
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() {
  // We need to drain all connection pools for the cluster being removed. Then we can remove the
  // cluster.
  //
  // TODO(mattklein123): Optimally, we would just fire member changed callbacks and remove all of
  // the hosts inside of the HostImpl destructor. That is a change with wide implications, so we
  // are going with a more targeted approach for now.
  drainConnPools();
}
// Returns the HTTP connection pool for `host` at `priority`, creating it on first use.
// Pools are keyed by a hash of everything that must not be shared between pools:
// the upstream protocol set, upstream socket options, transport socket options, and
// (when connection_pool_per_downstream_connection is set) the downstream connection
// identity. Returns nullptr when `host` is null or when the pool map declines creation.
Http::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImpl(
    HostConstSharedPtr host, ResourcePriority priority,
    absl::optional<Http::Protocol> downstream_protocol, LoadBalancerContext* context) {
  if (!host) {
    return nullptr;
  }
  // Right now, HTTP, HTTP/2 and ALPN pools are considered separate.
  // We could do better here, and always use the ALPN pool and simply make sure
  // we end up on a connection of the correct protocol, but for simplicity we're
  // starting with something simpler.
  auto upstream_protocols = host->cluster().upstreamHttpProtocol(downstream_protocol);
  std::vector<uint8_t> hash_key;
  hash_key.reserve(upstream_protocols.size());
  for (auto protocol : upstream_protocols) {
    hash_key.push_back(uint8_t(protocol));
  }
  absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>
      alternate_protocol_options =
          host->cluster().httpProtocolOptions().alternateProtocolsCacheOptions();
  Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
  if (context) {
    // Inherit socket options from downstream connection, if set.
    if (context->downstreamConnection()) {
      addOptionsIfNotNull(upstream_options, context->downstreamConnection()->socketOptions());
    }
    addOptionsIfNotNull(upstream_options, context->upstreamSocketOptions());
  }
  // Use the socket options for computing connection pool hash key, if any.
  // This allows socket options to control connection pooling so that connections with
  // different options are not pooled together.
  for (const auto& option : *upstream_options) {
    option->hashKey(hash_key);
  }
  bool have_transport_socket_options = false;
  if (context && context->upstreamTransportSocketOptions()) {
    host->transportSocketFactory().hashKey(hash_key, context->upstreamTransportSocketOptions());
    have_transport_socket_options = true;
  }
  // If configured, use the downstream connection id in pool hash key
  if (cluster_info_->connectionPoolPerDownstreamConnection() && context &&
      context->downstreamConnection()) {
    context->downstreamConnection()->hashKey(hash_key);
  }
  ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true);
  // Note: to simplify this, we assume that the factory is only called in the scope of this
  // function. Otherwise, we'd need to capture a few of these variables by value.
  ConnPoolsContainer::ConnPools::PoolOptRef pool =
      container.pools_->getPool(priority, hash_key, [&]() {
        auto pool = parent_.parent_.factory_.allocateConnPool(
            parent_.thread_local_dispatcher_, host, priority, upstream_protocols,
            alternate_protocol_options, !upstream_options->empty() ? upstream_options : nullptr,
            have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr,
            parent_.parent_.time_source_, parent_.cluster_manager_state_, quic_info_,
            parent_.getNetworkObserverRegistry());
        // The idle callback outlives this frame, so `host`, `priority` and `hash_key` are
        // captured by value (unlike the factory lambda's by-reference captures above).
        pool->addIdleCallback([&parent = parent_, host, priority, hash_key]() {
          parent.httpConnPoolIsIdle(host, priority, hash_key);
        });
        return pool;
      });
  if (pool.has_value()) {
    return &(pool.value().get());
  } else {
    return nullptr;
  }
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::httpConnPoolIsIdle(
11774
    HostConstSharedPtr host, ResourcePriority priority, const std::vector<uint8_t>& hash_key) {
11774
  if (destroying_) {
    // If the Cluster is being destroyed, this pool will be cleaned up by that
    // process.
221
    return;
221
  }
11553
  ConnPoolsContainer* container = getHttpConnPoolsContainer(host);
11553
  if (container == nullptr) {
    // This could happen if we have cleaned out the host before iterating through every
    // connection pool. Handle it by just continuing.
    return;
  }
11553
  ENVOY_LOG(trace, "Erasing idle pool for host {}", *host);
11553
  container->pools_->erasePool(priority, hash_key);
  // Guard deletion of the container with `do_not_delete_` to avoid deletion while
  // iterating through the container in `container->pools_->startDrain()`. See
  // comment in `ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools`.
11553
  if (!container->do_not_delete_ && container->pools_->empty()) {
11382
    ENVOY_LOG(trace, "Pool container empty for host {}, erasing host entry", *host);
11382
    host_http_conn_pool_map_.erase(
11382
        host); // NOTE: `container` is erased after this point in the lambda.
11382
  }
11553
}
// Selects an upstream host: an override host from the cross-priority map wins; in strict
// override mode a miss is a hard failure; otherwise the load balancer decides.
HostSelectionResponse ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::chooseHost(
    LoadBalancerContext* context) {
  const auto cross_priority_host_map = priority_set_.crossPriorityHostMap();
  auto [override_host, strict_mode] = HostUtility::selectOverrideHost(
      cross_priority_host_map.get(), override_host_statuses_, context);
  if (override_host != nullptr) {
    return {std::move(override_host)};
  }
  if (strict_mode) {
    // Strict override: never fall back to the load balancer when the override misses.
    cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
    ENVOY_LOG(debug, "no healthy host");
    return {nullptr};
  }
  Upstream::HostSelectionResponse host_selection = lb_->chooseHost(context);
  if (!host_selection.host && !host_selection.cancelable) {
    cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
    ENVOY_LOG(debug, "no healthy host");
  }
  return host_selection;
}
// Peeks the next host without consuming it: an override host wins, otherwise defer to
// the load balancer's peek.
HostConstSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::peekAnotherHost(
    LoadBalancerContext* context) {
  const auto cross_priority_host_map = priority_set_.crossPriorityHostMap();
  auto host_and_strict_mode = HostUtility::selectOverrideHost(cross_priority_host_map.get(),
                                                              override_host_statuses_, context);
  if (host_and_strict_mode.first == nullptr) {
    // TODO(wbpcode): should we do strict mode check of override host here?
    return lb_->peekAnotherHost(context);
  }
  return std::move(host_and_strict_mode.first);
}
// Returns the TCP connection pool for `host` at `priority`, creating it on first use.
// Pools are keyed by a hash of the priority, socket options, transport socket options,
// and (when connection_pool_per_downstream_connection is set) the downstream connection
// identity, so connections with different requirements are never shared.
Tcp::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPoolImpl(
    HostConstSharedPtr host, ResourcePriority priority, LoadBalancerContext* context) {
  // Inherit socket options from downstream connection, if set.
  std::vector<uint8_t> hash_key = {uint8_t(priority)};
  // Use downstream connection socket options for computing connection pool hash key, if any.
  // This allows socket options to control connection pooling so that connections with
  // different options are not pooled together.
  Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
  if (context) {
    if (context->downstreamConnection()) {
      addOptionsIfNotNull(upstream_options, context->downstreamConnection()->socketOptions());
    }
    addOptionsIfNotNull(upstream_options, context->upstreamSocketOptions());
  }
  for (const auto& option : *upstream_options) {
    option->hashKey(hash_key);
  }
  // If configured, use the downstream connection id in pool hash key
  if (cluster_info_->connectionPoolPerDownstreamConnection() && context &&
      context->downstreamConnection()) {
    ENVOY_LOG(trace, "honoring connection_pool_per_downstream_connection");
    context->downstreamConnection()->hashKey(hash_key);
  }
  bool have_transport_socket_options = false;
  if (context != nullptr && context->upstreamTransportSocketOptions() != nullptr) {
    have_transport_socket_options = true;
    host->transportSocketFactory().hashKey(hash_key, context->upstreamTransportSocketOptions());
  }
  // Find or create the per-host container. The entry stores a handle acquired from the
  // host (see Host::acquireHandle — presumably pins host bookkeeping; confirm there).
  auto container_iter = parent_.host_tcp_conn_pool_map_.find(host);
  if (container_iter == parent_.host_tcp_conn_pool_map_.end()) {
    container_iter = parent_.host_tcp_conn_pool_map_.try_emplace(host, host->acquireHandle()).first;
  }
  TcpConnPoolsContainer& container = container_iter->second;
  auto pool_iter = container.pools_.find(hash_key);
  if (pool_iter == container.pools_.end()) {
    bool inserted;
    std::tie(pool_iter, inserted) = container.pools_.emplace(
        hash_key,
        parent_.parent_.factory_.allocateTcpConnPool(
            parent_.thread_local_dispatcher_, host, priority,
            !upstream_options->empty() ? upstream_options : nullptr,
            have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr,
            parent_.cluster_manager_state_, cluster_info_->tcpPoolIdleTimeout()));
    ASSERT(inserted);
    // The idle callback outlives this frame, so `host` and `hash_key` are captured by
    // value; it erases the pool (and possibly the container) when the pool goes idle.
    pool_iter->second->addIdleCallback(
        [&parent = parent_, host, hash_key]() { parent.tcpConnPoolIsIdle(host, hash_key); });
  }
  return pool_iter->second.get();
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::tcpConnPoolIsIdle(
794
    HostConstSharedPtr host, const std::vector<uint8_t>& hash_key) {
794
  if (destroying_) {
    // If the Cluster is being destroyed, this pool will be cleaned up by that process.
7
    return;
7
  }
787
  auto it = host_tcp_conn_pool_map_.find(host);
787
  if (it != host_tcp_conn_pool_map_.end()) {
787
    TcpConnPoolsContainer& container = it->second;
787
    auto erase_iter = container.pools_.find(hash_key);
787
    if (erase_iter != container.pools_.end()) {
787
      ENVOY_LOG(trace, "Idle pool, erasing pool for host {}", *host);
787
      thread_local_dispatcher_.deferredDelete(std::move(erase_iter->second));
787
      container.pools_.erase(erase_iter);
787
    }
787
    if (container.pools_.empty()) {
778
      host_tcp_conn_pool_map_.erase(
778
          host); // NOTE: `container` is erased after this point in the lambda.
778
    }
787
  }
787
}
// Builds the production ClusterManagerImpl from the bootstrap config. Construction
// errors are reported through `creation_status` rather than thrown.
absl::StatusOr<ClusterManagerPtr> ProdClusterManagerFactory::clusterManagerFromProto(
    const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
  absl::Status creation_status = absl::OkStatus();
  std::unique_ptr<ClusterManagerImpl> cluster_manager{
      new ClusterManagerImpl(bootstrap, *this, context_, creation_status)};
  RETURN_IF_NOT_OK(creation_status);
  return cluster_manager;
}
// Allocates the HTTP connection pool implementation matching the upstream protocol set:
//  - {H1, H2, H3}: ConnectivityGrid (HTTP/3 with TCP fallback) — QUIC builds only.
//  - {H1, H2}: mixed pool that selects the protocol via ALPN.
//  - {H2}, {H3}, {H1}: dedicated single-protocol pool.
// An alternate-protocols cache is created when explicitly configured, and implicitly for
// HTTP/2-capable upstreams to track max concurrent streams across connections.
//
// FIX: `transport_socket_options` may legitimately be null (callers such as
// httpConnPoolImpl pass nullptr when the context supplies no transport socket options),
// so it must not be dereferenced unconditionally when probing for HTTP/1.1 proxy info.
Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
    Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority,
    std::vector<Http::Protocol>& protocols,
    const absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>&
        alternate_protocol_options,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
    TimeSource& source, ClusterConnectivityState& state, Http::PersistentQuicInfoPtr& quic_info,
    OptRef<Quic::EnvoyQuicNetworkObserverRegistry> network_observer_registry) {
  Http::HttpServerPropertiesCacheSharedPtr alternate_protocols_cache;
  if (alternate_protocol_options.has_value()) {
    // If there is configuration for an alternate protocols cache, always create one.
    alternate_protocols_cache =
        alternate_protocols_cache_manager_.getCache(alternate_protocol_options.value(), dispatcher);
  } else if (!alternate_protocol_options.has_value() &&
             (protocols.size() == 2 ||
              (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2))) {
    // If there is no configuration for an alternate protocols cache, still
    // create one if there's an HTTP/2 upstream (either explicitly, or for mixed
    // HTTP/1.1 and HTTP/2 pools) to track the max concurrent streams across
    // connections.
    envoy::config::core::v3::AlternateProtocolsCacheOptions default_options;
    default_options.set_name(host->cluster().name());
    alternate_protocols_cache =
        alternate_protocols_cache_manager_.getCache(default_options, dispatcher);
  }
  absl::optional<Http::HttpServerPropertiesCache::Origin> origin =
      getOrigin(transport_socket_options, host);
  // Null-safe check: absent transport socket options means no HTTP/1.1 proxy either.
  if (protocols.size() == 3 &&
      context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100) &&
      (transport_socket_options == nullptr || !transport_socket_options->http11ProxyInfo())) {
    ASSERT(contains(protocols,
                    {Http::Protocol::Http11, Http::Protocol::Http2, Http::Protocol::Http3}));
    ASSERT(alternate_protocol_options.has_value());
    ASSERT(alternate_protocols_cache);
#ifdef ENVOY_ENABLE_QUIC
    Envoy::Http::ConnectivityGrid::ConnectivityOptions coptions{protocols};
    if (quic_info == nullptr) {
      quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster(), context_);
    }
    return std::make_unique<Http::ConnectivityGrid>(
        dispatcher, context_.api().randomGenerator(), host, priority, options,
        transport_socket_options, state, source, alternate_protocols_cache, coptions,
        quic_stat_names_, *stats_.rootScope(), *quic_info, network_observer_registry,
        context_.overloadManager());
#else
    (void)quic_info;
    (void)network_observer_registry;
    // Should be blocked by configuration checking at an earlier point.
    PANIC("unexpected");
#endif
  }
  if (protocols.size() >= 2) {
    if (origin.has_value()) {
      envoy::config::core::v3::AlternateProtocolsCacheOptions default_options;
      default_options.set_name(host->cluster().name());
      alternate_protocols_cache =
          alternate_protocols_cache_manager_.getCache(default_options, dispatcher);
    }
    ASSERT(contains(protocols, {Http::Protocol::Http11, Http::Protocol::Http2}));
    return std::make_unique<Http::HttpConnPoolImplMixed>(
        dispatcher, context_.api().randomGenerator(), host, priority, options,
        transport_socket_options, state, origin, alternate_protocols_cache,
        context_.overloadManager());
  }
  if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2 &&
      context_.runtime().snapshot().featureEnabled("upstream.use_http2", 100)) {
    return Http::Http2::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
                                         priority, options, transport_socket_options, state,
                                         context_.overloadManager(), origin,
                                         alternate_protocols_cache);
  }
  if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http3 &&
      context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100)) {
#ifdef ENVOY_ENABLE_QUIC
    if (quic_info == nullptr) {
      quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster(), context_);
    }
    return Http::Http3::allocateConnPool(
        dispatcher, context_.api().randomGenerator(), host, priority, options,
        transport_socket_options, state, quic_stat_names_, {}, *stats_.rootScope(), {}, *quic_info,
        network_observer_registry, context_.overloadManager(), false);
#else
    UNREFERENCED_PARAMETER(source);
    // Should be blocked by configuration checking at an earlier point.
    PANIC("unexpected");
#endif
  }
  ASSERT(protocols.size() == 1 && protocols[0] == Http::Protocol::Http11);
  return Http::Http1::allocateConnPool(dispatcher, context_.api().randomGenerator(), host, priority,
                                       options, transport_socket_options, state,
                                       context_.overloadManager());
}
// Production factory for TCP connection pools: constructs a Tcp::ConnPoolImpl bound to
// the given dispatcher, with an optional per-pool idle timeout.
Tcp::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateTcpConnPool(
    Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
    ClusterConnectivityState& state,
    absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout) {
  ENVOY_LOG_MISC(debug, "Allocating TCP conn pool");
  return std::make_unique<Tcp::ConnPoolImpl>(dispatcher, host, priority, options,
                                             transport_socket_options, state, tcp_pool_idle_timeout,
                                             context_.overloadManager());
}
// Builds a Cluster (plus its thread aware load balancer, when the cluster type supplies
// one) from the config proto by delegating to the cluster factory registry.
absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
ProdClusterManagerFactory::clusterFromProto(const envoy::config::cluster::v3::Cluster& cluster,
                                            Outlier::EventLoggerSharedPtr outlier_event_logger,
                                            bool added_via_api) {
  return ClusterFactoryImplBase::create(cluster, context_, dns_resolver_fn_, outlier_event_logger,
                                        added_via_api);
}
// Creates the CDS (Cluster Discovery Service) subscription API backed by `cds_config`.
absl::StatusOr<CdsApiPtr>
ProdClusterManagerFactory::createCds(const envoy::config::core::v3::ConfigSource& cds_config,
                                     const xds::core::v3::ResourceLocator* cds_resources_locator,
                                     ClusterManager& cm, bool support_multi_ads_sources) {
  // TODO(htuch): Differentiate static vs. dynamic validation visitors.
  return CdsApiImpl::create(cds_config, cds_resources_locator, cm, *stats_.rootScope(),
                            context_.messageValidationContext().dynamicValidationVisitor(),
                            context_, support_multi_ads_sources);
}
} // namespace Upstream
} // namespace Envoy