LCOV - code coverage report
Current view: top level - source/extensions/clusters/eds - eds.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 144 309 46.6 %
Date: 2024-01-05 06:35:25 Functions: 11 20 55.0 %

          Line data    Source code
       1             : #include "source/extensions/clusters/eds/eds.h"
       2             : 
       3             : #include "envoy/common/exception.h"
       4             : #include "envoy/config/cluster/v3/cluster.pb.h"
       5             : #include "envoy/config/core/v3/config_source.pb.h"
       6             : #include "envoy/service/discovery/v3/discovery.pb.h"
       7             : 
       8             : #include "source/common/common/assert.h"
       9             : #include "source/common/common/utility.h"
      10             : #include "source/common/config/api_version.h"
      11             : #include "source/common/config/decoded_resource_impl.h"
      12             : 
      13             : namespace Envoy {
      14             : namespace Upstream {
      15             : 
      16             : EdsClusterImpl::EdsClusterImpl(const envoy::config::cluster::v3::Cluster& cluster,
      17             :                                ClusterFactoryContext& cluster_context)
      18             :     : BaseDynamicClusterImpl(cluster, cluster_context),
      19             :       Envoy::Config::SubscriptionBase<envoy::config::endpoint::v3::ClusterLoadAssignment>(
      20             :           cluster_context.messageValidationVisitor(), "cluster_name"),
      21             :       local_info_(cluster_context.serverFactoryContext().localInfo()),
      22             :       eds_resources_cache_(
      23             :           Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")
      24             :               ? cluster_context.clusterManager().edsResourcesCache()
      25          28 :               : absl::nullopt) {
      26          28 :   Event::Dispatcher& dispatcher = cluster_context.serverFactoryContext().mainThreadDispatcher();
      27          28 :   assignment_timeout_ = dispatcher.createTimer([this]() -> void { onAssignmentTimeout(); });
      28          28 :   const auto& eds_config = cluster.eds_cluster_config().eds_config();
      29          28 :   if (Config::SubscriptionFactory::isPathBasedConfigSource(
      30          28 :           eds_config.config_source_specifier_case())) {
      31           0 :     initialize_phase_ = InitializePhase::Primary;
      32          28 :   } else {
      33          28 :     initialize_phase_ = InitializePhase::Secondary;
      34          28 :   }
      35          28 :   const auto resource_name = getResourceName();
      36          28 :   subscription_ =
      37          28 :       cluster_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
      38          28 :           eds_config, Grpc::Common::typeUrl(resource_name), info_->statsScope(), *this,
      39          28 :           resource_decoder_, {});
      40          28 : }
      41             : 
      42          28 : EdsClusterImpl::~EdsClusterImpl() {
      43          28 :   if (using_cached_resource_) {
      44             :     // Clear the callback as the subscription is no longer valid.
      45           0 :     eds_resources_cache_->removeCallback(edsServiceName(), this);
      46           0 :   }
      47          28 : }
      48             : 
      49          28 : void EdsClusterImpl::startPreInit() { subscription_->start({edsServiceName()}); }
      50             : 
      51          28 : void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& host_update_cb) {
      52          28 :   absl::flat_hash_set<std::string> all_new_hosts;
      53          28 :   PriorityStateManager priority_state_manager(parent_, parent_.local_info_, &host_update_cb);
      54          28 :   for (const auto& locality_lb_endpoint : cluster_load_assignment_.endpoints()) {
      55          28 :     parent_.validateEndpointsForZoneAwareRouting(locality_lb_endpoint);
      56             : 
      57          28 :     priority_state_manager.initializePriorityFor(locality_lb_endpoint);
      58             : 
      59          28 :     if (locality_lb_endpoint.has_leds_cluster_locality_config()) {
      60             :       // The locality uses LEDS, fetch its dynamic data, which must be ready, or otherwise
      61             :       // the batchUpdate method should not have been called.
      62           0 :       const auto& leds_config = locality_lb_endpoint.leds_cluster_locality_config();
      63             : 
      64             :       // The batchUpdate call must be performed after all the endpoints of all localities
      65             :       // were received.
      66           0 :       ASSERT(parent_.leds_localities_.find(leds_config) != parent_.leds_localities_.end() &&
      67           0 :              parent_.leds_localities_[leds_config]->isUpdated());
      68           0 :       for (const auto& [_, lb_endpoint] :
      69           0 :            parent_.leds_localities_[leds_config]->getEndpointsMap()) {
      70           0 :         updateLocalityEndpoints(lb_endpoint, locality_lb_endpoint, priority_state_manager,
      71           0 :                                 all_new_hosts);
      72           0 :       }
      73          28 :     } else {
      74          28 :       for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
      75          28 :         updateLocalityEndpoints(lb_endpoint, locality_lb_endpoint, priority_state_manager,
      76          28 :                                 all_new_hosts);
      77          28 :       }
      78          28 :     }
      79          28 :   }
      80             : 
      81             :   // Track whether we rebuilt any LB structures.
      82          28 :   bool cluster_rebuilt = false;
      83             : 
      84             :   // Get the map of all the latest existing hosts, which is used to filter out the existing
      85             :   // hosts in the process of updating cluster memberships.
      86          28 :   HostMapConstSharedPtr all_hosts = parent_.prioritySet().crossPriorityHostMap();
      87          28 :   ASSERT(all_hosts != nullptr);
      88             : 
      89          28 :   const uint32_t overprovisioning_factor = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
      90          28 :       cluster_load_assignment_.policy(), overprovisioning_factor, kDefaultOverProvisioningFactor);
      91          28 :   const bool weighted_priority_health =
      92          28 :       cluster_load_assignment_.policy().weighted_priority_health();
      93             : 
      94          28 :   LocalityWeightsMap empty_locality_map;
      95             : 
      96             :   // Loop over all priorities that exist in the new configuration.
      97          28 :   auto& priority_state = priority_state_manager.priorityState();
      98          56 :   for (size_t i = 0; i < priority_state.size(); ++i) {
      99          28 :     if (parent_.locality_weights_map_.size() <= i) {
     100          28 :       parent_.locality_weights_map_.resize(i + 1);
     101          28 :     }
     102          28 :     if (priority_state[i].first != nullptr) {
     103          28 :       cluster_rebuilt |= parent_.updateHostsPerLocality(
     104          28 :           i, weighted_priority_health, overprovisioning_factor, *priority_state[i].first,
     105          28 :           parent_.locality_weights_map_[i], priority_state[i].second, priority_state_manager,
     106          28 :           *all_hosts, all_new_hosts);
     107          28 :     } else {
     108             :       // If the new update contains a priority with no hosts, call the update function with an empty
     109             :       // set of hosts.
     110           0 :       cluster_rebuilt |=
     111           0 :           parent_.updateHostsPerLocality(i, weighted_priority_health, overprovisioning_factor, {},
     112           0 :                                          parent_.locality_weights_map_[i], empty_locality_map,
     113           0 :                                          priority_state_manager, *all_hosts, all_new_hosts);
     114           0 :     }
     115          28 :   }
     116             : 
     117             :   // Loop over all priorities not present in the config that already exists. This will
     118             :   // empty out any remaining priority that the config update did not refer to.
     119          28 :   for (size_t i = priority_state.size(); i < parent_.priority_set_.hostSetsPerPriority().size();
     120          28 :        ++i) {
     121           0 :     if (parent_.locality_weights_map_.size() <= i) {
     122           0 :       parent_.locality_weights_map_.resize(i + 1);
     123           0 :     }
     124           0 :     cluster_rebuilt |= parent_.updateHostsPerLocality(
     125           0 :         i, weighted_priority_health, overprovisioning_factor, {}, parent_.locality_weights_map_[i],
     126           0 :         empty_locality_map, priority_state_manager, *all_hosts, all_new_hosts);
     127           0 :   }
     128             : 
     129          28 :   if (!cluster_rebuilt) {
     130           0 :     parent_.info_->configUpdateStats().update_no_rebuild_.inc();
     131           0 :   }
     132             : 
     133             :   // If we didn't setup to initialize when our first round of health checking is complete, just
     134             :   // do it now.
     135          28 :   parent_.onPreInitComplete();
     136          28 : }
     137             : 
     138             : void EdsClusterImpl::BatchUpdateHelper::updateLocalityEndpoints(
     139             :     const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint,
     140             :     const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
     141          28 :     PriorityStateManager& priority_state_manager, absl::flat_hash_set<std::string>& all_new_hosts) {
     142          28 :   const auto address = parent_.resolveProtoAddress(lb_endpoint.endpoint().address());
     143          28 :   std::vector<Network::Address::InstanceConstSharedPtr> address_list;
     144          28 :   if (!lb_endpoint.endpoint().additional_addresses().empty()) {
     145           0 :     address_list.push_back(address);
     146           0 :     for (const auto& additional_address : lb_endpoint.endpoint().additional_addresses()) {
     147           0 :       address_list.emplace_back(parent_.resolveProtoAddress(additional_address.address()));
     148           0 :     }
     149           0 :   }
     150             : 
     151             :   // When the configuration contains duplicate hosts, only the first one will be retained.
     152          28 :   const auto address_as_string = address->asString();
     153          28 :   if (all_new_hosts.contains(address_as_string)) {
     154           0 :     return;
     155           0 :   }
     156             : 
     157          28 :   priority_state_manager.registerHostForPriority(lb_endpoint.endpoint().hostname(), address,
     158          28 :                                                  address_list, locality_lb_endpoint, lb_endpoint,
     159          28 :                                                  parent_.time_source_);
     160          28 :   all_new_hosts.emplace(address_as_string);
     161          28 : }
     162             : 
     163             : absl::Status
     164             : EdsClusterImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
     165          28 :                                const std::string&) {
     166          28 :   if (resources.empty()) {
     167           0 :     ENVOY_LOG(debug, "Missing ClusterLoadAssignment for {} in onConfigUpdate()", edsServiceName());
     168           0 :     info_->configUpdateStats().update_empty_.inc();
     169           0 :     onPreInitComplete();
     170           0 :     return absl::OkStatus();
     171           0 :   }
     172          28 :   if (resources.size() != 1) {
     173           0 :     return absl::InvalidArgumentError(
     174           0 :         fmt::format("Unexpected EDS resource length: {}", resources.size()));
     175           0 :   }
     176             : 
     177          28 :   envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment =
     178          28 :       dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
     179          28 :           resources[0].get().resource());
     180          28 :   if (cluster_load_assignment.cluster_name() != edsServiceName()) {
     181           0 :     return absl::InvalidArgumentError(fmt::format("Unexpected EDS cluster (expecting {}): {}",
     182           0 :                                                   edsServiceName(),
     183           0 :                                                   cluster_load_assignment.cluster_name()));
     184           0 :   }
     185             :   // Validate that each locality doesn't have both LEDS and endpoints defined.
     186             :   // TODO(adisuissa): This is only needed for the API v3 support. In future major versions
     187             :   // the oneof definition will take care of it.
     188          28 :   for (const auto& locality : cluster_load_assignment.endpoints()) {
     189          28 :     if (locality.has_leds_cluster_locality_config() && locality.lb_endpoints_size() > 0) {
     190           0 :       return absl::InvalidArgumentError(fmt::format(
     191           0 :           "A ClusterLoadAssignment for cluster {} cannot include both LEDS (resource: {}) and a "
     192           0 :           "list of endpoints.",
     193           0 :           edsServiceName(), locality.leds_cluster_locality_config().leds_collection_name()));
     194           0 :     }
     195          28 :   }
     196             : 
     197             :   // Disable timer (if enabled) as we have received new assignment.
     198          28 :   if (assignment_timeout_->enabled()) {
     199           0 :     assignment_timeout_->disableTimer();
     200           0 :     if (eds_resources_cache_.has_value()) {
     201           0 :       eds_resources_cache_->disableExpiryTimer(edsServiceName());
     202           0 :     }
     203           0 :   }
     204             :   // Check if endpoint_stale_after is set.
     205          28 :   const uint64_t stale_after_ms =
     206          28 :       PROTOBUF_GET_MS_OR_DEFAULT(cluster_load_assignment.policy(), endpoint_stale_after, 0);
     207          28 :   if (stale_after_ms > 0) {
     208             :     // Stat to track how often we receive valid assignment_timeout in response.
     209           0 :     info_->configUpdateStats().assignment_timeout_received_.inc();
     210           0 :     assignment_timeout_->enableTimer(std::chrono::milliseconds(stale_after_ms));
     211           0 :     if (eds_resources_cache_.has_value()) {
     212           0 :       eds_resources_cache_->setExpiryTimer(edsServiceName(),
     213           0 :                                            std::chrono::milliseconds(stale_after_ms));
     214           0 :     }
     215           0 :   }
     216             : 
     217             :   // Drop overload configuration parsing.
     218          28 :   absl::Status status = parseDropOverloadConfig(cluster_load_assignment);
     219          28 :   if (!status.ok()) {
     220           0 :     return status;
     221           0 :   }
     222             : 
     223             :   // Pause LEDS messages until the EDS config is finished processing.
     224          28 :   Config::ScopedResume maybe_resume_leds;
     225          28 :   if (transport_factory_context_->clusterManager().adsMux()) {
     226          28 :     const auto type_url = Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>();
     227          28 :     maybe_resume_leds = transport_factory_context_->clusterManager().adsMux()->pause(type_url);
     228          28 :   }
     229             : 
     230          28 :   update(cluster_load_assignment);
     231             :   // If previously used a cached version, remove the subscription from the cache's
     232             :   // callbacks.
     233          28 :   if (using_cached_resource_) {
     234           0 :     eds_resources_cache_->removeCallback(edsServiceName(), this);
     235           0 :     using_cached_resource_ = false;
     236           0 :   }
     237          28 :   return absl::OkStatus();
     238          28 : }
     239             : 
     240             : void EdsClusterImpl::update(
     241          28 :     const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment) {
     242             :   // Compare the current set of LEDS localities (localities using LEDS) to the one received in the
     243             :   // update. A LEDS locality can either be added, removed, or kept. If it is added we add a
     244             :   // subscription to it, and if it is removed we delete the subscription.
     245          28 :   LedsConfigSet cla_leds_configs;
     246             : 
     247          28 :   for (const auto& locality : cluster_load_assignment.endpoints()) {
     248          28 :     if (locality.has_leds_cluster_locality_config()) {
     249           0 :       cla_leds_configs.emplace(locality.leds_cluster_locality_config());
     250           0 :     }
     251          28 :   }
     252             : 
     253             :   // Remove the LEDS localities that are not needed anymore.
     254          28 :   absl::erase_if(leds_localities_, [&cla_leds_configs](const auto& item) {
     255           0 :     auto const& [leds_config, _] = item;
     256             :     // Returns true if the leds_config isn't in the cla_leds_configs
     257           0 :     return cla_leds_configs.find(leds_config) == cla_leds_configs.end();
     258           0 :   });
     259             : 
     260             :   // In case LEDS is used, store the cluster load assignment as a field
     261             :   // (optimize for no-copy).
     262          28 :   const envoy::config::endpoint::v3::ClusterLoadAssignment* used_load_assignment;
     263          28 :   if (!cla_leds_configs.empty() || eds_resources_cache_.has_value()) {
     264           0 :     cluster_load_assignment_ = std::make_unique<envoy::config::endpoint::v3::ClusterLoadAssignment>(
     265           0 :         std::move(cluster_load_assignment));
     266           0 :     used_load_assignment = cluster_load_assignment_.get();
     267          28 :   } else {
     268          28 :     cluster_load_assignment_ = nullptr;
     269          28 :     used_load_assignment = &cluster_load_assignment;
     270          28 :   }
     271             : 
     272             :   // Add all the LEDS localities that are new.
     273          28 :   for (const auto& leds_config : cla_leds_configs) {
     274           0 :     if (leds_localities_.find(leds_config) == leds_localities_.end()) {
     275           0 :       ENVOY_LOG(trace, "Found new LEDS config in EDS onConfigUpdate() for cluster {}: {}",
     276           0 :                 edsServiceName(), leds_config.DebugString());
     277             : 
     278             :       // Create a new LEDS subscription and add it to the subscriptions map.
     279           0 :       LedsSubscriptionPtr leds_locality_subscription = std::make_unique<LedsSubscription>(
     280           0 :           leds_config, edsServiceName(), *transport_factory_context_, info_->statsScope(),
     281           0 :           [&, used_load_assignment]() {
     282             :             // Called upon an update to the locality.
     283           0 :             if (validateAllLedsUpdated()) {
     284           0 :               BatchUpdateHelper helper(*this, *used_load_assignment);
     285           0 :               priority_set_.batchHostUpdate(helper);
     286           0 :             }
     287           0 :           });
     288           0 :       leds_localities_.emplace(leds_config, std::move(leds_locality_subscription));
     289           0 :     }
     290           0 :   }
     291             : 
     292             :   // If all the LEDS localities are updated, the EDS update can occur. If not, then when the last
     293             :   // LEDS locality will be updated, it will trigger the EDS update helper.
     294          28 :   if (!validateAllLedsUpdated()) {
     295           0 :     return;
     296           0 :   }
     297             : 
     298          28 :   BatchUpdateHelper helper(*this, *used_load_assignment);
     299          28 :   priority_set_.batchHostUpdate(helper);
     300          28 : }
     301             : 
     302             : absl::Status
     303             : EdsClusterImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
     304           8 :                                const Protobuf::RepeatedPtrField<std::string>&, const std::string&) {
     305           8 :   return onConfigUpdate(added_resources, "");
     306           8 : }
     307             : 
     308           0 : void EdsClusterImpl::onAssignmentTimeout() {
     309             :   // We can no longer use the assignments, remove them.
     310             :   // TODO(vishalpowar) This is not going to work for incremental updates, and we
     311             :   // need to instead change the health status to indicate the assignments are
     312             :   // stale.
     313             :   // TODO(snowp): This should probably just use xDS TTLs?
     314           0 :   envoy::config::endpoint::v3::ClusterLoadAssignment resource;
     315           0 :   resource.set_cluster_name(edsServiceName());
     316           0 :   update(resource);
     317             : 
     318           0 :   if (eds_resources_cache_.has_value()) {
     319             :     // Clear the resource so it won't be used, and its watchers will be notified.
     320           0 :     eds_resources_cache_->removeResource(edsServiceName());
     321           0 :   }
     322             :   // Stat to track how often we end up with stale assignments.
     323           0 :   info_->configUpdateStats().assignment_stale_.inc();
     324           0 : }
     325             : 
     326           0 : void EdsClusterImpl::onCachedResourceRemoved(absl::string_view resource_name) {
     327           0 :   ASSERT(resource_name == edsServiceName());
     328             :   // Disable the timer if previously started.
     329           0 :   if (assignment_timeout_->enabled()) {
     330           0 :     assignment_timeout_->disableTimer();
     331           0 :     eds_resources_cache_->disableExpiryTimer(edsServiceName());
     332           0 :   }
     333           0 :   envoy::config::endpoint::v3::ClusterLoadAssignment resource;
     334           0 :   resource.set_cluster_name(edsServiceName());
     335           0 :   update(resource);
     336           0 : }
     337             : 
     338           0 : void EdsClusterImpl::reloadHealthyHostsHelper(const HostSharedPtr& host) {
     339             :   // Here we will see if we have a host that has been marked for deletion by service discovery
     340             :   // but has been stabilized due to passing active health checking. If such a host is now
     341             :   // failing active health checking we can remove it during this health check update.
     342           0 :   HostSharedPtr host_to_exclude = host;
     343           0 :   if (host_to_exclude != nullptr &&
     344           0 :       host_to_exclude->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) &&
     345           0 :       host_to_exclude->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)) {
     346             :     // Empty for clarity.
     347           0 :   } else {
     348             :     // Do not exclude and remove the host during the update.
     349           0 :     host_to_exclude = nullptr;
     350           0 :   }
     351             : 
     352           0 :   const auto& host_sets = prioritySet().hostSetsPerPriority();
     353           0 :   for (size_t priority = 0; priority < host_sets.size(); ++priority) {
     354           0 :     const auto& host_set = host_sets[priority];
     355             : 
     356             :     // Filter current hosts in case we need to exclude a host.
     357           0 :     HostVectorSharedPtr hosts_copy(new HostVector());
     358           0 :     std::copy_if(host_set->hosts().begin(), host_set->hosts().end(),
     359           0 :                  std::back_inserter(*hosts_copy),
     360           0 :                  [&host_to_exclude](const HostSharedPtr& host) { return host_to_exclude != host; });
     361             : 
     362             :     // Setup a hosts to remove vector in case we need to exclude a host.
     363           0 :     HostVector hosts_to_remove;
     364           0 :     if (hosts_copy->size() != host_set->hosts().size()) {
     365           0 :       ASSERT(hosts_copy->size() == host_set->hosts().size() - 1);
     366           0 :       hosts_to_remove.emplace_back(host_to_exclude);
     367           0 :     }
     368             : 
     369             :     // Filter hosts per locality in case we need to exclude a host.
     370           0 :     HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().filter(
     371           0 :         {[&host_to_exclude](const Host& host) { return &host != host_to_exclude.get(); }})[0];
     372             : 
     373           0 :     prioritySet().updateHosts(
     374           0 :         priority, HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
     375           0 :         host_set->localityWeights(), {}, hosts_to_remove, absl::nullopt, absl::nullopt);
     376           0 :   }
     377           0 : }
     378             : 
     379             : bool EdsClusterImpl::updateHostsPerLocality(
     380             :     const uint32_t priority, bool weighted_priority_health, const uint32_t overprovisioning_factor,
     381             :     const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map,
     382             :     LocalityWeightsMap& new_locality_weights_map, PriorityStateManager& priority_state_manager,
     383          28 :     const HostMap& all_hosts, const absl::flat_hash_set<std::string>& all_new_hosts) {
     384          28 :   const auto& host_set = priority_set_.getOrCreateHostSet(priority, overprovisioning_factor);
     385          28 :   HostVectorSharedPtr current_hosts_copy(new HostVector(host_set.hosts()));
     386             : 
     387          28 :   HostVector hosts_added;
     388          28 :   HostVector hosts_removed;
     389             :   // We need to trigger updateHosts with the new host vectors if they have changed. We also do this
     390             :   // when the locality weight map or the overprovisioning factor. Note calling updateDynamicHostList
     391             :   // is responsible for both determining whether there was a change and to perform the actual update
     392             :   // to current_hosts_copy, so it must be called even if we know that we need to update (e.g. if the
     393             :   // overprovisioning factor changes).
     394             :   //
     395             :   // TODO(htuch): We eagerly update all the host sets here on weight changes, which may have
     396             :   // performance implications, since this has the knock on effect that we rebuild the load balancers
     397             :   // and locality scheduler. See the comment in BaseDynamicClusterImpl::updateDynamicHostList
     398             :   // about this. In the future we may need to do better here.
     399          28 :   const bool hosts_updated = updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added,
     400          28 :                                                    hosts_removed, all_hosts, all_new_hosts);
     401          28 :   if (hosts_updated || host_set.weightedPriorityHealth() != weighted_priority_health ||
     402          28 :       host_set.overprovisioningFactor() != overprovisioning_factor ||
     403          28 :       locality_weights_map != new_locality_weights_map) {
     404          28 :     ASSERT(std::all_of(current_hosts_copy->begin(), current_hosts_copy->end(),
     405          28 :                        [&](const auto& host) { return host->priority() == priority; }));
     406          28 :     locality_weights_map = new_locality_weights_map;
     407          28 :     ENVOY_LOG(debug,
     408          28 :               "EDS hosts or locality weights changed for cluster: {} current hosts {} priority {}",
     409          28 :               info_->name(), host_set.hosts().size(), host_set.priority());
     410             : 
     411          28 :     priority_state_manager.updateClusterPrioritySet(
     412          28 :         priority, std::move(current_hosts_copy), hosts_added, hosts_removed, absl::nullopt,
     413          28 :         weighted_priority_health, overprovisioning_factor);
     414          28 :     return true;
     415          28 :   }
     416           0 :   return false;
     417          28 : }
     418             : 
     419             : void EdsClusterImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
     420           0 :                                           const EnvoyException*) {
     421           0 :   ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
     422             :   // Config failure may happen if Envoy times out waiting for the EDS resource.
     423             :   // If it is a timeout, the eds resources cache is enabled,
     424             :   // and there is a cached ClusterLoadAssignment, then the cached assignment should be used.
     425           0 :   if (reason == Envoy::Config::ConfigUpdateFailureReason::FetchTimedout &&
     426           0 :       eds_resources_cache_.has_value()) {
     427           0 :     ENVOY_LOG(trace, "onConfigUpdateFailed due to timeout for {}, looking for cached resources",
     428           0 :               edsServiceName());
     429           0 :     auto cached_resource = eds_resources_cache_->getResource(edsServiceName(), this);
     430           0 :     if (cached_resource.has_value()) {
     431           0 :       ENVOY_LOG(
     432           0 :           debug,
     433           0 :           "Did not receive EDS response on time, using cached ClusterLoadAssignment for cluster {}",
     434           0 :           edsServiceName());
     435           0 :       envoy::config::endpoint::v3::ClusterLoadAssignment cached_load_assignment =
     436           0 :           dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(*cached_resource);
     437           0 :       info_->configUpdateStats().assignment_use_cached_.inc();
     438           0 :       using_cached_resource_ = true;
     439           0 :       update(cached_load_assignment);
     440           0 :       return;
     441           0 :     }
     442           0 :   }
     443             :   // We need to allow server startup to continue, even if we have a bad config.
     444           0 :   onPreInitComplete();
     445           0 : }
     446             : 
     447             : absl::StatusOr<std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>>
     448             : EdsClusterFactory::createClusterImpl(const envoy::config::cluster::v3::Cluster& cluster,
     449          28 :                                      ClusterFactoryContext& context) {
     450             :   // TODO(kbaichoo): EDS cluster should be able to support loading it's
     451             :   // configuration from the CustomClusterType protobuf. Currently it does not.
     452             :   // See: https://github.com/envoyproxy/envoy/issues/28752
     453          28 :   if (!cluster.has_eds_cluster_config()) {
     454           0 :     return absl::InvalidArgumentError("cannot create an EDS cluster without an EDS config");
     455           0 :   }
     456             : 
     457          28 :   return std::make_pair(std::make_unique<EdsClusterImpl>(cluster, context), nullptr);
     458          28 : }
     459             : 
     460          28 : bool EdsClusterImpl::validateAllLedsUpdated() const {
     461             :   // Iterate through all LEDS based localities, and if they are all updated return true.
     462          28 :   for (const auto& [_, leds_subscription] : leds_localities_) {
     463           0 :     if (!leds_subscription->isUpdated()) {
     464           0 :       return false;
     465           0 :     }
     466           0 :   }
     467          28 :   return true;
     468          28 : }
     469             : 
     470             : /**
     471             :  * Static registration for the Eds cluster factory. @see RegisterFactory.
     472             :  */
     473             : REGISTER_FACTORY(EdsClusterFactory, ClusterFactory);
     474             : 
     475             : } // namespace Upstream
     476             : } // namespace Envoy

Generated by: LCOV version 1.15