1
#include "source/common/upstream/cluster_manager_impl.h"
2

            
3
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>
9

            
10
#include "envoy/admin/v3/config_dump.pb.h"
11
#include "envoy/config/bootstrap/v3/bootstrap.pb.h"
12
#include "envoy/config/cluster/v3/cluster.pb.h"
13
#include "envoy/config/core/v3/config_source.pb.h"
14
#include "envoy/config/core/v3/protocol.pb.h"
15
#include "envoy/event/dispatcher.h"
16
#include "envoy/grpc/async_client.h"
17
#include "envoy/network/dns.h"
18
#include "envoy/runtime/runtime.h"
19
#include "envoy/stats/scope.h"
20
#include "envoy/tcp/async_tcp_client.h"
21
#include "envoy/upstream/load_balancer.h"
22
#include "envoy/upstream/upstream.h"
23

            
24
#include "source/common/common/assert.h"
25
#include "source/common/common/enum_to_int.h"
26
#include "source/common/common/fmt.h"
27
#include "source/common/common/utility.h"
28
#include "source/common/config/null_grpc_mux_impl.h"
29
#include "source/common/config/utility.h"
30
#include "source/common/config/xds_resource.h"
31
#include "source/common/grpc/async_client_manager_impl.h"
32
#include "source/common/http/async_client_impl.h"
33
#include "source/common/http/http1/conn_pool.h"
34
#include "source/common/http/http2/conn_pool.h"
35
#include "source/common/http/mixed_conn_pool.h"
36
#include "source/common/network/utility.h"
37
#include "source/common/protobuf/utility.h"
38
#include "source/common/router/shadow_writer_impl.h"
39
#include "source/common/runtime/runtime_features.h"
40
#include "source/common/tcp/conn_pool.h"
41
#include "source/common/upstream/cds_api_impl.h"
42
#include "source/common/upstream/cluster_factory_impl.h"
43
#include "source/common/upstream/load_balancer_context_base.h"
44
#include "source/common/upstream/load_stats_reporter_impl.h"
45
#include "source/common/upstream/priority_conn_pool_map_impl.h"
46

            
47
#include "absl/hash/hash.h"
48
#include "absl/status/status.h"
49

            
50
#ifdef ENVOY_ENABLE_QUIC
51
#include "source/common/http/conn_pool_grid.h"
52
#include "source/common/http/http3/conn_pool.h"
53
#include "source/common/quic/client_connection_factory_impl.h"
54
#endif
55

            
56
namespace Envoy {
57
namespace Upstream {
58
namespace {
59

            
60
void addOptionsIfNotNull(Network::Socket::OptionsSharedPtr& options,
61
95831
                         const Network::Socket::OptionsSharedPtr& to_add) {
62
95831
  if (to_add != nullptr) {
63
43699
    Network::Socket::appendOptions(options, to_add);
64
43699
  }
65
95831
}
66

            
67
// Helper function to make sure each protocol in expected_protocols is present
68
// in protocols (only used for an ASSERT in debug builds)
69
bool contains(const std::vector<Http::Protocol>& protocols,
70
              const std::vector<Http::Protocol>& expected_protocols) {
71
  for (auto protocol : expected_protocols) {
72
    if (std::find(protocols.begin(), protocols.end(), protocol) == protocols.end()) {
73
      return false;
74
    }
75
  }
76
  return true;
77
}
78

            
79
// Derives an HTTP server-properties-cache origin ("https", SNI, port) for a
// host. The SNI comes from the host's transport socket factory unless the
// per-connection transport socket options carry an override. Returns nullopt
// when there is no usable SNI or no resolved IP address.
absl::optional<Http::HttpServerPropertiesCache::Origin>
getOrigin(const Network::TransportSocketOptionsConstSharedPtr& options, HostConstSharedPtr host) {
  std::string sni(host->transportSocketFactory().defaultServerNameIndication());
  if (options != nullptr && options->serverNameOverride().has_value()) {
    sni = options->serverNameOverride().value();
  }
  // No SNI, or no IP address to key the cache on: no origin.
  if (sni.empty() || host->address() == nullptr || host->address()->ip() == nullptr) {
    return absl::nullopt;
  }
  return {{"https", sni, host->address()->ip()->port()}};
}
90

            
91
// Returns true if `cluster_name` is an Envoy cluster that the ADS mux connects
// through (the primary gRPC service or, when the xds_failover_support runtime
// feature is enabled, the failover service). Such a cluster must finish
// initializing before the ADS mux is started.
bool isBlockingAdsCluster(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
                          absl::string_view cluster_name) {
  bool blocking_ads_cluster = false;
  if (bootstrap.dynamic_resources().has_ads_config()) {
    const auto& ads_config_source = bootstrap.dynamic_resources().ads_config();
    // We only care about EnvoyGrpc, not GoogleGrpc, because we only need to delay ADS mux
    // initialization if it uses an Envoy cluster that needs to be initialized first. We don't
    // depend on the same cluster initialization when opening a gRPC stream for GoogleGrpc.
    blocking_ads_cluster =
        (ads_config_source.grpc_services_size() > 0 &&
         ads_config_source.grpc_services(0).has_envoy_grpc() &&
         ads_config_source.grpc_services(0).envoy_grpc().cluster_name() == cluster_name);
    if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
      // Validate the failover server if there is one. Exactly two grpc_services
      // means primary + failover; index 1 is the failover target.
      blocking_ads_cluster |=
          (ads_config_source.grpc_services_size() == 2 &&
           ads_config_source.grpc_services(1).has_envoy_grpc() &&
           ads_config_source.grpc_services(1).envoy_grpc().cluster_name() == cluster_name);
    }
  }
  return blocking_ads_cluster;
}
113

            
114
} // namespace
115

            
116
16654
// Registers a cluster with the init helper and starts its initialization.
// Primary clusters initialize immediately; secondary clusters are deferred
// until secondary initialization begins (unless it already has, in which case
// they initialize right away).
void ClusterManagerInitHelper::addCluster(ClusterManagerCluster& cm_cluster) {
  // See comments in ClusterManagerImpl::addOrUpdateCluster() for why this is only called during
  // server initialization.
  ASSERT(state_ != State::AllClustersInitialized);

  // Shared completion callback: run the per-cluster init hook, then clear the
  // warming gauge once this cluster is done.
  const auto initialize_cb = [&cm_cluster, this] {
    RETURN_IF_NOT_OK(onClusterInit(cm_cluster));
    cm_cluster.cluster().info()->configUpdateStats().warming_state_.set(0);
    return absl::OkStatus();
  };
  Cluster& cluster = cm_cluster.cluster();

  // Mark the cluster as warming until its initialize callback fires.
  cluster.info()->configUpdateStats().warming_state_.set(1);
  if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
    // Remove the previous cluster before the cluster object is destroyed.
    primary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
    cluster.initialize(initialize_cb);
  } else {
    ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
    // Remove the previous cluster before the cluster object is destroyed.
    secondary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
    if (started_secondary_initialize_) {
      // This can happen if we get a second CDS update that adds new clusters after we have
      // already started secondary init. In this case, just immediately initialize.
      cluster.initialize(initialize_cb);
    }
  }

