LCOV - code coverage report
Current view: top level - source/extensions/clusters/aggregate - cluster.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 139 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 16 0.0 %

          Line data    Source code
       1             : #include "source/extensions/clusters/aggregate/cluster.h"
       2             : 
       3             : #include "envoy/config/cluster/v3/cluster.pb.h"
       4             : #include "envoy/event/dispatcher.h"
       5             : #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
       6             : #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
       7             : 
       8             : #include "source/common/common/assert.h"
       9             : 
      10             : namespace Envoy {
      11             : namespace Extensions {
      12             : namespace Clusters {
      13             : namespace Aggregate {
      14             : 
      15             : Cluster::Cluster(const envoy::config::cluster::v3::Cluster& cluster,
      16             :                  const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config,
      17             :                  Upstream::ClusterFactoryContext& context)
      18             :     : Upstream::ClusterImplBase(cluster, context), cluster_manager_(context.clusterManager()),
      19             :       runtime_(context.serverFactoryContext().runtime()),
      20             :       random_(context.serverFactoryContext().api().randomGenerator()),
      21           0 :       clusters_(std::make_shared<ClusterSet>(config.clusters().begin(), config.clusters().end())) {}
      22             : 
      23             : AggregateClusterLoadBalancer::AggregateClusterLoadBalancer(
      24             :     const Upstream::ClusterInfoConstSharedPtr& parent_info,
      25             :     Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime,
      26             :     Random::RandomGenerator& random, const ClusterSetConstSharedPtr& clusters)
      27             :     : parent_info_(parent_info), cluster_manager_(cluster_manager), runtime_(runtime),
      28           0 :       random_(random), clusters_(clusters) {
      29           0 :   for (const auto& cluster : *clusters_) {
      30           0 :     auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
      31             :     // It is possible when initializing the cluster, the included cluster doesn't exist. e.g., the
      32             :     // cluster could be added dynamically by xDS.
      33           0 :     if (tlc == nullptr) {
      34           0 :       continue;
      35           0 :     }
      36             : 
      37             :     // Add callback for clusters initialized before aggregate cluster.
      38           0 :     addMemberUpdateCallbackForCluster(*tlc);
      39           0 :   }
      40           0 :   refresh();
      41           0 :   handle_ = cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
      42           0 : }
      43             : 
      44             : void AggregateClusterLoadBalancer::addMemberUpdateCallbackForCluster(
      45           0 :     Upstream::ThreadLocalCluster& thread_local_cluster) {
      46           0 :   member_update_cbs_[thread_local_cluster.info()->name()] =
      47           0 :       thread_local_cluster.prioritySet().addMemberUpdateCb(
      48           0 :           [this, target_cluster_info = thread_local_cluster.info()](const Upstream::HostVector&,
      49           0 :                                                                     const Upstream::HostVector&) {
      50           0 :             ENVOY_LOG(debug, "member update for cluster '{}' in aggregate cluster '{}'",
      51           0 :                       target_cluster_info->name(), parent_info_->name());
      52           0 :             refresh();
      53           0 :           });
      54           0 : }
      55             : 
      56             : PriorityContextPtr
      57           0 : AggregateClusterLoadBalancer::linearizePrioritySet(OptRef<const std::string> excluded_cluster) {
      58           0 :   PriorityContextPtr priority_context = std::make_unique<PriorityContext>();
      59           0 :   uint32_t next_priority_after_linearizing = 0;
      60             : 
      61             :   // Linearize the priority set. e.g. for clusters [C_0, C_1, C_2] referred in aggregate cluster
      62             :   //    C_0 [P_0, P_1, P_2]
      63             :   //    C_1 [P_0, P_1]
      64             :   //    C_2 [P_0, P_1, P_2, P_3]
      65             :   // The linearization result is:
      66             :   //    [C_0.P_0, C_0.P_1, C_0.P_2, C_1.P_0, C_1.P_1, C_2.P_0, C_2.P_1, C_2.P_2, C_2.P_3]
      67             :   // and the traffic will be distributed among these priorities.
      68           0 :   for (const auto& cluster : *clusters_) {
      69           0 :     if (excluded_cluster.has_value() && excluded_cluster.value().get() == cluster) {
      70           0 :       continue;
      71           0 :     }
      72           0 :     auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
      73             :     // It is possible that the cluster doesn't exist, e.g., the cluster could be deleted or the
      74             :     // cluster hasn't been added by xDS.
      75           0 :     if (tlc == nullptr) {
      76           0 :       ENVOY_LOG(debug, "refresh: cluster '{}' absent in aggregate cluster '{}'", cluster,
      77           0 :                 parent_info_->name());
      78           0 :       continue;
      79           0 :     } else {
      80           0 :       ENVOY_LOG(debug, "refresh: cluster '{}' found in aggregate cluster '{}'", cluster,
      81           0 :                 parent_info_->name());
      82           0 :     }
      83             : 
      84           0 :     uint32_t priority_in_current_cluster = 0;
      85           0 :     for (const auto& host_set : tlc->prioritySet().hostSetsPerPriority()) {
      86           0 :       if (!host_set->hosts().empty()) {
      87           0 :         priority_context->priority_set_.updateHosts(
      88           0 :             next_priority_after_linearizing, Upstream::HostSetImpl::updateHostsParams(*host_set),
      89           0 :             host_set->localityWeights(), host_set->hosts(), {}, host_set->weightedPriorityHealth(),
      90           0 :             host_set->overprovisioningFactor());
      91           0 :         priority_context->priority_to_cluster_.emplace_back(
      92           0 :             std::make_pair(priority_in_current_cluster, tlc));
      93             : 
      94           0 :         priority_context->cluster_and_priority_to_linearized_priority_[std::make_pair(
      95           0 :             cluster, priority_in_current_cluster)] = next_priority_after_linearizing;
      96           0 :         next_priority_after_linearizing++;
      97           0 :       }
      98           0 :       priority_in_current_cluster++;
      99           0 :     }
     100           0 :   }
     101             : 
     102           0 :   return priority_context;
     103           0 : }
     104             : 
     105           0 : void AggregateClusterLoadBalancer::refresh(OptRef<const std::string> excluded_cluster) {
     106           0 :   PriorityContextPtr priority_context = linearizePrioritySet(excluded_cluster);
     107           0 :   if (!priority_context->priority_set_.hostSetsPerPriority().empty()) {
     108           0 :     load_balancer_ = std::make_unique<LoadBalancerImpl>(
     109           0 :         *priority_context, parent_info_->lbStats(), runtime_, random_, parent_info_->lbConfig());
     110           0 :   } else {
     111           0 :     load_balancer_ = nullptr;
     112           0 :   }
     113           0 :   priority_context_ = std::move(priority_context);
     114           0 : }
     115             : 
     116             : void AggregateClusterLoadBalancer::onClusterAddOrUpdate(
     117           0 :     absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand& get_cluster) {
     118           0 :   if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
     119           0 :     ENVOY_LOG(debug, "adding or updating cluster '{}' for aggregate cluster '{}'", cluster_name,
     120           0 :               parent_info_->name());
     121           0 :     auto& cluster = get_cluster();
     122           0 :     refresh();
     123           0 :     addMemberUpdateCallbackForCluster(cluster);
     124           0 :   }
     125           0 : }
     126             : 
     127           0 : void AggregateClusterLoadBalancer::onClusterRemoval(const std::string& cluster_name) {
     128             :   //  The onClusterRemoval callback is called before the thread local cluster is removed. There
     129             :   //  will be a dangling pointer to the thread local cluster if the deleted cluster is not skipped
     130             :   //  when we refresh the load balancer.
     131           0 :   if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
     132           0 :     ENVOY_LOG(debug, "removing cluster '{}' from aggregate cluster '{}'", cluster_name,
     133           0 :               parent_info_->name());
     134           0 :     refresh(cluster_name);
     135           0 :   }
     136           0 : }
     137             : 
     138             : absl::optional<uint32_t> AggregateClusterLoadBalancer::LoadBalancerImpl::hostToLinearizedPriority(
     139           0 :     const Upstream::HostDescription& host) const {
     140           0 :   auto it = priority_context_.cluster_and_priority_to_linearized_priority_.find(
     141           0 :       std::make_pair(host.cluster().name(), host.priority()));
     142             : 
     143           0 :   if (it != priority_context_.cluster_and_priority_to_linearized_priority_.end()) {
     144           0 :     return it->second;
     145           0 :   } else {
     146             :     // The HostSet can change due to CDS/EDS updates between retries.
     147           0 :     return absl::nullopt;
     148           0 :   }
     149           0 : }
     150             : 
     151             : Upstream::HostConstSharedPtr
     152           0 : AggregateClusterLoadBalancer::LoadBalancerImpl::chooseHost(Upstream::LoadBalancerContext* context) {
     153           0 :   const Upstream::HealthyAndDegradedLoad* priority_loads = nullptr;
     154           0 :   if (context != nullptr) {
     155           0 :     priority_loads = &context->determinePriorityLoad(
     156           0 :         priority_set_, per_priority_load_,
     157           0 :         [this](const auto& host) { return hostToLinearizedPriority(host); });
     158           0 :   } else {
     159           0 :     priority_loads = &per_priority_load_;
     160           0 :   }
     161             : 
     162           0 :   const auto priority_pair =
     163           0 :       choosePriority(random_.random(), priority_loads->healthy_priority_load_,
     164           0 :                      priority_loads->degraded_priority_load_);
     165             : 
     166           0 :   AggregateLoadBalancerContext aggregate_context(
     167           0 :       context, priority_pair.second,
     168           0 :       priority_context_.priority_to_cluster_[priority_pair.first].first);
     169             : 
     170           0 :   Upstream::ThreadLocalCluster* cluster =
     171           0 :       priority_context_.priority_to_cluster_[priority_pair.first].second;
     172           0 :   return cluster->loadBalancer().chooseHost(&aggregate_context);
     173           0 : }
     174             : 
     175             : Upstream::HostConstSharedPtr
     176           0 : AggregateClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
     177           0 :   if (load_balancer_) {
     178           0 :     return load_balancer_->chooseHost(context);
     179           0 :   }
     180           0 :   return nullptr;
     181           0 : }
     182             : 
     183             : Upstream::HostConstSharedPtr
     184           0 : AggregateClusterLoadBalancer::peekAnotherHost(Upstream::LoadBalancerContext* context) {
     185           0 :   if (load_balancer_) {
     186           0 :     return load_balancer_->peekAnotherHost(context);
     187           0 :   }
     188           0 :   return nullptr;
     189           0 : }
     190             : 
     191             : absl::optional<Upstream::SelectedPoolAndConnection>
     192             : AggregateClusterLoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* context,
     193             :                                                        const Upstream::Host& host,
     194           0 :                                                        std::vector<uint8_t>& hash_key) {
     195           0 :   if (load_balancer_) {
     196           0 :     return load_balancer_->selectExistingConnection(context, host, hash_key);
     197           0 :   }
     198           0 :   return absl::nullopt;
     199           0 : }
     200             : 
     201             : OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
     202           0 : AggregateClusterLoadBalancer::lifetimeCallbacks() {
     203           0 :   if (load_balancer_) {
     204           0 :     return load_balancer_->lifetimeCallbacks();
     205           0 :   }
     206           0 :   return {};
     207           0 : }
     208             : 
     209             : absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
     210             : ClusterFactory::createClusterWithConfig(
     211             :     const envoy::config::cluster::v3::Cluster& cluster,
     212             :     const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
     213           0 :     Upstream::ClusterFactoryContext& context) {
     214           0 :   auto new_cluster = std::make_shared<Cluster>(cluster, proto_config, context);
     215           0 :   auto lb = std::make_unique<AggregateThreadAwareLoadBalancer>(*new_cluster);
     216           0 :   return std::make_pair(new_cluster, std::move(lb));
     217           0 : }
     218             : 
     219             : REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory);
     220             : 
     221             : } // namespace Aggregate
     222             : } // namespace Clusters
     223             : } // namespace Extensions
     224             : } // namespace Envoy

Generated by: LCOV version 1.15