getOrigin(const Network::TransportSocketOptionsConstSharedPtr& options, HostConstSharedPtr host) {
std::string sni = std::string(host->transportSocketFactory().defaultServerNameIndication());
primary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
secondary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
ENVOY_LOG(debug, "cm init: adding: cluster={} primary={} secondary={}", cluster.info()->name(),
for (auto iter = secondary_init_clusters_.begin(); iter != secondary_init_clusters_.end();) {
ClusterManagerImpl::ClusterManagerImpl(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
deferred_cluster_creation_(bootstrap.cluster_manager().enable_deferred_cluster_creation()),
bootstrap.grpc_async_client_manager_config(), context, context.grpcContext().statNames());
ClusterManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
// Cluster loading happens in two phases: first all the primary clusters are loaded, and then all
auto is_primary_cluster = [](const envoy::config::cluster::v3::Cluster& cluster) -> bool {
return std::make_shared<ThreadLocalClusterManagerImpl>(*this, dispatcher, local_cluster_params);
auto cds_or_error = factory_.createCds(dyn_resources.cds_config(), cds_resources_locator.get(),
if (Runtime::runtimeFeatureEnabled("envoy.restart_features.use_cached_grpc_client_for_xds")) {
absl::StatusOr<Envoy::OptRef<const envoy::config::core::v3::GrpcService>> maybe_grpc_service =
const std::string final_prefix = absl::StrCat("thread_local_cluster_manager.", thread_name);
// been setup for cross-thread updates to avoid needless updates during initialization. The order
const uint64_t delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count();
// 2) After initial server load, we handle warming independently for each cluster in the warming
// Note: It's likely possible that all warming logic could be centralized in the init manager, but
state_changed_cluster_entry->second->cluster_->info()->configUpdateStats().warming_state_.set(
bool ClusterManagerImpl::removeCluster(const std::string& cluster_name, const bool remove_ignored) {
tls_.runOnAllThreads([cluster_name](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
// use it to detect when a previously sent cluster becomes warm before sending routes that depend
// on it. This can reduce incidence of HTTP 503 responses from Envoy when a route is used before
ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view cluster) {
HostConstSharedPtr host = LoadBalancer::onlyAllowSynchronousHostSelection(chooseHost(context));
tls_.runOnAllThreads([cluster, predicate](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
if (std::find(supported_cluster_types.begin(), supported_cluster_types.end(), info->type()) ==
HostMapConstSharedPtr host_map = cm_cluster.cluster().prioritySet().crossPriorityHostMap();
// By this time, the main thread has received the cluster initialization update, so we can start
// instances for the same cluster. And before the thread local clusters are actually initialized,
// the new instances will override the old instances in the work threads. But part of data is
// created only once, such as load balancer factory. So we should always merge the new instance
entry != cluster_initialization_map_.end() && entry->second->cluster_info_ == cluster_info;
params, std::move(cluster_info), load_balancer_factory, map, drop_overload, drop_category);
*this, initialization_object->cluster_info_, initialization_object->load_balancer_factory_);
updateClusterMembership(initialization_object->cluster_info_->name(), per_priority.priority_,
cross_priority_host_map_(map), drop_overload_(drop_overload), drop_category_(drop_category) {
const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>& per_priority_state,
Host::CreateConnectionData ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::tcpConn(
return std::make_unique<Tcp::AsyncTcpClientImpl>(parent_.thread_local_dispatcher_, *this, context,
priority_set_.updateHosts(priority, std::move(update_hosts_params), std::move(locality_weights),
ClusterManagerImpl::requestOnDemandClusterDiscovery(uint64_t config_source_key, std::string name,
// This seems to be the first request for discovery of this cluster in this worker thread. Rest
ENVOY_LOG(debug, "cm odcds: on-demand discovery for cluster {} is already in progress", name);
// ClusterDiscoveryCallbackHandlePtr&, so named return value optimization does not apply here -
// the local cluster. This is because non-local clusters with a zone aware load balancer have a
ENVOY_LOG(debug, "removing hosts for TLS cluster {} removed {}", name, hosts_removed.size());
const std::string& name, uint32_t priority, PrioritySet::UpdateHostsParams update_hosts_params,
override_host_statuses_(HostUtility::createOverrideHostStatus(cluster_info_->lbConfig())) {
const HostSharedPtr& host, absl::optional<ConnectionPool::DrainBehavior> drain_behavior) {
// TODO(mattklein123): Optimally, we would just fire member changed callbacks and remove all of
Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
host->transportSocketFactory().hashKey(hash_key, context->upstreamTransportSocketOptions());
HostConstSharedPtr host, ResourcePriority priority, const std::vector<uint8_t>& hash_key) {
HostSelectionResponse ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::chooseHost(
auto host_and_strict_mode = HostUtility::selectOverrideHost(cross_priority_host_map.get(),
HostConstSharedPtr ClusterManagerImpl::ThreadLocalClusterManagerImpl::ClusterEntry::peekAnotherHost(
Network::Socket::OptionsSharedPtr upstream_options(std::make_shared<Network::Socket::Options>());
host->transportSocketFactory().hashKey(hash_key, context->upstreamTransportSocketOptions());
container_iter = parent_.host_tcp_conn_pool_map_.try_emplace(host, host->acquireHandle()).first;
alternate_protocols_cache_manager_.getCache(alternate_protocol_options.value(), dispatcher);
quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster(), context_);
quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster(), context_);
transport_socket_options, state, quic_stat_names_, {}, *stats_.rootScope(), {}, *quic_info,
return Http::Http1::allocateConnPool(dispatcher, context_.api().randomGenerator(), host, priority,
ProdClusterManagerFactory::clusterFromProto(const envoy::config::cluster::v3::Cluster& cluster,
return ClusterFactoryImplBase::create(cluster, context_, dns_resolver_fn_, outlier_event_logger,