  ENVOY_LOG(debug, "cm init: adding: cluster={} primary={} secondary={}", cluster.info()->name(),
            primary_init_clusters_.size(), secondary_init_clusters_.size());
}
147

            
148
16477
// Called when one cluster has finished initializing: runs the per-cluster
// callback (set at construction; see ClusterManagerImpl's init_helper_), then
// removes the cluster from the pending-init bookkeeping, which may in turn
// complete overall initialization.
absl::Status ClusterManagerInitHelper::onClusterInit(ClusterManagerCluster& cluster) {
  ASSERT(state_ != State::AllClustersInitialized);
  RETURN_IF_NOT_OK(per_cluster_init_callback_(cluster));
  removeCluster(cluster);
  return absl::OkStatus();
}
154

            
155
17151
// Removes a cluster from the pending-init maps (no-op once all clusters are
// initialized) and re-evaluates whether overall initialization can advance.
void ClusterManagerInitHelper::removeCluster(ClusterManagerCluster& cluster) {
  if (state_ == State::AllClustersInitialized) {
    return;
  }

  // There is a remote edge case where we can remove a cluster via CDS that has not yet been
  // initialized. When called via the remove cluster API this code catches that case.
  absl::flat_hash_map<std::string, ClusterManagerCluster*>* cluster_map;
  if (cluster.cluster().initializePhase() == Cluster::InitializePhase::Primary) {
    cluster_map = &primary_init_clusters_;
  } else {
    ASSERT(cluster.cluster().initializePhase() == Cluster::InitializePhase::Secondary);
    cluster_map = &secondary_init_clusters_;
  }

  // It is possible that the cluster we are removing has already been initialized, and is not
  // present in the initializer map. If so, this is fine as a CDS update may happen for a
  // cluster with the same name. See the case "UpdateAlreadyInitialized" of the
  // target //test/common/upstream:cluster_manager_impl_test.
  // The pointer comparison guards against erasing a newer entry that replaced
  // this cluster under the same name.
  auto iter = cluster_map->find(cluster.cluster().info()->name());
  if (iter != cluster_map->end() && iter->second == &cluster) {
    cluster_map->erase(iter);
  }
  ENVOY_LOG(debug, "cm init: init complete: cluster={} primary={} secondary={}",
            cluster.cluster().info()->name(), primary_init_clusters_.size(),
            secondary_init_clusters_.size());
  maybeFinishInitialize();
}
183

            
184
311
// Starts initialization of every secondary cluster that is still pending.
void ClusterManagerInitHelper::initializeSecondaryClusters() {
  started_secondary_initialize_ = true;
  // Cluster::initialize() method can modify the map of secondary_init_clusters_ to remove
  // the item currently being initialized, so we eschew range-based-for and do this complicated
  // dance to increment the iterator before calling initialize.
  for (auto iter = secondary_init_clusters_.begin(); iter != secondary_init_clusters_.end();) {
    ClusterManagerCluster* cluster = iter->second;
    ENVOY_LOG(debug, "initializing secondary cluster {}", iter->first);
    ++iter;
    cluster->cluster().initialize([cluster, this] { return onClusterInit(*cluster); });
  }
}
196

            
197
39034
// Central step of the init state machine. Called whenever progress may have
// been made (a cluster finished initializing, CDS finished, secondary init was
// requested). Advances: primary init -> secondary init -> CDS init ->
// AllClustersInitialized, firing the registered callbacks along the way.
void ClusterManagerInitHelper::maybeFinishInitialize() {
  // Do not do anything if we are still doing the initial static load or if we are waiting for
  // CDS initialize.
  ENVOY_LOG(debug, "maybe finish initialize state: {}", enumToInt(state_));
  if (state_ == State::Loading || state_ == State::WaitingToStartCdsInitialization) {
    return;
  }

  ASSERT(state_ == State::WaitingToStartSecondaryInitialization ||
         state_ == State::CdsInitialized ||
         state_ == State::WaitingForPrimaryInitializationToComplete);
  ENVOY_LOG(debug, "maybe finish initialize primary init clusters empty: {}",
            primary_init_clusters_.empty());
  // If we are still waiting for primary clusters to initialize, do nothing.
  if (!primary_init_clusters_.empty()) {
    return;
  } else if (state_ == State::WaitingForPrimaryInitializationToComplete) {
    // All primary clusters are done: notify and wait for the external signal
    // (startInitializingSecondaryClusters) before going further.
    state_ = State::WaitingToStartSecondaryInitialization;
    if (primary_clusters_initialized_callback_) {
      primary_clusters_initialized_callback_();
    }
    return;
  }

  // If we are still waiting for secondary clusters to initialize, see if we need to first call
  // initialize on them. This is only done once.
  ENVOY_LOG(debug, "maybe finish initialize secondary init clusters empty: {}",
            secondary_init_clusters_.empty());
  if (!secondary_init_clusters_.empty()) {
    if (!started_secondary_initialize_) {
      ENVOY_LOG(info, "cm init: initializing secondary clusters");
      // If the first CDS response doesn't have any primary cluster, ClusterLoadAssignment
      // should be already paused by CdsApiImpl::onConfigUpdate(). Need to check that to
      // avoid double pause ClusterLoadAssignment.
      const std::vector<std::string> paused_xds_types{
          Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(),
          Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>(),
          Config::getTypeUrl<envoy::extensions::transport_sockets::tls::v3::Secret>()};
      // EDS/LEDS/SDS stay paused for the duration of this scope; resumed when
      // resume_eds_leds_sds is destroyed.
      Config::ScopedResume resume_eds_leds_sds = xds_manager_.pause(paused_xds_types);
      initializeSecondaryClusters();
    }
    return;
  }

  // At this point, if we are doing static init, and we have CDS, start CDS init. Otherwise, move
  // directly to initialized.
  started_secondary_initialize_ = false;
  ENVOY_LOG(debug, "maybe finish initialize cds api ready: {}", cds_ != nullptr);
  if (state_ == State::WaitingToStartSecondaryInitialization && cds_) {
    ENVOY_LOG(info, "cm init: initializing cds");
    state_ = State::WaitingToStartCdsInitialization;
    cds_->initialize();
  } else {
    ENVOY_LOG(info, "cm init: all clusters initialized");
    state_ = State::AllClustersInitialized;
    if (initialized_callback_) {
      initialized_callback_();
    }
  }
}
257

            
258
11016
// Called once all static bootstrap clusters have been added via addCluster().
// If every primary cluster already finished (e.g. all STATIC), the state
// machine progresses immediately inside maybeFinishInitialize().
void ClusterManagerInitHelper::onStaticLoadComplete() {
  ASSERT(state_ == State::Loading);
  // After initialization of primary clusters has completed, transition to
  // waiting for signal to initialize secondary clusters and then CDS.
  state_ = State::WaitingForPrimaryInitializationToComplete;
  maybeFinishInitialize();
}
265

            
266
10838
// External signal that secondary cluster initialization may begin (invoked
// from ClusterManagerImpl::initializeSecondaryClusters).
void ClusterManagerInitHelper::startInitializingSecondaryClusters() {
  ASSERT(state_ == State::WaitingToStartSecondaryInitialization);
  ENVOY_LOG(debug, "continue initializing secondary clusters");
  maybeFinishInitialize();
}
271

            
272
11008
// Records the CDS API handle (nullptr when CDS is not configured). When CDS is
// present, registers a callback that advances the state machine once the
// initial CDS fetch completes.
void ClusterManagerInitHelper::setCds(CdsApi* cds) {
  ASSERT(state_ == State::Loading);
  cds_ = cds;
  if (cds_) {
    cds_->setInitializedCb([this]() -> void {
      ASSERT(state_ == State::WaitingToStartCdsInitialization);
      state_ = State::CdsInitialized;
      maybeFinishInitialize();
    });
  }
}
283

            
284
void ClusterManagerInitHelper::setInitializedCb(
285
10697
    ClusterManager::InitializationCompleteCallback callback) {
286
10697
  if (state_ == State::AllClustersInitialized) {
287
9953
    callback();
288
10522
  } else {
289
744
    initialized_callback_ = callback;
290
744
  }
291
10697
}
292

            
293
// Registers the callback fired when all primary clusters have initialized. If
// that already happened, the callback runs synchronously.
void ClusterManagerInitHelper::setPrimaryClustersInitializedCb(
    ClusterManager::PrimaryClustersReadyCallback callback) {
  // The callback must be set before or at the `WaitingToStartSecondaryInitialization` state.
  ASSERT(state_ == State::WaitingToStartSecondaryInitialization ||
         state_ == State::WaitingForPrimaryInitializationToComplete || state_ == State::Loading);
  if (state_ == State::WaitingToStartSecondaryInitialization) {
    // This is the case where all clusters are STATIC and without health checking.
    callback();
  } else {
    primary_clusters_initialized_callback_ = callback;
  }
}
305

            
306
// Constructs the cluster manager from bootstrap config. Does not load any
// clusters (that happens in initialize()); wires up stats, the admin config
// dump endpoint, the gRPC async client manager, the optional outlier-event
// logger, and the xDS manager. Failures are reported through
// `creation_status` rather than exceptions.
ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
                                       ClusterManagerFactory& factory,
                                       Server::Configuration::ServerFactoryContext& context,
                                       absl::Status& creation_status)
    : context_(context), factory_(factory), runtime_(context.runtime()),
      stats_(context.serverScope().store()), tls_(context.threadLocal()),
      xds_manager_(context.xdsManager()), random_(context.api().randomGenerator()),
      deferred_cluster_creation_(bootstrap.cluster_manager().enable_deferred_cluster_creation()),
      bind_config_(bootstrap.cluster_manager().has_upstream_bind_config()
                       ? absl::make_optional(bootstrap.cluster_manager().upstream_bind_config())
                       : absl::nullopt),
      local_info_(context.localInfo()), cm_stats_(generateStats(*stats_.rootScope())),
      init_helper_(xds_manager_,
                   [this](ClusterManagerCluster& cluster) { return onClusterInit(cluster); }),
      time_source_(context.timeSource()), dispatcher_(context.mainThreadDispatcher()),
      http_context_(context.httpContext()), router_context_(context.routerContext()),
      cluster_stat_names_(stats_.symbolTable()),
      cluster_config_update_stat_names_(stats_.symbolTable()),
      cluster_lb_stat_names_(stats_.symbolTable()),
      cluster_endpoint_stat_names_(stats_.symbolTable()),
      cluster_load_report_stat_names_(stats_.symbolTable()),
      cluster_circuit_breakers_stat_names_(stats_.symbolTable()),
      cluster_request_response_size_stat_names_(stats_.symbolTable()),
      cluster_timeout_budget_stat_names_(stats_.symbolTable()),
      common_lb_config_pool_(
          std::make_shared<SharedPool::ObjectSharedPool<
              const envoy::config::cluster::v3::Cluster::CommonLbConfig, MessageUtil, MessageUtil>>(
              dispatcher_)),
      shutdown_(false) {
  // Expose the "clusters" config dump through the admin interface when an
  // admin endpoint exists.
  if (auto admin = context.admin(); admin.has_value()) {
    config_tracker_entry_ = admin->getConfigTracker().add(
        "clusters", [this](const Matchers::StringMatcher& name_matcher) {
          return dumpClusterConfigs(name_matcher);
        });
  }
  async_client_manager_ = std::make_unique<Grpc::AsyncClientManagerImpl>(
      bootstrap.grpc_async_client_manager_config(), context, context.grpcContext().statNames());
  const auto& cm_config = bootstrap.cluster_manager();
  if (cm_config.has_outlier_detection()) {
    const std::string event_log_file_path = cm_config.outlier_detection().event_log_path();
    if (!event_log_file_path.empty()) {
      auto outlier_or_error = Outlier::EventLoggerImpl::create(context.accessLogManager(),
                                                               event_log_file_path, time_source_);
      SET_AND_RETURN_IF_NOT_OK(outlier_or_error.status(), creation_status);
      outlier_event_logger_ = std::move(*outlier_or_error);
    }
  }

  // We need to know whether we're zone aware early on, so make sure we do this lookup
  // before we load any clusters.
  if (!cm_config.local_cluster_name().empty()) {
    local_cluster_name_ = cm_config.local_cluster_name();
  }

  // Now that the async-client manager is set, the xDS-Manager can be initialized.
  SET_AND_RETURN_IF_NOT_OK(xds_manager_.initialize(bootstrap, this), creation_status);
}
363

            
364
// Loads the bootstrap static clusters (primary first, then EDS/secondary),
// sets up ADS, instantiates the thread-local cluster manager, optionally
// creates the CDS API, and kicks off cluster initialization. Returns an error
// status on invalid configuration. Fixes in this revision: the secondary
// cluster loop now uses RETURN_IF_NOT_OK_REF like the primary loop (the manual
// `if (!ok) return` was inconsistent), and the statement-terminating semicolon
// after RETURN_IF_NOT_OK_REF(cds_or_error.status()) is restored.
absl::Status
ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
  ASSERT(!initialized_);
  initialized_ = true;

  // Cluster loading happens in two phases: first all the primary clusters are loaded, and then all
  // the secondary clusters are loaded. As it currently stands all non-EDS clusters and EDS which
  // load endpoint definition from file are primary and
  // (REST,GRPC,DELTA_GRPC) EDS clusters are secondary. This two phase
  // loading is done because in v2 configuration each EDS cluster individually sets up a
  // subscription. When this subscription is an API source the cluster will depend on a non-EDS
  // cluster, so the non-EDS clusters must be loaded first.
  auto is_primary_cluster = [](const envoy::config::cluster::v3::Cluster& cluster) -> bool {
    return cluster.type() != envoy::config::cluster::v3::Cluster::EDS ||
           (cluster.type() == envoy::config::cluster::v3::Cluster::EDS &&
            Config::SubscriptionFactory::isPathBasedConfigSource(
                cluster.eds_cluster_config().eds_config().config_source_specifier_case()));
  };
  // Build book-keeping for which clusters are primary. This is useful when we
  // invoke loadCluster() below and it needs the complete set of primaries.
  for (const auto& cluster : bootstrap.static_resources().clusters()) {
    if (is_primary_cluster(cluster)) {
      primary_clusters_.insert(cluster.name());
    }
  }

  bool has_ads_cluster = false;
  // Load all the primary clusters.
  for (const auto& cluster : bootstrap.static_resources().clusters()) {
    if (is_primary_cluster(cluster)) {
      const bool required_for_ads = isBlockingAdsCluster(bootstrap, cluster.name());
      has_ads_cluster |= required_for_ads;
      // TODO(abeyad): Consider passing a lambda for a "post-cluster-init" callback, which would
      // include a conditional ads_mux_->start() call, if other uses cases for "post-cluster-init"
      // functionality pops up.
      auto status_or_cluster =
          loadCluster(cluster, MessageUtil::hash(cluster), "", /*added_via_api=*/false,
                      required_for_ads, active_clusters_);
      RETURN_IF_NOT_OK_REF(status_or_cluster.status());
    }
  }

  // Now setup ADS if needed, this might rely on a primary cluster.
  RETURN_IF_NOT_OK(xds_manager_.initializeAdsConnections(bootstrap));

  // After ADS is initialized, load EDS static clusters as EDS config may potentially need ADS.
  for (const auto& cluster : bootstrap.static_resources().clusters()) {
    // Now load all the secondary clusters.
    if (cluster.type() == envoy::config::cluster::v3::Cluster::EDS &&
        !Config::SubscriptionFactory::isPathBasedConfigSource(
            cluster.eds_cluster_config().eds_config().config_source_specifier_case())) {
      ASSERT(!isBlockingAdsCluster(bootstrap, cluster.name()));
      // Passing "false" for required_for_ads because an ADS cluster cannot be
      // defined using EDS (or non-primary cluster).
      auto status_or_cluster =
          loadCluster(cluster, MessageUtil::hash(cluster), "", /*added_via_api=*/false,
                      /*required_for_ads=*/false, active_clusters_);
      // Propagate errors the same way as the primary-cluster loop above.
      RETURN_IF_NOT_OK_REF(status_or_cluster.status());
    }
  }

  cm_stats_.cluster_added_.add(bootstrap.static_resources().clusters().size());
  updateClusterCounts();

  // If a local cluster is configured it must exist among the static clusters;
  // capture its info and LB factory for the thread-local managers.
  absl::optional<ThreadLocalClusterManagerImpl::LocalClusterParams> local_cluster_params;
  if (local_cluster_name_) {
    auto local_cluster = active_clusters_.find(local_cluster_name_.value());
    if (local_cluster == active_clusters_.end()) {
      return absl::InvalidArgumentError(
          fmt::format("local cluster '{}' must be defined", local_cluster_name_.value()));
    }
    local_cluster_params.emplace();
    local_cluster_params->info_ = local_cluster->second->cluster().info();
    local_cluster_params->load_balancer_factory_ = local_cluster->second->loadBalancerFactory();
    local_cluster->second->setAddedOrUpdated();
  }

  // Once the initial set of static bootstrap clusters are created (including the local cluster),
  // we can instantiate the thread local cluster manager.
  tls_.set([this, local_cluster_params](Event::Dispatcher& dispatcher) {
    return std::make_shared<ThreadLocalClusterManagerImpl>(*this, dispatcher, local_cluster_params);
  });

  const auto& dyn_resources = bootstrap.dynamic_resources();
  // We can now potentially create the CDS API once the backing cluster exists.
  if (dyn_resources.has_cds_config() || !dyn_resources.cds_resources_locator().empty()) {
    std::unique_ptr<xds::core::v3::ResourceLocator> cds_resources_locator;
    if (!dyn_resources.cds_resources_locator().empty()) {
      auto url_or_error =
          Config::XdsResourceIdentifier::decodeUrl(dyn_resources.cds_resources_locator());
      RETURN_IF_NOT_OK_REF(url_or_error.status());
      cds_resources_locator =
          std::make_unique<xds::core::v3::ResourceLocator>(std::move(url_or_error.value()));
    }
    // In case cds_config is configured and the new xDS-TP configs are used,
    // then the CdsApi will need to track the resources, as the xDS-TP configs
    // may be used for OD-CDS. If this is not set, the SotW update may override
    // the OD-CDS resources.
    const bool support_multi_ads_sources =
        bootstrap.has_default_config_source() || !bootstrap.config_sources().empty();
    auto cds_or_error = factory_.createCds(dyn_resources.cds_config(), cds_resources_locator.get(),
                                           *this, support_multi_ads_sources);
    RETURN_IF_NOT_OK_REF(cds_or_error.status());
    cds_api_ = std::move(*cds_or_error);
    init_helper_.setCds(cds_api_.get());
  } else {
    init_helper_.setCds(nullptr);
  }

  // Proceed to add all static bootstrap clusters to the init manager. This will immediately
  // initialize any primary clusters. Post-init processing further initializes any thread
  // aware load balancer and sets up the per-worker host set updates.
  for (auto& cluster : active_clusters_) {
    init_helper_.addCluster(*cluster.second);
  }

  // Potentially move to secondary initialization on the static bootstrap clusters if all primary
  // clusters have already initialized. (E.g., if all static).
  init_helper_.onStaticLoadComplete();

  // Initialize the ADS and xDS-TP config based connections.
  if (!has_ads_cluster) {
    // There is no ADS cluster, so we won't be starting the ADS mux after a cluster has finished
    // initializing, so we must start ADS here.
    xds_manager_.adsMux()->start();
  }
  // TODO(adisuissa): to ensure parity with the non-xdstp-config-based ADS
  // we need to change this to only be invoked for Envoy-based clusters when
  // they are ready (this is needed to avoid early connection attempts in the
  // DNS based clusters).
  xds_manager_.startXdstpAdsMuxes();
  return absl::OkStatus();
}
499

            
500
// Signals the init helper to start secondary cluster initialization and, if a
// load_stats_config is present, creates the LRS load-stats reporter. The gRPC
// client is either fetched from the shared cache or created uncached depending
// on the use_cached_grpc_client_for_xds runtime flag.
absl::Status ClusterManagerImpl::initializeSecondaryClusters(
    const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
  init_helper_.startInitializingSecondaryClusters();

  const auto& cm_config = bootstrap.cluster_manager();
  if (cm_config.has_load_stats_config()) {
    const auto& load_stats_config = cm_config.load_stats_config();

    absl::Status status = Config::Utility::checkTransportVersion(load_stats_config);
    RETURN_IF_NOT_OK(status);
    absl::StatusOr<Grpc::RawAsyncClientSharedPtr> client_or_error;
    if (Runtime::runtimeFeatureEnabled("envoy.restart_features.use_cached_grpc_client_for_xds")) {
      // New path: reuse a cached raw async client keyed by the gRPC service
      // config hash.
      absl::StatusOr<Envoy::OptRef<const envoy::config::core::v3::GrpcService>> maybe_grpc_service =
          Envoy::Config::Utility::getGrpcConfigFromApiConfigSource(load_stats_config,
                                                                   /*grpc_service_idx*/ 0,
                                                                   /*xdstp_config_source*/ false);
      RETURN_IF_NOT_OK_REF(maybe_grpc_service.status());
      if (maybe_grpc_service.value().has_value()) {
        client_or_error = async_client_manager_->getOrCreateRawAsyncClientWithHashKey(
            Grpc::GrpcServiceConfigWithHashKey(*maybe_grpc_service.value()), *stats_.rootScope(),
            /*skip_cluster_check*/ false);
      } else {
        return absl::InvalidArgumentError("Invalid grpc service.");
      }
    } else {
      // Legacy path: build a factory and create a fresh (uncached) client.
      auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
          *async_client_manager_, load_stats_config, *stats_.rootScope(), false, 0, false);
      RETURN_IF_NOT_OK_REF(factory_or_error.status());
      client_or_error = factory_or_error.value()->createUncachedRawAsyncClient();
    }
    RETURN_IF_NOT_OK_REF(client_or_error.status());
    load_stats_reporter_ = std::make_unique<LoadStatsReporterImpl>(
        local_info_, *this, *stats_.rootScope(), std::move(client_or_error.value()), dispatcher_);
  }
  return absl::OkStatus();
}
536

            
537
11028
// Instantiates the cluster-manager-wide counters/gauges under the
// "cluster_manager." stat prefix.
ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) {
  const std::string prefix = "cluster_manager.";
  return {ALL_CLUSTER_MANAGER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
                                    POOL_GAUGE_PREFIX(scope, prefix))};
}
542

            
543
// Instantiates the per-worker gauges under a
// "thread_local_cluster_manager.<thread_name>" stat prefix.
ThreadLocalClusterManagerStats
ClusterManagerImpl::ThreadLocalClusterManagerImpl::generateStats(Stats::Scope& scope,
                                                                 const std::string& thread_name) {
  const std::string prefix = absl::StrCat("thread_local_cluster_manager.", thread_name);
  return {ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(POOL_GAUGE_PREFIX(scope, prefix))};
}
549

            
550
17368
absl::Status ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) {
  // This routine is called when a cluster has finished initializing. The cluster has not yet
  // been setup for cross-thread updates to avoid needless updates during initialization. The order
  // of operations here is important. We start by initializing the thread aware load balancer if
  // needed. This must happen first so cluster updates are heard first by the load balancer.
  // Also, it assures that all of clusters which this function is called should be always active.
  auto& cluster = cm_cluster.cluster();
  auto cluster_data = warming_clusters_.find(cluster.info()->name());
  // We have a situation that clusters will be immediately active, such as static and primary
  // cluster. So we must have this prevention logic here.
  if (cluster_data != warming_clusters_.end()) {
    clusterWarmingToActive(cluster.info()->name());
    updateClusterCounts();
  }
  // At this point the cluster must be in the active map: it was either already active or was
  // just promoted from warming above, so the iterator below is dereferenced without a check.
  cluster_data = active_clusters_.find(cluster.info()->name());

  // Initialize the thread-aware LB (if the cluster has one) before wiring up any update
  // callbacks, so the LB observes all subsequent updates.
  if (cluster_data->second->thread_aware_lb_ != nullptr) {
    RETURN_IF_NOT_OK(cluster_data->second->thread_aware_lb_->initialize());
  }

  // Now setup for cross-thread updates.
  // This is used by cluster types such as EDS clusters to drain the connection pools of removed
  // hosts.
  cluster_data->second->member_update_cb_ = cluster.prioritySet().addMemberUpdateCb(
      [&cluster, this](const HostVector&, const HostVector& hosts_removed) {
        if (cluster.info()->lbConfig().close_connections_on_host_set_change()) {
          for (const auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
            // This will drain all tcp and http connection pools.
            postThreadLocalRemoveHosts(cluster, host_set->hosts());
          }
        } else {
          // TODO(snowp): Should this be subject to merge windows?

          // Whenever hosts are removed from the cluster, we make each TLS cluster drain it's
          // connection pools for the removed hosts. If `close_connections_on_host_set_change` is
          // enabled, this case will be covered by first `if` statement, where all
          // connection pools are drained.
          if (!hosts_removed.empty()) {
            postThreadLocalRemoveHosts(cluster, hosts_removed);
          }
        }
      });

  // This is used by cluster types such as EDS clusters to update the cluster
  // without draining the cluster.
  cluster_data->second->priority_update_cb_ = cluster.prioritySet().addPriorityUpdateCb(
      [&cm_cluster, this](uint32_t priority, const HostVector& hosts_added,
                          const HostVector& hosts_removed) {
        // This fires when a cluster is about to have an updated member set. We need to send this
        // out to all of the thread local configurations.

        // Should we save this update and merge it with other updates?
        //
        // Note that we can only _safely_ merge updates that have no added/removed hosts. That is,
        // only those updates that signal a change in host healthcheck state, weight or metadata.
        //
        // We've discussed merging updates related to hosts being added/removed, but it's really
        // tricky to merge those given that downstream consumers of these updates expect to see the
        // full list of updates, not a condensed one. This is because they use the broadcasted
        // HostSharedPtrs within internal maps to track hosts. If we fail to broadcast the entire
        // list of removals, these maps will leak those HostSharedPtrs.
        //
        // See https://github.com/envoyproxy/envoy/pull/3941 for more context.
        bool scheduled = false;
        const auto merge_timeout = PROTOBUF_GET_MS_OR_DEFAULT(
            cm_cluster.cluster().info()->lbConfig(), update_merge_window, 1000);
        // Remember: we only merge updates with no adds/removes — just hc/weight/metadata changes.
        const bool is_mergeable = hosts_added.empty() && hosts_removed.empty();

        if (merge_timeout > 0) {
          // If this is not mergeable, we should cancel any scheduled updates since
          // we'll deliver it immediately.
          scheduled = scheduleUpdate(cm_cluster, priority, is_mergeable, merge_timeout);
        }

        // If an update was not scheduled for later, deliver it immediately.
        if (!scheduled) {
          cm_stats_.cluster_updated_.inc();
          postThreadLocalClusterUpdate(
              cm_cluster, ThreadLocalClusterUpdateParams(priority, hosts_added, hosts_removed));
        }
      });

  // Finally, post updates cross-thread so the per-thread load balancers are ready. First we
  // populate any update information that may be available after cluster init.
  ThreadLocalClusterUpdateParams params;
  for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
    if (host_set->hosts().empty()) {
      continue;
    }
    params.per_priority_update_params_.emplace_back(host_set->priority(), host_set->hosts(),
                                                    HostVector{});
  }
  // NOTE: In all cases *other* than the local cluster, this is when a cluster is added/updated
  // The local cluster must currently be statically defined and must exist prior to other
  // clusters being added/updated. We could gate the below update on hosts being available on
  // the cluster or the cluster not already existing, but the special logic is not worth it.
  postThreadLocalClusterUpdate(cm_cluster, std::move(params));
  return absl::OkStatus();
}
650

            
651
bool ClusterManagerImpl::scheduleUpdate(ClusterManagerCluster& cluster, uint32_t priority,
652
459
                                        bool mergeable, const uint64_t timeout) {
653
  // Find pending updates for this cluster.
654
459
  auto& updates_by_prio = updates_map_[cluster.cluster().info()->name()];
655
459
  if (!updates_by_prio) {
656
246
    updates_by_prio = std::make_unique<PendingUpdatesByPriorityMap>();
657
246
  }
658

            
659
  // Find pending updates for this priority.
660
459
  auto& updates = (*updates_by_prio)[priority];
661
459
  if (!updates) {
662
286
    updates = std::make_unique<PendingUpdates>();
663
286
  }
664

            
665
  // Has an update_merge_window gone by since the last update? If so, don't schedule
666
  // the update so it can be applied immediately. Ditto if this is not a mergeable update.
667
459
  const auto delta = time_source_.monotonicTime() - updates->last_updated_;
668
459
  const uint64_t delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count();
669
459
  const bool out_of_merge_window = delta_ms > timeout;
670
459
  if (out_of_merge_window || !mergeable) {
671
    // If there was a pending update, we cancel the pending merged update.
672
    //
673
    // Note: it's possible that even though we are outside of a merge window (delta_ms > timeout),
674
    // a timer is enabled. This race condition is fine, since we'll disable the timer here and
675
    // deliver the update immediately.
676

            
677
    // Why wasn't the update scheduled for later delivery? We keep some stats that are helpful
678
    // to understand why merging did not happen. There's 2 things we are tracking here:
679

            
680
    // 1) Was this update out of a merge window?
681
393
    if (mergeable && out_of_merge_window) {
682
74
      cm_stats_.update_out_of_merge_window_.inc();
683
74
    }
684

            
685
    // 2) Were there previous updates that we are cancelling (and delivering immediately)?
686
393
    if (updates->disableTimer()) {
687
12
      cm_stats_.update_merge_cancelled_.inc();
688
12
    }
689

            
690
393
    updates->last_updated_ = time_source_.monotonicTime();
691
393
    return false;
692
393
  }
693

            
694
  // If there's no timer, create one.
695
66
  if (updates->timer_ == nullptr) {
696
36
    updates->timer_ = dispatcher_.createTimer([this, &cluster, priority, &updates]() -> void {
697
4
      applyUpdates(cluster, priority, *updates);
698
4
    });
699
36
  }
700

            
701
  // Ensure there's a timer set to deliver these updates.
702
66
  if (!updates->timer_->enabled()) {
703
44
    updates->enableTimer(timeout);
704
44
  }
705

            
706
66
  return true;
707
459
}
708

            
709
void ClusterManagerImpl::applyUpdates(ClusterManagerCluster& cluster, uint32_t priority,
710
4
                                      PendingUpdates& updates) {
711
  // Deliver pending updates.
712

            
713
  // Remember that these merged updates are _only_ for updates related to
714
  // HC/weight/metadata changes. That's why added/removed are empty. All
715
  // adds/removals were already immediately broadcasted.
716
4
  static const HostVector hosts_added;
717
4
  static const HostVector hosts_removed;
718

            
719
4
  postThreadLocalClusterUpdate(
720
4
      cluster, ThreadLocalClusterUpdateParams(priority, hosts_added, hosts_removed));
721

            
722
4
  cm_stats_.cluster_updated_via_merge_.inc();
723
4
  updates.last_updated_ = time_source_.monotonicTime();
724
4
}
725

            
726
// Adds a new dynamic cluster or updates an existing one from a CDS-style config.
// Returns false when the update is blocked (the config hash matches the existing
// warming/active cluster of the same name), true when the add/update was accepted, or
// an error status if the new config fails to load.
absl::StatusOr<bool>
ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cluster& cluster,
                                       const std::string& version_info,
                                       const bool avoid_cds_removal) {
  // First we need to see if this new config is new or an update to an existing dynamic cluster.
  // We don't allow updates to statically configured clusters in the main configuration. We check
  // both the warming clusters and the active clusters to see if we need an update or the update
  // should be blocked.
  const std::string& cluster_name = cluster.name();
  const auto existing_active_cluster = active_clusters_.find(cluster_name);
  const auto existing_warming_cluster = warming_clusters_.find(cluster_name);
  const uint64_t new_hash = MessageUtil::hash(cluster);
  if (existing_warming_cluster != warming_clusters_.end()) {
    // If the cluster is the same as the warming cluster of the same name, block the update.
    if (existing_warming_cluster->second->blockUpdate(new_hash)) {
      return false;
    }
    // NB: https://github.com/envoyproxy/envoy/issues/14598
    // Always proceed if the cluster is different from the existing warming cluster.
  } else if (existing_active_cluster != active_clusters_.end() &&
             existing_active_cluster->second->blockUpdate(new_hash)) {
    // If there's no warming cluster of the same name, and if the cluster is the same as the active
    // cluster of the same name, block the update.
    return false;
  }

  if (existing_active_cluster != active_clusters_.end() ||
      existing_warming_cluster != warming_clusters_.end()) {
    if (existing_active_cluster != active_clusters_.end()) {
      // The following init manager remove call is a NOP in the case we are already initialized.
      // It's just kept here to avoid additional logic.
      init_helper_.removeCluster(*existing_active_cluster->second);
    }
    cm_stats_.cluster_modified_.inc();
  } else {
    cm_stats_.cluster_added_.inc();
  }

  // There are two discrete paths here depending on when we are adding/updating a cluster.
  // 1) During initial server load we use the init manager which handles complex logic related to
  //    primary/secondary init, static/CDS init, warming all clusters, etc.
  // 2) After initial server load, we handle warming independently for each cluster in the warming
  //    map.
  // Note: It's likely possible that all warming logic could be centralized in the init manager, but
  //       a decision was made to split the logic given how complex the init manager already is. In
  //       the future we may decide to undergo a refactor to unify the logic but the effort/risk to
  //       do that right now does not seem worth it given that the logic is generally pretty clean
  //       and easy to understand.
  const bool all_clusters_initialized =
      init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
  // Preserve the previous cluster data to avoid early destroy. The same cluster should be added
  // before destroy to avoid early initialization complete.
  auto status_or_cluster =
      loadCluster(cluster, new_hash, version_info, /*added_via_api=*/true,
                  /*required_for_ads=*/false, warming_clusters_, avoid_cds_removal);
  RETURN_IF_NOT_OK_REF(status_or_cluster.status());
  // `previous_cluster` (the displaced entry, if any) stays alive until end of scope so its
  // destruction happens after the replacement has been registered.
  const ClusterDataPtr previous_cluster = std::move(status_or_cluster.value());
  auto& cluster_entry = warming_clusters_.at(cluster_name);
  cluster_entry->cluster_->info()->configUpdateStats().warming_state_.set(1);
  if (!all_clusters_initialized) {
    ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
    init_helper_.addCluster(*cluster_entry);
  } else {
    ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name);
    // Warming completion callback: look the cluster up again by name since the map entry
    // may have been replaced by a newer update while warming was in progress.
    cluster_entry->cluster_->initialize([this, cluster_name] {
      ENVOY_LOG(debug, "warming cluster {} complete", cluster_name);
      auto state_changed_cluster_entry = warming_clusters_.find(cluster_name);
      state_changed_cluster_entry->second->cluster_->info()->configUpdateStats().warming_state_.set(
          0);
      return onClusterInit(*state_changed_cluster_entry->second);
    });
  }

  return true;
}
801

            
802
1554
void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) {
803
1554
  auto warming_it = warming_clusters_.find(cluster_name);
804
1554
  ASSERT(warming_it != warming_clusters_.end());
805

            
806
  // If the cluster is being updated, we need to cancel any pending merged updates.
807
  // Otherwise, applyUpdates() will fire with a dangling cluster reference.
808
1554
  updates_map_.erase(cluster_name);
809

            
810
1554
  active_clusters_[cluster_name] = std::move(warming_it->second);
811
1554
  warming_clusters_.erase(warming_it);
812
1554
}
813

            
814
1774
// Removes the named cluster from the active and/or warming maps. Only clusters added via
// the API are eligible; clusters flagged `avoid_cds_removal_` are skipped unless
// `remove_ignored` is set. Returns whether anything was removed.
bool ClusterManagerImpl::removeCluster(const std::string& cluster_name, const bool remove_ignored) {
  bool removed = false;
  auto existing_active_cluster = active_clusters_.find(cluster_name);
  if (existing_active_cluster != active_clusters_.end() &&
      existing_active_cluster->second->added_via_api_ &&
      (!existing_active_cluster->second->avoid_cds_removal_ || remove_ignored)) {
    removed = true;
    init_helper_.removeCluster(*existing_active_cluster->second);
    active_clusters_.erase(existing_active_cluster);

    ENVOY_LOG(debug, "removing cluster {}", cluster_name);
    // Tear down the per-worker state: notify removal callbacks, then erase the thread-local
    // cluster (inflated or deferred) and refresh the inflated-cluster gauge.
    tls_.runOnAllThreads([cluster_name](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
      ASSERT(cluster_manager->thread_local_clusters_.contains(cluster_name) ||
             cluster_manager->thread_local_deferred_clusters_.contains(cluster_name));
      ENVOY_LOG(debug, "removing TLS cluster {}", cluster_name);
      for (auto cb_it = cluster_manager->update_callbacks_.begin();
           cb_it != cluster_manager->update_callbacks_.end();) {
        // The current callback may remove itself from the list, so a handle for
        // the next item is fetched before calling the callback.
        auto curr_cb_it = cb_it;
        ++cb_it;
        (*curr_cb_it)->onClusterRemoval(cluster_name);
      }
      cluster_manager->thread_local_clusters_.erase(cluster_name);
      cluster_manager->thread_local_deferred_clusters_.erase(cluster_name);
      cluster_manager->local_stats_.clusters_inflated_.set(
          cluster_manager->thread_local_clusters_.size());
    });
    cluster_initialization_map_.erase(cluster_name);
  }

  // A cluster may be present in the warming map as well (or instead); remove it there too,
  // subject to the same eligibility rules.
  auto existing_warming_cluster = warming_clusters_.find(cluster_name);
  if (existing_warming_cluster != warming_clusters_.end() &&
      existing_warming_cluster->second->added_via_api_ &&
      (!existing_warming_cluster->second->avoid_cds_removal_ || remove_ignored)) {
    removed = true;
    init_helper_.removeCluster(*existing_warming_cluster->second);
    warming_clusters_.erase(existing_warming_cluster);
    ENVOY_LOG(info, "removing warming cluster {}", cluster_name);
  }

  if (removed) {
    cm_stats_.cluster_removed_.inc();
    updateClusterCounts();
    // Cancel any pending merged updates.
    updates_map_.erase(cluster_name);
  }

  return removed;
}
864

            
865
// Instantiates a cluster (and its thread-aware LB) from proto config and inserts it into
// `cluster_map`, replacing any existing entry with the same name. Returns the displaced
// ClusterData (nullptr if there was none) so the caller can control when the old cluster
// is destroyed, or an InvalidArgument error if the config is rejected.
absl::StatusOr<ClusterManagerImpl::ClusterDataPtr>
ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
                                const uint64_t cluster_hash, const std::string& version_info,
                                bool added_via_api, const bool required_for_ads,
                                ClusterMap& cluster_map, const bool avoid_cds_removal) {
  absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
      new_cluster_pair_or_error =
          factory_.clusterFromProto(cluster, outlier_event_logger_, added_via_api);

  if (!new_cluster_pair_or_error.ok()) {
    return absl::InvalidArgumentError(std::string(new_cluster_pair_or_error.status().message()));
  }
  auto& new_cluster = new_cluster_pair_or_error->first;
  auto& lb = new_cluster_pair_or_error->second;
  Cluster& cluster_reference = *new_cluster;

  const auto cluster_info = cluster_reference.info();

  // Statically configured clusters may not collide by name; API-driven updates replace the
  // existing entry instead (handled further below).
  if (!added_via_api) {
    if (cluster_map.find(cluster_info->name()) != cluster_map.end()) {
      return absl::InvalidArgumentError(
          fmt::format("cluster manager: duplicate cluster '{}'", cluster_info->name()));
    }
  }

  // Check if the cluster provided load balancing policy is used. We need handle it as special
  // case.
  TypedLoadBalancerFactory& typed_lb_factory = cluster_info->loadBalancerFactory();
  const bool cluster_provided_lb =
      typed_lb_factory.name() == "envoy.load_balancing_policies.cluster_provided";

  // The "cluster provided" LB policy and the cluster actually supplying an LB must agree in
  // both directions.
  if (cluster_provided_lb && lb == nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("cluster manager: cluster provided LB specified but cluster "
                    "'{}' did not provide one. Check cluster documentation.",
                    cluster_info->name()));
  }
  if (!cluster_provided_lb && lb != nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("cluster manager: cluster provided LB not specified but cluster "
                    "'{}' provided one. Check cluster documentation.",
                    cluster_info->name()));
  }

  // Broadcast a thread-local health failure when active health checking marks a host failed.
  if (new_cluster->healthChecker() != nullptr) {
    new_cluster->healthChecker()->addHostCheckCompleteCb(
        [this](HostSharedPtr host, HealthTransition changed_state, HealthState) {
          if (changed_state == HealthTransition::Changed &&
              host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
            postThreadLocalHealthFailure(host);
          }
        });
  }

  // Likewise for outlier-detection ejections.
  if (new_cluster->outlierDetector() != nullptr) {
    new_cluster->outlierDetector()->addChangedStateCb([this](HostSharedPtr host) {
      if (host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
        ENVOY_LOG_EVENT(debug, "outlier_detection_ejection",
                        "host {} in cluster {} was ejected by the outlier detector",
                        host->address()->asStringView(), host->cluster().name());
        postThreadLocalHealthFailure(host);
      }
    });
  }
  // Insert or replace the map entry. On replacement, std::exchange hands back the old
  // ClusterData so the caller decides when it is destroyed.
  ClusterDataPtr result;
  auto cluster_entry_it = cluster_map.find(cluster_info->name());
  if (cluster_entry_it != cluster_map.end()) {
    result = std::exchange(cluster_entry_it->second,
                           std::make_unique<ClusterData>(
                               cluster, cluster_hash, version_info, added_via_api, required_for_ads,
                               std::move(new_cluster), time_source_, avoid_cds_removal));
  } else {
    bool inserted = false;
    std::tie(cluster_entry_it, inserted) = cluster_map.emplace(
        cluster_info->name(),
        std::make_unique<ClusterData>(cluster, cluster_hash, version_info, added_via_api,
                                      required_for_ads, std::move(new_cluster), time_source_,
                                      avoid_cds_removal));
    ASSERT(inserted);
  }

  // Either adopt the cluster-provided LB or build one from the configured LB factory.
  if (cluster_provided_lb) {
    cluster_entry_it->second->thread_aware_lb_ = std::move(lb);
  } else {
    cluster_entry_it->second->thread_aware_lb_ =
        typed_lb_factory.create(cluster_info->loadBalancerConfig(), *cluster_info,
                                cluster_reference.prioritySet(), runtime_, random_, time_source_);
  }

  updateClusterCounts();
  return result;
}
957

            
958
41457
void ClusterManagerImpl::updateClusterCounts() {
959
  // This if/else block implements a control flow mechanism that can be used by an ADS
960
  // implementation to properly sequence CDS and RDS updates. It is not enforcing on ADS. ADS can
961
  // use it to detect when a previously sent cluster becomes warm before sending routes that depend
962
  // on it. This can improve incidence of HTTP 503 responses from Envoy when a route is used before
963
  // it's supporting cluster is ready.
964
  //
965
  // We achieve that by leaving CDS in the paused state as long as there is at least
966
  // one cluster in the warming state. This prevents CDS ACK from being sent to ADS.
967
  // Once cluster is warmed up, CDS is resumed, and ACK is sent to ADS, providing a
968
  // signal to ADS to proceed with RDS updates.
969
  // If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant.
970
41457
  const bool all_clusters_initialized =
971
41457
      init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
972
41457
  if (all_clusters_initialized && xds_manager_.adsMux()) {
973
2431
    const auto type_url = Config::getTypeUrl<envoy::config::cluster::v3::Cluster>();
974
2431
    if (resume_cds_ == nullptr && !warming_clusters_.empty()) {
975
905
      resume_cds_ = xds_manager_.pause(type_url);
976
1534
    } else if (warming_clusters_.empty()) {
977
1430
      resume_cds_.reset();
978
1430
    }
979
2431
  }
980
41457
  cm_stats_.active_clusters_.set(active_clusters_.size());
981
41457
  cm_stats_.warming_clusters_.set(warming_clusters_.size());
982
41457
}
983

            
984
161258
ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view cluster) {
  // Fast path: the cluster has already been inflated on this worker thread.
  ThreadLocalClusterManagerImpl& tls_cm = *tls_;
  const auto it = tls_cm.thread_local_clusters_.find(cluster);
  if (it == tls_cm.thread_local_clusters_.end()) {
    // Not inflated yet; this may inline-initialize the cluster if it exists (e.g. was
    // deferred), otherwise it reports the cluster as unknown.
    return tls_cm.initializeClusterInlineIfExists(cluster);
  }
  return it->second.get();
}
994

            
995
void ClusterManagerImpl::maybePreconnect(
996
    ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry,
997
    const ClusterConnectivityState& state,
998
49348
    std::function<ConnectionPool::Instance*()> pick_preconnect_pool) {
999
49348
  auto peekahead_ratio = cluster_entry.info()->peekaheadRatio();
49348
  if (peekahead_ratio <= 1.0) {
49268
    return;
49268
  }
  // 3 here is arbitrary. Just as in ConnPoolImplBase::tryCreateNewConnections
  // we want to limit the work which can be done on any given preconnect attempt.
180
  for (int i = 0; i < 3; ++i) {
    // See if adding this one new connection
    // would put the cluster over desired capacity. If so, stop preconnecting.
    //
    // We anticipate the incoming stream here, because maybePreconnect is called
    // before a new stream is established.
177
    if (!ConnectionPool::ConnPoolImplBase::shouldConnect(
177
            state.pending_streams_, state.active_streams_,
177
            state.connecting_and_connected_stream_capacity_, peekahead_ratio, true)) {
53
      return;
53
    }
124
    ConnectionPool::Instance* preconnect_pool = pick_preconnect_pool();
124
    if (!preconnect_pool || !preconnect_pool->maybePreconnect(peekahead_ratio)) {
      // Given that the next preconnect pick may be entirely different, we could
      // opt to try again even if the first preconnect fails. Err on the side of
      // caution and wait for the next attempt.
24
      return;
24
    }
124
  }
80
}
// Returns an HTTP connection pool handle for `host`, or nullopt when no pool could be
// obtained for this host/priority/protocol combination.
absl::optional<HttpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPool(
    HostConstSharedPtr host, ResourcePriority priority, absl::optional<Http::Protocol> protocol,
    LoadBalancerContext* context) {
  // Select a host and create a connection pool for it if it does not already exist.
  auto pool = httpConnPoolImpl(host, priority, protocol, context);
  if (pool == nullptr) {
    return absl::nullopt;
  }
  // NOTE(review): the callback handed to HttpPoolData appears to be invoked when a new
  // stream is being established on the pool (see comment inside) — confirm against
  // HttpPoolData's definition.
  HttpPoolData data(
      [this, priority, protocol, context]() -> void {
        // Now that a new stream is being established, attempt to preconnect.
        // The inner lambda captures the outer lambda's by-value copies by reference; this
        // is safe because maybePreconnect() invokes it synchronously.
        maybePreconnect(
            *this, parent_.cluster_manager_state_, [this, &priority, &protocol, &context]() {
              HostConstSharedPtr peek_host = peekAnotherHost(context);
              return peek_host ? httpConnPoolImpl(peek_host, priority, protocol, context) : nullptr;
            });
      },
      pool);
  return data;
}
// Returns a TCP connection pool handle for `host`, or nullopt when the host is null or no
// pool could be obtained.
absl::optional<TcpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
    HostConstSharedPtr host, ResourcePriority priority, LoadBalancerContext* context) {
  if (!host) {
    return absl::nullopt;
  }
  // Select a host and create a connection pool for it if it does not already exist.
  auto pool = tcpConnPoolImpl(host, priority, context);
  if (pool == nullptr) {
    return absl::nullopt;
  }
  // The callback preconnects when a new connection is requested from the pool. The inner
  // lambda captures the outer lambda's by-value copies by reference; this is safe because
  // maybePreconnect() invokes it synchronously.
  TcpPoolData data(
      [this, priority, context]() -> void {
        maybePreconnect(*this, parent_.cluster_manager_state_, [this, &priority, &context]() {
          HostConstSharedPtr peek_host = peekAnotherHost(context);
          return peek_host ? tcpConnPoolImpl(peek_host, priority, context) : nullptr;
        });
      },
      pool);
  return data;
}
absl::optional<TcpPoolData>
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPool(
    ResourcePriority priority, LoadBalancerContext* context) {
  // Pick a host first; only synchronous host selection is allowed on this path.
  HostConstSharedPtr selected =
      LoadBalancer::onlyAllowSynchronousHostSelection(chooseHost(context));
  if (selected == nullptr) {
    return absl::nullopt;
  }
  // Delegate to the host-specific overload.
  return tcpConnPool(selected, priority, context);
}
void ClusterManagerImpl::drainConnections(const std::string& cluster,
35
                                          DrainConnectionsHostPredicate predicate) {
35
  ENVOY_LOG_EVENT(debug, "drain_connections_call", "drainConnections called for cluster {}",
35
                  cluster);
68
  tls_.runOnAllThreads([cluster, predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
68
    auto cluster_entry = cluster_manager->thread_local_clusters_.find(cluster);
68
    if (cluster_entry != cluster_manager->thread_local_clusters_.end()) {
68
      cluster_entry->second->drainConnPools(
68
          predicate, ConnectionPool::DrainBehavior::DrainExistingConnections);
68
    }
68
  });
35
}
void ClusterManagerImpl::drainConnections(DrainConnectionsHostPredicate predicate,
11
                                          ConnectionPool::DrainBehavior drain_behavior) {
11
  ENVOY_LOG_EVENT(debug, "drain_connections_call_for_all_clusters",
11
                  "drainConnections called for all clusters");
11
  tls_.runOnAllThreads(
22
      [predicate, drain_behavior](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
44
        for (const auto& cluster_entry : cluster_manager->thread_local_clusters_) {
44
          cluster_entry.second->drainConnPools(predicate, drain_behavior);
44
        }
22
      });
11
}
388
absl::Status ClusterManagerImpl::checkActiveStaticCluster(const std::string& cluster) {
  // The cluster must exist in the active set...
  const auto entry = active_clusters_.find(cluster);
  if (entry == active_clusters_.end()) {
    return absl::InvalidArgumentError(fmt::format("Unknown gRPC client cluster '{}'", cluster));
  }
  // ...and must come from the bootstrap (static) config rather than the API.
  if (entry->second->added_via_api_) {
    return absl::InvalidArgumentError(
        fmt::format("gRPC client cluster '{}' is not static", cluster));
  }
  return absl::OkStatus();
}
void ClusterManagerImpl::postThreadLocalRemoveHosts(const Cluster& cluster,
76
                                                    const HostVector& hosts_removed) {
  // Drain the connection pools for the given hosts. For deferred clusters have
  // been created.
76
  tls_.runOnAllThreads([name = cluster.info()->name(),
132
                        hosts_removed](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
132
    cluster_manager->removeHosts(name, hosts_removed);
132
  });
76
}
bool ClusterManagerImpl::deferralIsSupportedForCluster(
17783
    const ClusterInfoConstSharedPtr& info) const {
17783
  if (!deferred_cluster_creation_) {
17249
    return false;
17249
  }
  // Certain cluster types are unsupported for deferred initialization.
  // We need to check both the `clusterType()` (preferred) falling back to
  // the `type()` due to how custom clusters were added leveraging an any
  // config.
534
  if (auto custom_cluster_type = info->clusterType(); custom_cluster_type.has_value()) {
    // TODO(kbaichoo): make it configurable what custom types are supported?
18
    static const std::array<std::string, 4> supported_well_known_cluster_types = {
18
        "envoy.clusters.aggregate", "envoy.cluster.eds", "envoy.clusters.redis",
18
        "envoy.cluster.static"};
18
    if (std::find(supported_well_known_cluster_types.begin(),
18
                  supported_well_known_cluster_types.end(),
18
                  custom_cluster_type->name()) == supported_well_known_cluster_types.end()) {
      return false;
    }
516
  } else {
    // Check DiscoveryType instead.
516
    static constexpr std::array<envoy::config::cluster::v3::Cluster::DiscoveryType, 2>
516
        supported_cluster_types = {envoy::config::cluster::v3::Cluster::EDS,
516
                                   envoy::config::cluster::v3::Cluster::STATIC};
516
    if (std::find(supported_cluster_types.begin(), supported_cluster_types.end(), info->type()) ==
516
        supported_cluster_types.end()) {
13
      return false;
13
    }
516
  }
521
  return true;
534
}
// Broadcasts a cluster add/update (and its per-priority membership deltas) from
// the main thread to every worker's ThreadLocalClusterManagerImpl. When
// deferred cluster creation applies, workers only store a
// ClusterInitializationObject instead of inflating the cluster immediately.
void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
                                                      ThreadLocalClusterUpdateParams&& params) {
  // Only the first post for a cluster installs/replaces the thread local
  // ClusterEntry; later posts are membership-only updates.
  bool add_or_update_cluster = false;
  if (!cm_cluster.addedOrUpdated()) {
    add_or_update_cluster = true;
    cm_cluster.setAddedOrUpdated();
  }
  LoadBalancerFactorySharedPtr load_balancer_factory;
  if (add_or_update_cluster) {
    load_balancer_factory = cm_cluster.loadBalancerFactory();
  }
  // Snapshot per-priority host data on the main thread so workers can apply it
  // without touching main-thread state.
  for (auto& per_priority : params.per_priority_update_params_) {
    const auto& host_set =
        cm_cluster.cluster().prioritySet().hostSetsPerPriority()[per_priority.priority_];
    per_priority.update_hosts_params_ = HostSetImpl::updateHostsParams(*host_set);
    per_priority.locality_weights_ = host_set->localityWeights();
    per_priority.weighted_priority_health_ = host_set->weightedPriorityHealth();
    per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
  }
  HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap();
  // The cluster is now known, so any pending on-demand discovery for it is done.
  pending_cluster_creations_.erase(cm_cluster.cluster().info()->name());
  const UnitFloat drop_overload = cm_cluster.cluster().dropOverload();
  const std::string drop_category = cm_cluster.cluster().dropCategory();
  // Populate the cluster initialization object based on this update. Returns
  // nullptr when deferral is unsupported for this cluster.
  ClusterInitializationObjectConstSharedPtr cluster_initialization_object =
      addOrUpdateClusterInitializationObjectIfSupported(params, cm_cluster.cluster().info(),
                                                        load_balancer_factory, host_map,
                                                        drop_overload, drop_category);
  tls_.runOnAllThreads([info = cm_cluster.cluster().info(), params = std::move(params),
                        add_or_update_cluster, load_balancer_factory, map = std::move(host_map),
                        cluster_initialization_object = std::move(cluster_initialization_object),
                        drop_overload, drop_category = std::move(drop_category)](
                           OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    ASSERT(cluster_manager.has_value(),
           "Expected the ThreadLocalClusterManager to be set during ClusterManagerImpl creation.");
    // From here on, `cluster_manager` is the current thread's instance; all
    // mutations below are thread-local.
    // Defer only when deferral is supported, the cluster is not already
    // inflated on this thread, and we are on a worker (the main thread always
    // inflates immediately).
    if (const bool defer_unused_clusters =
            cluster_initialization_object != nullptr &&
            !cluster_manager->thread_local_clusters_.contains(info->name()) &&
            !Envoy::Thread::MainThread::isMainThread();
        defer_unused_clusters) {
      // Save the cluster initialization object.
      ENVOY_LOG(debug, "Deferring add or update for TLS cluster {}", info->name());
      cluster_manager->thread_local_deferred_clusters_[info->name()] =
          cluster_initialization_object;
      // Invoke similar logic of onClusterAddOrUpdate.
      ThreadLocalClusterCommand command = [&cluster_manager,
                                           cluster_name = info->name()]() -> ThreadLocalCluster& {
        // If we have multiple callbacks only the first one needs to use the
        // command to initialize the cluster.
        auto existing_cluster_entry = cluster_manager->thread_local_clusters_.find(cluster_name);
        if (existing_cluster_entry != cluster_manager->thread_local_clusters_.end()) {
          return *existing_cluster_entry->second;
        }
        auto* cluster_entry = cluster_manager->initializeClusterInlineIfExists(cluster_name);
        ASSERT(cluster_entry != nullptr, "Deferred clusters initiailization should not fail.");
        return *cluster_entry;
      };
      for (auto cb_it = cluster_manager->update_callbacks_.begin();
           cb_it != cluster_manager->update_callbacks_.end();) {
        // The current callback may remove itself from the list, so a handle for
        // the next item is fetched before calling the callback.
        auto curr_cb_it = cb_it;
        ++cb_it;
        (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
      }
    } else {
      // Broadcast: inflate/update the thread local cluster right away.
      ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
      if (add_or_update_cluster) {
        if (cluster_manager->thread_local_clusters_.contains(info->name())) {
          ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
        } else {
          ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
        }
        // Ownership transfers to thread_local_clusters_ on the next line; the
        // raw pointer is kept for the callback command below.
        new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
                                                                      load_balancer_factory);
        cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
        cluster_manager->local_stats_.clusters_inflated_.set(
            cluster_manager->thread_local_clusters_.size());
      }
      if (cluster_manager->thread_local_clusters_[info->name()]) {
        cluster_manager->thread_local_clusters_[info->name()]->setDropOverload(drop_overload);
        cluster_manager->thread_local_clusters_[info->name()]->setDropCategory(drop_category);
      }
      // Apply each priority's membership delta captured on the main thread.
      for (const auto& per_priority : params.per_priority_update_params_) {
        cluster_manager->updateClusterMembership(
            info->name(), per_priority.priority_, per_priority.update_hosts_params_,
            per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_,
            per_priority.weighted_priority_health_, per_priority.overprovisioning_factor_, map);
      }
      if (new_cluster != nullptr) {
        ThreadLocalClusterCommand command = [&new_cluster]() -> ThreadLocalCluster& {
          return *new_cluster;
        };
        for (auto cb_it = cluster_manager->update_callbacks_.begin();
             cb_it != cluster_manager->update_callbacks_.end();) {
          // The current callback may remove itself from the list, so a handle for
          // the next item is fetched before calling the callback.
          auto curr_cb_it = cb_it;
          ++cb_it;
          (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
        }
      }
    }
  });
  // By this time, the main thread has received the cluster initialization update, so we can start
  // the ADS mux if the ADS mux is dependent on this cluster's initialization.
  if (cm_cluster.requiredForAds() && !ads_mux_initialized_) {
    xds_manager_.adsMux()->start();
    ads_mux_initialized_ = true;
  }
}
// Creates (or merges into) the ClusterInitializationObject used for deferred
// thread-local cluster creation. Returns nullptr when deferral is not
// supported for this cluster (see deferralIsSupportedForCluster).
ClusterManagerImpl::ClusterInitializationObjectConstSharedPtr
ClusterManagerImpl::addOrUpdateClusterInitializationObjectIfSupported(
    const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
    LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
    UnitFloat drop_overload, absl::string_view drop_category) {
  if (!deferralIsSupportedForCluster(cluster_info)) {
    return nullptr;
  }
  const std::string& cluster_name = cluster_info->name();
  auto entry = cluster_initialization_map_.find(cluster_name);
  // TODO(kbaichoo): if EDS can be configured via cluster_type() then modify the
  // merging logic below.
  //
  // This method may be called multiple times to create multiple ClusterInitializationObject
  // instances for the same cluster. And before the thread local clusters are actually initialized,
  // the new instances will override the old instances in the work threads. But part of data is be
  // created only once, such as load balancer factory. So we should always to merge the new instance
  // with the old one to keep the latest instance have all necessary data.
  //
  // More specifically, this will happen in the following scenarios for now:
  // 1. EDS clusters: the ClusterLoadAssignment of EDS cluster may be updated multiples before
  //   the thread local cluster is initialized.
  // 2. Clusters in the unit tests: the cluster in the unit test may be updated multiples before
  //   the thread local cluster is initialized by calling 'updateHosts' manually.
  //
  // Only merge when the prior entry refers to the exact same ClusterInfo
  // instance; a different instance means the cluster was replaced and the old
  // state must not be carried over.
  const bool should_merge_with_prior_cluster =
      entry != cluster_initialization_map_.end() && entry->second->cluster_info_ == cluster_info;
  if (should_merge_with_prior_cluster) {
    // We need to copy from an existing Cluster Initialization Object. In
    // particular, only update the params with changed priority.
    auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
        entry->second->per_priority_state_, params, std::move(cluster_info),
        // Keep the previously captured LB factory when this update did not
        // supply one (factories are only produced on add-or-update posts).
        load_balancer_factory == nullptr ? entry->second->load_balancer_factory_
                                         : load_balancer_factory,
        map, drop_overload, drop_category);
    cluster_initialization_map_[cluster_name] = new_initialization_object;
    return new_initialization_object;
  } else {
    // We need to create a fresh Cluster Initialization Object.
    auto new_initialization_object = std::make_shared<ClusterInitializationObject>(
        params, std::move(cluster_info), load_balancer_factory, map, drop_overload, drop_category);
    cluster_initialization_map_[cluster_name] = new_initialization_object;
    return new_initialization_object;
  }
}
// Inflates a previously deferred cluster on the calling worker thread, if a
// ClusterInitializationObject exists for `cluster`. Returns the new
// ClusterEntry, or nullptr when the cluster is unknown to the deferred map.
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::initializeClusterInlineIfExists(
    absl::string_view cluster) {
  auto entry = thread_local_deferred_clusters_.find(cluster);
  if (entry == thread_local_deferred_clusters_.end()) {
    // Unknown cluster.
    return nullptr;
  }
  // Create the cluster inline.
  const ClusterInitializationObjectConstSharedPtr& initialization_object = entry->second;
  ENVOY_LOG(debug, "initializing TLS cluster {} inline", cluster);
  auto cluster_entry = std::make_unique<ClusterEntry>(
      *this, initialization_object->cluster_info_, initialization_object->load_balancer_factory_);
  ClusterEntry* cluster_entry_ptr = cluster_entry.get();
  thread_local_clusters_[cluster] = std::move(cluster_entry);
  local_stats_.clusters_inflated_.set(thread_local_clusters_.size());
  // Replay the captured per-priority membership state into the fresh entry.
  for (const auto& [_, per_priority] : initialization_object->per_priority_state_) {
    updateClusterMembership(initialization_object->cluster_info_->name(), per_priority.priority_,
                            per_priority.update_hosts_params_, per_priority.locality_weights_,
                            per_priority.hosts_added_, per_priority.hosts_removed_,
                            per_priority.weighted_priority_health_,
                            per_priority.overprovisioning_factor_,
                            initialization_object->cross_priority_host_map_);
  }
  thread_local_clusters_[cluster]->setDropOverload(initialization_object->drop_overload_);
  thread_local_clusters_[cluster]->setDropCategory(initialization_object->drop_category_);
  // Remove the CIO as we've initialized the cluster.
  thread_local_deferred_clusters_.erase(entry);
  return cluster_entry_ptr;
}
// Builds a fresh initialization object from a single update: the per-priority
// map starts empty, so each priority's params are copied in directly.
ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
    const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
    LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
    UnitFloat drop_overload, absl::string_view drop_category)
    : cluster_info_(std::move(cluster_info)), load_balancer_factory_(load_balancer_factory),
      cross_priority_host_map_(map), drop_overload_(drop_overload), drop_category_(drop_category) {
  // No prior state to merge with: record every priority from this update.
  for (const auto& per_priority : params.per_priority_update_params_) {
    per_priority_state_.emplace(per_priority.priority_, per_priority);
  }
}
// Merging constructor: starts from a prior object's per-priority state and
// folds in a new update. Used when a deferred cluster receives multiple
// updates before any worker inflates it.
ClusterManagerImpl::ClusterInitializationObject::ClusterInitializationObject(
    const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>& per_priority_state,
    const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
    LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
    UnitFloat drop_overload, absl::string_view drop_category)
    : per_priority_state_(per_priority_state), cluster_info_(std::move(cluster_info)),
      load_balancer_factory_(load_balancer_factory), cross_priority_host_map_(map),
      drop_overload_(drop_overload), drop_category_(drop_category) {
  // Because EDS Clusters receive the entire ClusterLoadAssignment but only
  // provides the delta we must process the hosts_added and hosts_removed and
  // not simply overwrite with hosts added.
  for (const auto& update : update_params.per_priority_update_params_) {
    auto it = per_priority_state_.find(update.priority_);
    if (it != per_priority_state_.end()) {
      auto& priority_state = it->second;
      // Merge the two per_priorities: scalar fields take the newer values.
      priority_state.update_hosts_params_ = update.update_hosts_params_;
      priority_state.locality_weights_ = update.locality_weights_;
      priority_state.weighted_priority_health_ = update.weighted_priority_health_;
      priority_state.overprovisioning_factor_ = update.overprovisioning_factor_;
      // Merge the hosts vectors to just have hosts added.
      // Assumes that the old host_added_ is exclusive to new hosts_added_ and
      // new hosts_removed_ only refers to the old hosts_added_.
      ASSERT(priority_state.hosts_removed_.empty(),
             "Cluster Initialization Object should apply hosts "
             "removed updates to hosts_added vector!");
      // TODO(kbaichoo): replace with a more efficient algorithm. For example
      // if the EDS cluster exposed the LoadAssignment we could just merge by
      // overwriting hosts_added.
      if (!update.hosts_removed_.empty()) {
        // Remove all hosts to be removed from the old host_added.
        auto& host_added = priority_state.hosts_added_;
        auto removed_section = std::remove_if(
            host_added.begin(), host_added.end(),
            // Match by shared pointer identity: a re-added host is a distinct
            // HostSharedPtr, so pointer comparison is sufficient here.
            [hosts_removed = std::cref(update.hosts_removed_)](const HostSharedPtr& ptr) {
              return std::find(hosts_removed.get().begin(), hosts_removed.get().end(), ptr) !=
                     hosts_removed.get().end();
            });
        priority_state.hosts_added_.erase(removed_section, priority_state.hosts_added_.end());
      }
      // Add updated host_added.
      priority_state.hosts_added_.reserve(priority_state.hosts_added_.size() +
                                          update.hosts_added_.size());
      std::copy(update.hosts_added_.begin(), update.hosts_added_.end(),
                std::back_inserter(priority_state.hosts_added_));
    } else {
      // Just copy the new priority.
      per_priority_state_.emplace(update.priority_, update);
    }
  }
}
67
void ClusterManagerImpl::postThreadLocalHealthFailure(const HostSharedPtr& host) {
118
  tls_.runOnAllThreads([host](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
118
    cluster_manager->onHostHealthFailure(host);
118
  });
67
}
// Creates a raw upstream TCP connection to a load-balanced host. Returns
// {nullptr, nullptr} when host selection fails.
Host::CreateConnectionData ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConn(
    LoadBalancerContext* context) {
  HostConstSharedPtr logical_host =
      LoadBalancer::onlyAllowSynchronousHostSelection(chooseHost(context));
  if (logical_host) {
    auto conn_info = logical_host->createConnection(
        parent_.thread_local_dispatcher_, nullptr,
        context == nullptr ? nullptr : context->upstreamTransportSocketOptions());
    // If the cluster closes connections on host health failure, track this
    // connection in the per-host map so it can be found and closed later.
    if ((cluster_info_->features() &
         ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) &&
        conn_info.connection_ != nullptr) {
      auto conn_map_iter = parent_.host_tcp_conn_map_.find(logical_host);
      if (conn_map_iter == parent_.host_tcp_conn_map_.end()) {
        // First tracked connection for this host: create the container and
        // acquire a handle keeping the host alive while connections exist.
        conn_map_iter =
            parent_.host_tcp_conn_map_.try_emplace(logical_host, logical_host->acquireHandle())
                .first;
      }
      auto& conn_map = conn_map_iter->second;
      // Keyed by raw connection pointer; the container registers itself for
      // cleanup when the connection closes.
      conn_map.connections_.emplace(
          conn_info.connection_.get(),
          std::make_unique<ThreadLocalClusterManagerImpl::TcpConnContainer>(
              parent_, logical_host, *conn_info.connection_));
    }
    return conn_info;
  }
  // No host available from the load balancer.
  return {nullptr, nullptr};
}
// Lazily creates (on first use) and returns the HTTP async client for this
// thread-local cluster entry. The client lives as long as the entry.
Http::AsyncClient&
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpAsyncClient() {
  if (lazy_http_async_client_ == nullptr) {
    lazy_http_async_client_ = std::make_unique<Http::AsyncClientImpl>(
        cluster_info_, parent_.parent_.stats_, parent_.thread_local_dispatcher_, parent_.parent_,
        parent_.parent_.context_,
        // make_unique instead of a naked `new` wrapped in ShadowWriterPtr:
        // exception-safe and consistent with the rest of the file.
        std::make_unique<Router::ShadowWriterImpl>(parent_.parent_),
        parent_.parent_.http_context_, parent_.parent_.router_context_);
  }
  return *lazy_http_async_client_;
}
// Builds a fresh async TCP client bound to this worker's dispatcher and this
// cluster entry; each call returns a new, independently owned client.
Tcp::AsyncTcpClientPtr
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpAsyncClient(
    LoadBalancerContext* context, Tcp::AsyncTcpClientOptionsConstSharedPtr options) {
  const bool enable_half_close = options->enable_half_close;
  return std::make_unique<Tcp::AsyncTcpClientImpl>(parent_.thread_local_dispatcher_, *this, context,
                                                   enable_half_close);
}
// Applies a per-priority membership update to this thread-local cluster entry
// and, for thread-aware load balancers, rebuilds the worker-local LB.
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::updateHosts(
    const std::string& name, uint32_t priority,
    PrioritySet::UpdateHostsParams&& update_hosts_params,
    LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
    const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health,
    absl::optional<uint32_t> overprovisioning_factor,
    HostMapConstSharedPtr cross_priority_host_map) {
  ENVOY_LOG(debug, "membership update for TLS cluster {} added {} removed {}", name,
            hosts_added.size(), hosts_removed.size());
  priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights),
                            hosts_added, hosts_removed, weighted_priority_health,
                            overprovisioning_factor, std::move(cross_priority_host_map));
  // If an LB is thread aware, create a new worker local LB on membership changes.
  if (lb_factory_ != nullptr && lb_factory_->recreateOnHostChange()) {
    ENVOY_LOG(debug, "re-creating local LB for TLS cluster {}", name);
    lb_ = lb_factory_->create({priority_set_, parent_.local_priority_set_});
  }
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(
34220
    const HostVector& hosts_removed) {
35055
  for (const auto& host : hosts_removed) {
34679
    parent_.drainOrCloseConnPools(host, ConnectionPool::DrainBehavior::DrainAndDelete);
34679
  }
34220
}
33892
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools() {
34096
  for (auto& host_set : priority_set_.hostSetsPerPriority()) {
34096
    drainConnPools(host_set->hosts());
34096
  }
33892
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::drainConnPools(
112
    DrainConnectionsHostPredicate predicate, ConnectionPool::DrainBehavior behavior) {
112
  for (auto& host_set : priority_set_.hostSetsPerPriority()) {
114
    for (const auto& host : host_set->hosts()) {
114
      if (predicate != nullptr && !predicate(*host)) {
24
        continue;
24
      }
90
      parent_.drainOrCloseConnPools(host, behavior);
90
    }
112
  }
112
}
// Registers cluster update callbacks with the calling worker's thread-local
// cluster manager; the returned handle unregisters on destruction.
ClusterUpdateCallbacksHandlePtr
ClusterManagerImpl::addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) {
  ThreadLocalClusterManagerImpl& tl_cluster_manager = *tls_;
  return tl_cluster_manager.addClusterUpdateCallbacks(cb);
}
// Returns a handle to an on-demand CDS subscription, creating the underlying
// OdCdsApi only if no subscription with an identical config/locator exists.
absl::StatusOr<OdCdsApiHandlePtr>
ClusterManagerImpl::allocateOdCdsApi(OdCdsCreationFunction creation_function,
                                     const envoy::config::core::v3::ConfigSource& odcds_config,
                                     OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
                                     ProtobufMessage::ValidationVisitor& validation_visitor) {
  // Generate a unique key based on config and locator. This enables reuse of subscriptions
  // with the same configuration. Note that timeout is intentionally not part of the hash,
  // so different timeout values will share the same subscription.
  // Subscriptions persist for the lifetime of ClusterManagerImpl and are cleaned up when
  // it is destroyed.
  uint64_t config_hash = MessageUtil::hash(odcds_config);
  if (odcds_resources_locator.has_value()) {
    // Fold the locator hash into the key so the same config with different
    // locators yields distinct subscriptions.
    config_hash = absl::HashOf(config_hash, MessageUtil::hash(*odcds_resources_locator));
  }
  auto it = odcds_subscriptions_.find(config_hash);
  if (it != odcds_subscriptions_.end()) {
    // Reuse the existing subscription; only a new handle is created.
    return OdCdsApiHandleImpl::create(*this, config_hash);
  }
  auto odcds_or_error =
      creation_function(odcds_config, odcds_resources_locator, xds_manager_, *this, *this,
                        *stats_.rootScope(), validation_visitor, context_);
  RETURN_IF_NOT_OK_REF(odcds_or_error.status());
  odcds_subscriptions_.emplace(config_hash, std::move(*odcds_or_error));
  return OdCdsApiHandleImpl::create(*this, config_hash);
}
// Requests an on-demand discovery of cluster `name` via the ODCDS subscription
// identified by `config_source_key`. Called on a worker thread; the discovery
// itself is coordinated on the main thread. Returns a handle that deregisters
// the callback when dropped.
ClusterDiscoveryCallbackHandlePtr
ClusterManagerImpl::requestOnDemandClusterDiscovery(uint64_t config_source_key, std::string name,
                                                    ClusterDiscoveryCallbackPtr callback,
                                                    std::chrono::milliseconds timeout) {
  ThreadLocalClusterManagerImpl& cluster_manager = *tls_;
  auto [handle, discovery_in_progress, invoker] =
      cluster_manager.cdm_.addCallback(name, std::move(callback));
  // This check will catch requests for discoveries from this thread only. If other thread
  // requested the same discovery, we will detect it in the main thread later.
  if (discovery_in_progress) {
    ENVOY_LOG(debug,
              "cm odcds: on-demand discovery for cluster {} is already in progress, something else "
              "in thread {} has already requested it",
              name, cluster_manager.thread_local_dispatcher_.name());
    // This worker thread has already requested a discovery of a cluster with this name, so
    // nothing more left to do here.
    //
    // We can't "just" return handle here, because handle is a part of the structured binding done
    // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like
    // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here
    // - it needs to be moved.
    return std::move(handle);
  }
  ENVOY_LOG(
      debug,
      "cm odcds: forwarding the on-demand discovery request for cluster {} to the main thread",
      name);
  // This seems to be the first request for discovery of this cluster in this worker thread. Rest
  // of the process may only happen in the main thread.
  Event::Dispatcher& worker_dispatcher = cluster_manager.thread_local_dispatcher_;
  dispatcher_.post([this, config_source_key, timeout, name = std::move(name),
                    invoker = std::move(invoker), &worker_dispatcher] {
    // Runs on the main thread. The subscription must already exist (created by
    // allocateOdCdsApi with the same key).
    OdCdsApiSharedPtr odcds = odcds_subscriptions_.at(config_source_key);
    // Check for the cluster here too. It might have been added between the time when this closure
    // was posted and when it is being executed.
    if (getThreadLocalCluster(name) != nullptr) {
      ENVOY_LOG(
          debug,
          "cm odcds: the requested cluster {} is already known, posting the callback back to {}",
          name, worker_dispatcher.name());
      // Bounce the success notification back to the requesting worker.
      worker_dispatcher.post([invoker = std::move(invoker)] {
        invoker.invokeCallback(ClusterDiscoveryStatus::Available);
      });
      return;
    }
    if (pending_cluster_creations_.contains(name)) {
      ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} is already in progress", name);
      // We already began the discovery process for this cluster, nothing to do. If we got here,
      // it means that it was other worker thread that requested the discovery.
      return;
    }
    // Start the discovery. If the cluster gets discovered, cluster manager will warm it up and
    // invoke the cluster lifecycle callbacks, that will in turn invoke our callback.
    odcds->updateOnDemand(name);
    // Setup the discovery timeout timer to avoid keeping callbacks indefinitely.
    auto timer = dispatcher_.createTimer([this, name] { notifyExpiredDiscovery(name); });
    timer->enableTimer(timeout);
    // Keep odcds handle alive for the duration of the discovery process.
    pending_cluster_creations_.insert(
        {std::move(name), ClusterCreation{std::move(odcds), std::move(timer)}});
  });
  // We can't "just" return handle here, because handle is a part of the structured binding done
  // above. So it's not really a ClusterDiscoveryCallbackHandlePtr, but more like
  // ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here -
  // it needs to be moved.
  return std::move(handle);
}
// Called when ODCDS reports that the requested cluster does not exist;
// resolves the pending discovery with status Missing.
void ClusterManagerImpl::notifyMissingCluster(absl::string_view name) {
  ENVOY_LOG(info, "cm odcds: cluster {} not found during on-demand discovery", name);
  notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Missing);
}
// Fired by the per-discovery timer set up in requestOnDemandClusterDiscovery;
// resolves the pending discovery with status Timeout.
void ClusterManagerImpl::notifyExpiredDiscovery(absl::string_view name) {
  ENVOY_LOG(info, "cm odcds: on-demand discovery for cluster {} timed out", name);
  notifyClusterDiscoveryStatus(name, ClusterDiscoveryStatus::Timeout);
}
void ClusterManagerImpl::notifyClusterDiscoveryStatus(absl::string_view name,
37
                                                      ClusterDiscoveryStatus status) {
37
  auto map_node_handle = pending_cluster_creations_.extract(name);
37
  if (map_node_handle.empty()) {
    // Not a cluster we are interested in. This may happen when ODCDS
    // receives some cluster name in removed resources field and
    // notifies the cluster manager about it.
1
    return;
1
  }
  // Let all the worker threads know that the discovery timed out.
36
  tls_.runOnAllThreads(
70
      [name = std::string(name), status](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
70
        ENVOY_LOG(
70
            trace,
70
            "cm cdm: starting processing cluster name {} (status {}) from the expired timer in {}",
70
            name, enumToInt(status), cluster_manager->thread_local_dispatcher_.name());
70
        cluster_manager->cdm_.processClusterName(name, status);
70
      });
36
}
527
Config::EdsResourcesCacheOptRef ClusterManagerImpl::edsResourcesCache() {
  // EDS caching is only supported for ADS.
527
  if (xds_manager_.adsMux()) {
492
    return xds_manager_.adsMux()->edsResourcesCache();
492
  }
35
  return {};
527
}
// Creates a QUIC network observer registry on every worker thread. No-op
// (beyond silencing the unused parameter) when QUIC is compiled out.
void ClusterManagerImpl::createNetworkObserverRegistries(
    Quic::EnvoyQuicNetworkObserverRegistryFactory& factory) {
#ifdef ENVOY_ENABLE_QUIC
  // NOTE(review): `factory` is captured by reference; presumably it outlives
  // the cross-thread posts made by runOnAllThreads — confirm with caller.
  tls_.runOnAllThreads([&factory](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
    ENVOY_LOG(trace, "cm: create network observer registry in {}",
              cluster_manager->thread_local_dispatcher_.name());
    cluster_manager->createThreadLocalNetworkObserverRegistry(factory);
  });
#else
  (void)factory;
#endif
}
// Installs a fresh per-thread ClusterDiscoveryManager into the calling
// worker's thread-local slot and returns the previously installed one.
ClusterDiscoveryManager
ClusterManagerImpl::createAndSwapClusterDiscoveryManager(std::string thread_name) {
  ThreadLocalClusterManagerImpl& tl_cluster_manager = *tls_;
  ClusterDiscoveryManager fresh_cdm(std::move(thread_name), tl_cluster_manager);
  tl_cluster_manager.cdm_.swap(fresh_cdm);
  // After the swap, `fresh_cdm` holds the old manager; hand it back.
  return fresh_cdm;
}
// Builds the admin /config_dump payload for clusters whose name matches
// `name_matcher`: static and dynamic-active clusters come from
// active_clusters_, dynamic-warming clusters from warming_clusters_.
ProtobufTypes::MessagePtr
ClusterManagerImpl::dumpClusterConfigs(const Matchers::StringMatcher& name_matcher) {
  auto config_dump = std::make_unique<envoy::admin::v3::ClustersConfigDump>();
  // Empty version when there is no CDS (all clusters are static).
  config_dump->set_version_info(cds_api_ != nullptr ? cds_api_->versionInfo() : "");
  for (const auto& active_cluster_pair : active_clusters_) {
    const auto& cluster = *active_cluster_pair.second;
    if (!name_matcher.match(cluster.cluster_config_.name())) {
      continue;
    }
    if (!cluster.added_via_api_) {
      // Bootstrap-configured cluster: goes into the static section.
      auto& static_cluster = *config_dump->mutable_static_clusters()->Add();
      static_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
      TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
                                            *(static_cluster.mutable_last_updated()));
    } else {
      // API-delivered cluster: dynamic-active section, including its version.
      auto& dynamic_cluster = *config_dump->mutable_dynamic_active_clusters()->Add();
      dynamic_cluster.set_version_info(cluster.version_info_);
      dynamic_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
      TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
                                            *(dynamic_cluster.mutable_last_updated()));
    }
  }
  for (const auto& warming_cluster_pair : warming_clusters_) {
    const auto& cluster = *warming_cluster_pair.second;
    if (!name_matcher.match(cluster.cluster_config_.name())) {
      continue;
    }
    auto& dynamic_cluster = *config_dump->mutable_dynamic_warming_clusters()->Add();
    dynamic_cluster.set_version_info(cluster.version_info_);
    dynamic_cluster.mutable_cluster()->PackFrom(cluster.cluster_config_);
    TimestampUtil::systemClockToTimestamp(cluster.last_updated_,
                                          *(dynamic_cluster.mutable_last_updated()));
  }
  return config_dump;
}
// Per-worker-thread cluster manager state. If a local cluster is configured it
// is inflated first, because other clusters (e.g. zone-aware LBs) depend on
// its priority set.
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl(
    ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
    const absl::optional<LocalClusterParams>& local_cluster_params)
    : parent_(parent), thread_local_dispatcher_(dispatcher), cdm_(dispatcher.name(), *this),
      local_stats_(generateStats(*parent.stats_.rootScope(), dispatcher.name())) {
  // If local cluster is defined then we need to initialize it first.
  if (local_cluster_params.has_value()) {
    const auto& local_cluster_name = local_cluster_params->info_->name();
    ENVOY_LOG(debug, "adding TLS local cluster {}", local_cluster_name);
    thread_local_clusters_[local_cluster_name] = std::make_unique<ClusterEntry>(
        *this, local_cluster_params->info_, local_cluster_params->load_balancer_factory_);
    // Cache the local priority set so the destructor can free it last.
    local_priority_set_ = &thread_local_clusters_[local_cluster_name]->prioritySet();
    local_stats_.clusters_inflated_.set(thread_local_clusters_.size());
  }
}
ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImpl() {
  // Clear out connection pools as well as the thread local cluster map so that we release all
  // cluster pointers. Currently we have to free all non-local clusters before we free
  // the local cluster. This is because non-local clusters with a zone aware load balancer have a
  // member update callback registered with the local cluster.
  ENVOY_LOG(debug, "shutting down thread local cluster manager");
  // Guard against re-entrant cleanup while pools/clusters are torn down.
  destroying_ = true;
  host_http_conn_pool_map_.clear();
  host_tcp_conn_pool_map_.clear();
  ASSERT(host_tcp_conn_map_.empty());
  // Free every non-local cluster first (see ordering comment above); the local
  // cluster is then released by the map clear() below.
  for (auto& cluster : thread_local_clusters_) {
    if (&cluster.second->prioritySet() != local_priority_set_) {
      cluster.second.reset();
    }
  }
  thread_local_clusters_.clear();
  // Ensure that all pools are completely destructed.
  thread_local_dispatcher_.clearDeferredDeleteList();
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeTcpConn(
8
    const HostConstSharedPtr& host, Network::ClientConnection& connection) {
8
  auto host_tcp_conn_map_it = host_tcp_conn_map_.find(host);
8
  ASSERT(host_tcp_conn_map_it != host_tcp_conn_map_.end());
8
  auto& connections_map = host_tcp_conn_map_it->second.connections_;
8
  auto it = connections_map.find(&connection);
8
  ASSERT(it != connections_map.end());
8
  connection.dispatcher().deferredDelete(std::move(it->second));
8
  connections_map.erase(it);
8
  if (connections_map.empty()) {
6
    host_tcp_conn_map_.erase(host_tcp_conn_map_it);
6
  }
8
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::removeHosts(
132
    const std::string& name, const HostVector& hosts_removed) {
132
  auto entry = thread_local_clusters_.find(name);
  // The if should only be possible if deferred cluster creation is enabled.
132
  if (entry == thread_local_clusters_.end()) {
8
    ASSERT(
8
        parent_.deferred_cluster_creation_,
8
        fmt::format("Cannot find ThreadLocalCluster {}, but deferred cluster creation is disabled.",
8
                    name));
8
    ASSERT(thread_local_deferred_clusters_.find(name) != thread_local_deferred_clusters_.end(),
8
           "Cluster with removed host is neither deferred or inflated!");
8
    return;
8
  }
124
  const auto& cluster_entry = entry->second;
124
  ENVOY_LOG(debug, "removing hosts for TLS cluster {} removed {}", name, hosts_removed.size());
  // We need to go through and purge any connection pools for hosts that got deleted.
  // Even if two hosts actually point to the same address this will be safe, since if a
  // host is readded it will be a different physical HostSharedPtr.
124
  cluster_entry->drainConnPools(hosts_removed);
124
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::updateClusterMembership(
    const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams update_hosts_params,
    LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
    const HostVector& hosts_removed, bool weighted_priority_health,
33929
    uint64_t overprovisioning_factor, HostMapConstSharedPtr cross_priority_host_map) {
33929
  ASSERT(thread_local_clusters_.find(name) != thread_local_clusters_.end());
33929
  const auto& cluster_entry = thread_local_clusters_[name];
33929
  cluster_entry->updateHosts(name, priority, std::move(update_hosts_params),
33929
                             std::move(locality_weights), hosts_added, hosts_removed,
33929
                             weighted_priority_health, overprovisioning_factor,
33929
                             std::move(cross_priority_host_map));
33929
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::onHostHealthFailure(
118
    const HostSharedPtr& host) {
  // Called on each worker when a host becomes unhealthy. Depending on the
  // cluster's feature flags this either closes the host's connections outright
  // or merely drains existing ones.
118
  if (host->cluster().features() &
118
      ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE) {
9
    // absl::nullopt requests a hard close where the pool type supports it; see
    // drainOrCloseConnPools for the HTTP pool caveat.
    drainOrCloseConnPools(host, absl::nullopt);
    // Close non connection pool TCP connections obtained from tcpConn()
    //
    // TODO(jono): The only remaining user of the non-pooled connections seems to be the statsd
    // TCP client. Perhaps it could be rewritten to use a connection pool, and this code deleted.
    //
    // Each connection will remove itself from the TcpConnectionsMap when it closes, via its
    // Network::ConnectionCallbacks. The last removed tcp conn will remove the TcpConnectionsMap
    // from host_tcp_conn_map_, so do not cache it between iterations.
    //
    // TODO(ggreenway) PERF: If there are a large number of connections, this could take a long
    // time and halt other useful work. Consider breaking up this work. Note that this behavior is
    // noted in the configuration documentation in cluster setting
    // "close_connections_on_host_health_failure". Update the docs if this if this changes.
15
    while (true) {
15
      const auto& it = host_tcp_conn_map_.find(host);
15
      if (it == host_tcp_conn_map_.end()) {
9
        break;
9
      }
6
      TcpConnectionsMap& container = it->second;
      // Closing a connection mutates the map via callbacks (see comment above),
      // hence the re-find on every loop iteration.
6
      container.connections_.begin()->first->close(
6
          Network::ConnectionCloseType::NoFlush,
6
          StreamInfo::LocalCloseReasons::get().NonPooledTcpConnectionHostHealthFailure);
6
    }
109
  } else {
    // Default behavior: stop reusing existing connections to the failed host,
    // but let in-flight work finish.
109
    drainOrCloseConnPools(host, ConnectionPool::DrainBehavior::DrainExistingConnections);
109
  }
118
}
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ConnPoolsContainer*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::getHttpConnPoolsContainer(
    const HostConstSharedPtr& host, bool allocate) {
  // Look up the per-host HTTP pool container, optionally creating it on demand.
  auto it = host_http_conn_pool_map_.find(host);
  if (it != host_http_conn_pool_map_.end()) {
    return &it->second;
  }
  if (!allocate) {
    return nullptr;
  }
  it = host_http_conn_pool_map_.try_emplace(host, thread_local_dispatcher_, host).first;
  return &it->second;
}
ClusterUpdateCallbacksHandlePtr
ClusterManagerImpl::ThreadLocalClusterManagerImpl::addClusterUpdateCallbacks(
    ClusterUpdateCallbacks& cb) {
  // The returned RAII handle unregisters `cb` from `update_callbacks_` when it
  // is destroyed.
  auto handle = std::make_unique<ClusterUpdateCallbacksHandleImpl>(cb, update_callbacks_);
  return handle;
}
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::ClusterEntry(
    ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster,
    const LoadBalancerFactorySharedPtr& lb_factory)
    : parent_(parent), cluster_info_(cluster), lb_factory_(lb_factory),
      override_host_statuses_(HostUtility::createOverrideHostStatus(cluster_info_->lbConfig())) {
  // Ensure priority 0 exists even before the first host update arrives.
  priority_set_.getOrCreateHostSet(0);
  ASSERT(lb_factory_ != nullptr);
  // TODO(mattklein123): Consider converting other LBs over to thread local. All of them could
  // benefit given the healthy panic, locality, and priority calculations that take place.
  // Zone aware load balancers additionally observe the local cluster's
  // priority set, passed here alongside this cluster's own.
  lb_ = lb_factory_->create({priority_set_, parent_.local_priority_set_});
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainOrCloseConnPools(
34888
    const HostSharedPtr& host, absl::optional<ConnectionPool::DrainBehavior> drain_behavior) {
  // Applies `drain_behavior` to every HTTP and TCP connection pool for `host`
  // on this worker. An empty `drain_behavior` requests a hard close, which only
  // the TCP pools support today (HTTP pools fall back to a drain, see TODO).
  // Drain or close any HTTP connection pool for the host.
34888
  {
34888
    const auto container = getHttpConnPoolsContainer(host);
34888
    if (container != nullptr) {
      // Guard the container against deletion by idle callbacks firing while we
      // are still draining it (see httpConnPoolIsIdle).
187
      container->do_not_delete_ = true;
187
      if (drain_behavior.has_value()) {
185
        container->pools_->drainConnections(drain_behavior.value());
185
      } else {
        // TODO(wbpcode): 'CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE' and 'closeConnections'
        // is only supported for TCP connection pools for now. Use 'DrainExistingConnections'
        // drain here as alternative.
2
        container->pools_->drainConnections(
2
            ConnectionPool::DrainBehavior::DrainExistingConnections);
2
      }
187
      container->do_not_delete_ = false;
      // With the guard released it is now safe to erase an emptied container.
187
      if (container->pools_->empty()) {
160
        host_http_conn_pool_map_.erase(host);
160
      }
187
    }
34888
  }
  // Drain or close any TCP connection pool for the host.
34888
  {
34888
    const auto container = host_tcp_conn_pool_map_.find(host);
34888
    if (container != host_tcp_conn_pool_map_.end()) {
      // Draining pools or closing connections can cause pool deletion if it becomes
      // idle. Copy `pools_` so that we aren't iterating through a container that
      // gets mutated by callbacks deleting from it.
24
      std::vector<Tcp::ConnectionPool::Instance*> pools;
34
      for (const auto& pair : container->second.pools_) {
34
        pools.push_back(pair.second.get());
34
      }
34
      for (auto* pool : pools) {
34
        if (drain_behavior.has_value()) {
31
          pool->drainConnections(drain_behavior.value());
31
        } else {
          // Hard close: supported directly by TCP pools.
3
          pool->closeConnections();
3
        }
34
      }
24
    }
34888
  }
34888
}
33892
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::~ClusterEntry() {
  // Every connection pool belonging to this cluster must be drained before the
  // cluster itself is removed.
  //
  // TODO(mattklein123): Optimally, we would just fire member changed callbacks and remove all of
  // the hosts inside of the HostImpl destructor. That is a change with wide implications, so we
  // are going with a more targeted approach for now.
  drainConnPools();
}
Http::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::httpConnPoolImpl(
    HostConstSharedPtr host, ResourcePriority priority,
47292
    absl::optional<Http::Protocol> downstream_protocol, LoadBalancerContext* context) {
  // Returns (creating on demand) the HTTP connection pool for `host`, keyed by
  // priority plus a hash of: upstream protocols, socket options, transport
  // socket options, and (when configured) the downstream connection. Returns
  // nullptr when no host was provided or no pool could be obtained.
47292
  if (!host) {
3
    return nullptr;
3
  }
  // Right now, HTTP, HTTP/2 and ALPN pools are considered separate.
  // We could do better here, and always use the ALPN pool and simply make sure
  // we end up on a connection of the correct protocol, but for simplicity we're
  // starting with something simpler.
47289
  auto upstream_protocols = host->cluster().upstreamHttpProtocol(downstream_protocol);
47289
  std::vector<uint8_t> hash_key;
47289
  hash_key.reserve(upstream_protocols.size());
47437
  for (auto protocol : upstream_protocols) {
47437
    hash_key.push_back(uint8_t(protocol));
47437
  }
47289
  absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>
47289
      alternate_protocol_options =
47289
          host->cluster().httpProtocolOptions().alternateProtocolsCacheOptions();
47289
  Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
47289
  if (context) {
    // Inherit socket options from downstream connection, if set.
47217
    if (context->downstreamConnection()) {
44140
      addOptionsIfNotNull(upstream_options, context->downstreamConnection()->socketOptions());
44140
    }
47217
    addOptionsIfNotNull(upstream_options, context->upstreamSocketOptions());
47217
  }
  // Use the socket options for computing connection pool hash key, if any.
  // This allows socket options to control connection pooling so that connections with
  // different options are not pooled together.
47289
  for (const auto& option : *upstream_options) {
10
    option->hashKey(hash_key);
10
  }
47289
  bool have_transport_socket_options = false;
47289
  if (context && context->upstreamTransportSocketOptions()) {
44136
    host->transportSocketFactory().hashKey(hash_key, context->upstreamTransportSocketOptions());
44136
    have_transport_socket_options = true;
44136
  }
  // If configured, use the downstream connection id in pool hash key
47289
  if (cluster_info_->connectionPoolPerDownstreamConnection() && context &&
47289
      context->downstreamConnection()) {
15
    context->downstreamConnection()->hashKey(hash_key);
15
  }
47289
  ConnPoolsContainer& container = *parent_.getHttpConnPoolsContainer(host, true);
  // Note: to simplify this, we assume that the factory is only called in the scope of this
  // function. Otherwise, we'd need to capture a few of these variables by value.
47289
  ConnPoolsContainer::ConnPools::PoolOptRef pool =
47289
      container.pools_->getPool(priority, hash_key, [&]() {
11757
        auto pool = parent_.parent_.factory_.allocateConnPool(
11757
            parent_.thread_local_dispatcher_, host, priority, upstream_protocols,
11757
            alternate_protocol_options, !upstream_options->empty() ? upstream_options : nullptr,
11757
            have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr,
11757
            parent_.parent_.time_source_, parent_.cluster_manager_state_, quic_info_,
11757
            parent_.getNetworkObserverRegistry());
        // Unlike the enclosing factory lambda, the idle callback captures by
        // value (except `parent_`) since it outlives this function invocation.
11757
        pool->addIdleCallback([&parent = parent_, host, priority, hash_key]() {
11707
          parent.httpConnPoolIsIdle(host, priority, hash_key);
11707
        });
11757
        return pool;
11757
      });
47289
  if (pool.has_value()) {
47288
    return &(pool.value().get());
47288
  } else {
    // NOTE(review): getPool() can decline to hand back a pool — presumably a
    // resource/limit condition inside ConnPools; confirm against its contract.
1
    return nullptr;
1
  }
47289
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::httpConnPoolIsIdle(
11707
    HostConstSharedPtr host, ResourcePriority priority, const std::vector<uint8_t>& hash_key) {
11707
  if (destroying_) {
    // If the Cluster is being destroyed, this pool will be cleaned up by that
    // process.
224
    return;
224
  }
11483
  ConnPoolsContainer* container = getHttpConnPoolsContainer(host);
11483
  if (container == nullptr) {
    // This could happen if we have cleaned out the host before iterating through every
    // connection pool. Handle it by just continuing.
    return;
  }
11483
  ENVOY_LOG(trace, "Erasing idle pool for host {}", *host);
11483
  container->pools_->erasePool(priority, hash_key);
  // Guard deletion of the container with `do_not_delete_` to avoid deletion while
  // iterating through the container in `container->pools_->startDrain()`. See
  // comment in `ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools`.
11483
  if (!container->do_not_delete_ && container->pools_->empty()) {
11312
    ENVOY_LOG(trace, "Pool container empty for host {}, erasing host entry", *host);
11312
    host_http_conn_pool_map_.erase(
11312
        host); // NOTE: `container` is erased after this point in the lambda.
11312
  }
11483
}
HostSelectionResponse ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::chooseHost(
    LoadBalancerContext* context) {
  // Honor an override host from the LB context first; otherwise delegate host
  // selection to the cluster's load balancer.
  auto cross_priority_host_map = priority_set_.crossPriorityHostMap();
  auto [override_host, strict_mode] = HostUtility::selectOverrideHost(
      cross_priority_host_map.get(), override_host_statuses_, context);
  if (override_host != nullptr) {
    return {std::move(override_host)};
  }
  if (strict_mode) {
    // Strict override mode: never fall back to normal load balancing.
    cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
    ENVOY_LOG(debug, "no healthy host");
    return {nullptr};
  }
  Upstream::HostSelectionResponse host_selection = lb_->chooseHost(context);
  if (host_selection.host || host_selection.cancelable) {
    return host_selection;
  }
  cluster_info_->trafficStats()->upstream_cx_none_healthy_.inc();
  ENVOY_LOG(debug, "no healthy host");
  return host_selection;
}
HostConstSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::peekAnotherHost(
    LoadBalancerContext* context) {
  // Mirror chooseHost(): prefer an override host from the LB context before
  // consulting the load balancer.
  auto cross_priority_host_map = priority_set_.crossPriorityHostMap();
  auto host_and_mode = HostUtility::selectOverrideHost(cross_priority_host_map.get(),
                                                       override_host_statuses_, context);
  if (host_and_mode.first != nullptr) {
    return std::move(host_and_mode.first);
  }
  // TODO(wbpcode): should we do strict mode check of override host here?
  return lb_->peekAnotherHost(context);
}
Tcp::ConnectionPool::Instance*
ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConnPoolImpl(
2313
    HostConstSharedPtr host, ResourcePriority priority, LoadBalancerContext* context) {
  // Returns (creating on demand) the TCP connection pool for `host`. The pool
  // hash key mixes priority, socket options, transport socket options and,
  // when configured, the downstream connection id.
  // Inherit socket options from downstream connection, if set.
2313
  std::vector<uint8_t> hash_key = {uint8_t(priority)};
  // Use downstream connection socket options for computing connection pool hash key, if any.
  // This allows socket options to control connection pooling so that connections with
  // different options are not pooled together.
2313
  Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
2313
  if (context) {
2249
    if (context->downstreamConnection()) {
2225
      addOptionsIfNotNull(upstream_options, context->downstreamConnection()->socketOptions());
2225
    }
2249
    addOptionsIfNotNull(upstream_options, context->upstreamSocketOptions());
2249
  }
2313
  for (const auto& option : *upstream_options) {
2
    option->hashKey(hash_key);
2
  }
  // If configured, use the downstream connection id in pool hash key
2313
  if (cluster_info_->connectionPoolPerDownstreamConnection() && context &&
2313
      context->downstreamConnection()) {
4
    ENVOY_LOG(trace, "honoring connection_pool_per_downstream_connection");
4
    context->downstreamConnection()->hashKey(hash_key);
4
  }
2313
  bool have_transport_socket_options = false;
2313
  if (context != nullptr && context->upstreamTransportSocketOptions() != nullptr) {
1941
    have_transport_socket_options = true;
1941
    host->transportSocketFactory().hashKey(hash_key, context->upstreamTransportSocketOptions());
1941
  }
  // Create the per-host container on first use.
2313
  auto container_iter = parent_.host_tcp_conn_pool_map_.find(host);
2313
  if (container_iter == parent_.host_tcp_conn_pool_map_.end()) {
809
    container_iter = parent_.host_tcp_conn_pool_map_.try_emplace(host, host->acquireHandle()).first;
809
  }
2313
  TcpConnPoolsContainer& container = container_iter->second;
2313
  auto pool_iter = container.pools_.find(hash_key);
2313
  if (pool_iter == container.pools_.end()) {
834
    bool inserted;
834
    std::tie(pool_iter, inserted) = container.pools_.emplace(
834
        hash_key,
834
        parent_.parent_.factory_.allocateTcpConnPool(
834
            parent_.thread_local_dispatcher_, host, priority,
834
            !upstream_options->empty() ? upstream_options : nullptr,
834
            have_transport_socket_options ? context->upstreamTransportSocketOptions() : nullptr,
834
            parent_.cluster_manager_state_, cluster_info_->tcpPoolIdleTimeout()));
834
    ASSERT(inserted);
    // Tear the pool down once it goes idle (see tcpConnPoolIsIdle).
834
    pool_iter->second->addIdleCallback(
834
        [&parent = parent_, host, hash_key]() { parent.tcpConnPoolIsIdle(host, hash_key); });
834
  }
2313
  return pool_iter->second.get();
2313
}
void ClusterManagerImpl::ThreadLocalClusterManagerImpl::tcpConnPoolIsIdle(
793
    HostConstSharedPtr host, const std::vector<uint8_t>& hash_key) {
793
  if (destroying_) {
    // If the Cluster is being destroyed, this pool will be cleaned up by that process.
7
    return;
7
  }
786
  auto it = host_tcp_conn_pool_map_.find(host);
786
  if (it != host_tcp_conn_pool_map_.end()) {
786
    TcpConnPoolsContainer& container = it->second;
786
    auto erase_iter = container.pools_.find(hash_key);
786
    if (erase_iter != container.pools_.end()) {
786
      ENVOY_LOG(trace, "Idle pool, erasing pool for host {}", *host);
786
      thread_local_dispatcher_.deferredDelete(std::move(erase_iter->second));
786
      container.pools_.erase(erase_iter);
786
    }
786
    if (container.pools_.empty()) {
777
      host_tcp_conn_pool_map_.erase(
777
          host); // NOTE: `container` is erased after this point in the lambda.
777
    }
786
  }
786
}
absl::StatusOr<ClusterManagerPtr> ProdClusterManagerFactory::clusterManagerFromProto(
    const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
  // The constructor is not public, so `new` is used here and any construction
  // failure is surfaced through `creation_status`.
  absl::Status creation_status = absl::OkStatus();
  std::unique_ptr<ClusterManagerImpl> manager{
      new ClusterManagerImpl(bootstrap, *this, context_, creation_status)};
  RETURN_IF_NOT_OK(creation_status);
  return manager;
}
Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
    Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority,
    std::vector<Http::Protocol>& protocols,
    const absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>&
        alternate_protocol_options,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
    TimeSource& source, ClusterConnectivityState& state, Http::PersistentQuicInfoPtr& quic_info,
11688
    OptRef<Quic::EnvoyQuicNetworkObserverRegistry> network_observer_registry) {
  // Selects and builds the pool implementation matching the protocol set:
  // connectivity grid (H1/H2/H3), mixed pool (H1/H2), or a dedicated H2, H3 or
  // H1 pool, in that order of preference.
11688
  Http::HttpServerPropertiesCacheSharedPtr alternate_protocols_cache;
11688
  if (alternate_protocol_options.has_value()) {
    // If there is configuration for an alternate protocols cache, always create one.
44
    alternate_protocols_cache =
44
        alternate_protocols_cache_manager_.getCache(alternate_protocol_options.value(), dispatcher);
11684
  } else if (!alternate_protocol_options.has_value() &&
    // NOTE(review): the `!has_value()` re-check above is redundant inside this
    // else branch; harmless but could be simplified.
11644
             (protocols.size() == 2 ||
11644
              (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2))) {
    // If there is no configuration for an alternate protocols cache, still
    // create one if there's an HTTP/2 upstream (either explicitly, or for mixed
    // HTTP/1.1 and HTTP/2 pools) to track the max concurrent streams across
    // connections.
5914
    envoy::config::core::v3::AlternateProtocolsCacheOptions default_options;
5914
    default_options.set_name(host->cluster().name());
5914
    alternate_protocols_cache =
5914
        alternate_protocols_cache_manager_.getCache(default_options, dispatcher);
5914
  }
11688
  absl::optional<Http::HttpServerPropertiesCache::Origin> origin =
11688
      getOrigin(transport_socket_options, host);
  // Three protocols + HTTP/3 enabled and no HTTP/1.1 proxy: use the grid.
11688
  if (protocols.size() == 3 &&
11688
      context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100) &&
11688
      !transport_socket_options->http11ProxyInfo()) {
41
    ASSERT(contains(protocols,
41
                    {Http::Protocol::Http11, Http::Protocol::Http2, Http::Protocol::Http3}));
41
    ASSERT(alternate_protocol_options.has_value());
41
    ASSERT(alternate_protocols_cache);
41
#ifdef ENVOY_ENABLE_QUIC
41
    Envoy::Http::ConnectivityGrid::ConnectivityOptions coptions{protocols};
    // The per-cluster QUIC info is created lazily on the first QUIC-capable pool.
41
    if (quic_info == nullptr) {
29
      quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster(), context_);
29
    }
41
    return std::make_unique<Http::ConnectivityGrid>(
41
        dispatcher, context_.api().randomGenerator(), host, priority, options,
41
        transport_socket_options, state, source, alternate_protocols_cache, coptions,
41
        quic_stat_names_, *stats_.rootScope(), *quic_info, network_observer_registry,
41
        context_.overloadManager());
#else
    (void)quic_info;
    (void)network_observer_registry;
    // Should be blocked by configuration checking at an earlier point.
    PANIC("unexpected");
#endif
41
  }
  // Two (or more) protocols without HTTP/3: mixed HTTP/1.1 + HTTP/2 pool.
11647
  if (protocols.size() >= 2) {
35
    if (origin.has_value()) {
34
      envoy::config::core::v3::AlternateProtocolsCacheOptions default_options;
34
      default_options.set_name(host->cluster().name());
34
      alternate_protocols_cache =
34
          alternate_protocols_cache_manager_.getCache(default_options, dispatcher);
34
    }
35
    ASSERT(contains(protocols, {Http::Protocol::Http11, Http::Protocol::Http2}));
35
    return std::make_unique<Http::HttpConnPoolImplMixed>(
35
        dispatcher, context_.api().randomGenerator(), host, priority, options,
35
        transport_socket_options, state, origin, alternate_protocols_cache,
35
        context_.overloadManager());
35
  }
11612
  if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2 &&
11612
      context_.runtime().snapshot().featureEnabled("upstream.use_http2", 100)) {
5882
    return Http::Http2::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
5882
                                         priority, options, transport_socket_options, state,
5882
                                         context_.overloadManager(), origin,
5882
                                         alternate_protocols_cache);
5882
  }
5730
  if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http3 &&
5730
      context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100)) {
895
#ifdef ENVOY_ENABLE_QUIC
895
    if (quic_info == nullptr) {
859
      quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster(), context_);
859
    }
895
    return Http::Http3::allocateConnPool(
895
        dispatcher, context_.api().randomGenerator(), host, priority, options,
895
        transport_socket_options, state, quic_stat_names_, {}, *stats_.rootScope(), {}, *quic_info,
895
        network_observer_registry, context_.overloadManager(), false);
#else
    UNREFERENCED_PARAMETER(source);
    // Should be blocked by configuration checking at an earlier point.
    PANIC("unexpected");
#endif
895
  }
  // Fallback: exactly one protocol, which must be HTTP/1.1.
4835
  ASSERT(protocols.size() == 1 && protocols[0] == Http::Protocol::Http11);
4835
  return Http::Http1::allocateConnPool(dispatcher, context_.api().randomGenerator(), host, priority,
4835
                                       options, transport_socket_options, state,
4835
                                       context_.overloadManager());
5730
}
Tcp::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateTcpConnPool(
    Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
    ClusterConnectivityState& state,
    absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout) {
  ENVOY_LOG_MISC(debug, "Allocating TCP conn pool");
  // A single concrete TCP pool implementation serves all clusters.
  auto pool = std::make_unique<Tcp::ConnPoolImpl>(dispatcher, host, priority, options,
                                                  transport_socket_options, state,
                                                  tcp_pool_idle_timeout, context_.overloadManager());
  return pool;
}
absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
ProdClusterManagerFactory::clusterFromProto(const envoy::config::cluster::v3::Cluster& cluster,
                                            Outlier::EventLoggerSharedPtr outlier_event_logger,
                                            bool added_via_api) {
  // Thin delegation to the shared cluster factory machinery; all validation and
  // type dispatch happens there.
  return ClusterFactoryImplBase::create(cluster, context_, dns_resolver_fn_, outlier_event_logger,
                                        added_via_api);
}
absl::StatusOr<CdsApiPtr>
ProdClusterManagerFactory::createCds(const envoy::config::core::v3::ConfigSource& cds_config,
                                     const xds::core::v3::ResourceLocator* cds_resources_locator,
                                     ClusterManager& cm, bool support_multi_ads_sources) {
  // TODO(htuch): Differentiate static vs. dynamic validation visitors.
  auto& validation_visitor = context_.messageValidationContext().dynamicValidationVisitor();
  return CdsApiImpl::create(cds_config, cds_resources_locator, cm, *stats_.rootScope(),
                            validation_visitor, context_, support_multi_ads_sources);
}
} // namespace Upstream
} // namespace Envoy