LCOV - code coverage report
Current view: top level - source/common/upstream - thread_aware_lb_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 191 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 11 0.0 %

          Line data    Source code
       1             : #include "source/common/upstream/thread_aware_lb_impl.h"
       2             : 
       3             : #include <memory>
       4             : #include <random>
       5             : 
       6             : namespace Envoy {
       7             : namespace Upstream {
       8             : 
       9             : // TODO(mergeconflict): Adjust locality weights for partial availability, as is done in
      10             : //                      HostSetImpl::effectiveLocalityWeight.
      11             : namespace {
      12             : 
      13             : void normalizeHostWeights(const HostVector& hosts, double normalized_locality_weight,
      14             :                           NormalizedHostWeightVector& normalized_host_weights,
      15           0 :                           double& min_normalized_weight, double& max_normalized_weight) {
      16             :   // sum should be at most uint32_t max value, so we can validate it by accumulating into unit64_t
      17             :   // and making sure there was no overflow
      18           0 :   uint64_t sum = 0;
      19           0 :   for (const auto& host : hosts) {
      20           0 :     sum += host->weight();
      21           0 :     if (sum > std::numeric_limits<uint32_t>::max()) {
      22           0 :       throwEnvoyExceptionOrPanic(
      23           0 :           fmt::format("The sum of weights of all upstream hosts in a locality exceeds {}",
      24           0 :                       std::numeric_limits<uint32_t>::max()));
      25           0 :     }
      26           0 :   }
      27             : 
      28           0 :   for (const auto& host : hosts) {
      29           0 :     const double weight = host->weight() * normalized_locality_weight / sum;
      30           0 :     normalized_host_weights.push_back({host, weight});
      31           0 :     min_normalized_weight = std::min(min_normalized_weight, weight);
      32           0 :     max_normalized_weight = std::max(max_normalized_weight, weight);
      33           0 :   }
      34           0 : }
      35             : 
      36             : void normalizeLocalityWeights(const HostsPerLocality& hosts_per_locality,
      37             :                               const LocalityWeights& locality_weights,
      38             :                               NormalizedHostWeightVector& normalized_host_weights,
      39           0 :                               double& min_normalized_weight, double& max_normalized_weight) {
      40           0 :   ASSERT(locality_weights.size() == hosts_per_locality.get().size());
      41             : 
      42             :   // sum should be at most uint32_t max value, so we can validate it by accumulating into unit64_t
      43             :   // and making sure there was no overflow
      44           0 :   uint64_t sum = 0;
      45           0 :   for (const auto weight : locality_weights) {
      46           0 :     sum += weight;
      47           0 :     if (sum > std::numeric_limits<uint32_t>::max()) {
      48           0 :       throwEnvoyExceptionOrPanic(
      49           0 :           fmt::format("The sum of weights of all localities at the same priority exceeds {}",
      50           0 :                       std::numeric_limits<uint32_t>::max()));
      51           0 :     }
      52           0 :   }
      53             : 
      54             :   // Locality weights (unlike host weights) may be 0. If _all_ locality weights were 0, bail out.
      55           0 :   if (sum == 0) {
      56           0 :     return;
      57           0 :   }
      58             : 
      59             :   // Compute normalized weights for all hosts in each locality. If a locality was assigned zero
      60             :   // weight, all hosts in that locality will be skipped.
      61           0 :   for (LocalityWeights::size_type i = 0; i < locality_weights.size(); ++i) {
      62           0 :     if (locality_weights[i] != 0) {
      63           0 :       const HostVector& hosts = hosts_per_locality.get()[i];
      64           0 :       const double normalized_locality_weight = static_cast<double>(locality_weights[i]) / sum;
      65           0 :       normalizeHostWeights(hosts, normalized_locality_weight, normalized_host_weights,
      66           0 :                            min_normalized_weight, max_normalized_weight);
      67           0 :     }
      68           0 :   }
      69           0 : }
      70             : 
      71             : void normalizeWeights(const HostSet& host_set, bool in_panic,
      72             :                       NormalizedHostWeightVector& normalized_host_weights,
      73             :                       double& min_normalized_weight, double& max_normalized_weight,
      74           0 :                       bool locality_weighted_balancing) {
      75           0 :   if (!locality_weighted_balancing || host_set.localityWeights() == nullptr ||
      76           0 :       host_set.localityWeights()->empty()) {
      77             :     // If we're not dealing with locality weights, just normalize weights for the flat set of hosts.
      78           0 :     const auto& hosts = in_panic ? host_set.hosts() : host_set.healthyHosts();
      79           0 :     normalizeHostWeights(hosts, 1.0, normalized_host_weights, min_normalized_weight,
      80           0 :                          max_normalized_weight);
      81           0 :   } else {
      82             :     // Otherwise, normalize weights across all localities.
      83           0 :     const auto& hosts_per_locality =
      84           0 :         in_panic ? host_set.hostsPerLocality() : host_set.healthyHostsPerLocality();
      85           0 :     normalizeLocalityWeights(hosts_per_locality, *(host_set.localityWeights()),
      86           0 :                              normalized_host_weights, min_normalized_weight, max_normalized_weight);
      87           0 :   }
      88           0 : }
      89             : 
      90             : } // namespace
      91             : 
// Registers for priority set updates so the precomputed per-priority LB state is rebuilt on
// every host/health change, then builds the initial state synchronously via refresh().
void ThreadAwareLoadBalancerBase::initialize() {
  // TODO(mattklein123): In the future, once initialized and the initial LB is built, it would be
  // better to use a background thread for computing LB updates. This has the substantial benefit
  // that if the LB computation thread falls behind, host set updates can be trivially collapsed.
  // I will look into doing this in a follow up. Doing everything using a background thread heavily
  // complicated initialization as the load balancer would need its own initialized callback. I
  // think the synchronous/asynchronous split is probably the best option.
  priority_update_cb_ = priority_set_.addPriorityUpdateCb(
      [this](uint32_t, const HostVector&, const HostVector&) -> void { refresh(); });

  // Build the initial state; the callback only fires on subsequent updates.
  refresh();
}
     104             : 
     105           0 : void ThreadAwareLoadBalancerBase::refresh() {
     106           0 :   auto per_priority_state_vector = std::make_shared<std::vector<PerPriorityStatePtr>>(
     107           0 :       priority_set_.hostSetsPerPriority().size());
     108           0 :   auto healthy_per_priority_load =
     109           0 :       std::make_shared<HealthyLoad>(per_priority_load_.healthy_priority_load_);
     110           0 :   auto degraded_per_priority_load =
     111           0 :       std::make_shared<DegradedLoad>(per_priority_load_.degraded_priority_load_);
     112             : 
     113           0 :   for (const auto& host_set : priority_set_.hostSetsPerPriority()) {
     114           0 :     const uint32_t priority = host_set->priority();
     115           0 :     (*per_priority_state_vector)[priority] = std::make_unique<PerPriorityState>();
     116           0 :     const auto& per_priority_state = (*per_priority_state_vector)[priority];
     117             :     // Copy panic flag from LoadBalancerBase. It is calculated when there is a change
     118             :     // in hosts set or hosts' health.
     119           0 :     per_priority_state->global_panic_ = per_priority_panic_[priority];
     120             : 
     121             :     // Normalize host and locality weights such that the sum of all normalized weights is 1.
     122           0 :     NormalizedHostWeightVector normalized_host_weights;
     123           0 :     double min_normalized_weight = 1.0;
     124           0 :     double max_normalized_weight = 0.0;
     125           0 :     normalizeWeights(*host_set, per_priority_state->global_panic_, normalized_host_weights,
     126           0 :                      min_normalized_weight, max_normalized_weight, locality_weighted_balancing_);
     127           0 :     per_priority_state->current_lb_ = createLoadBalancer(
     128           0 :         std::move(normalized_host_weights), min_normalized_weight, max_normalized_weight);
     129           0 :   }
     130             : 
     131           0 :   {
     132           0 :     absl::WriterMutexLock lock(&factory_->mutex_);
     133           0 :     factory_->healthy_per_priority_load_ = healthy_per_priority_load;
     134           0 :     factory_->degraded_per_priority_load_ = degraded_per_priority_load;
     135           0 :     factory_->per_priority_state_ = per_priority_state_vector;
     136           0 :   }
     137           0 : }
     138             : 
     139             : HostConstSharedPtr
     140           0 : ThreadAwareLoadBalancerBase::LoadBalancerImpl::chooseHost(LoadBalancerContext* context) {
     141             :   // Make sure we correctly return nullptr for any early chooseHost() calls.
     142           0 :   if (per_priority_state_ == nullptr) {
     143           0 :     return nullptr;
     144           0 :   }
     145             : 
     146           0 :   HostConstSharedPtr host;
     147             : 
     148             :   // If there is no hash in the context, just choose a random value (this effectively becomes
     149             :   // the random LB but it won't crash if someone configures it this way).
     150             :   // computeHashKey() may be computed on demand, so get it only once.
     151           0 :   absl::optional<uint64_t> hash;
     152           0 :   if (context) {
     153           0 :     hash = context->computeHashKey();
     154           0 :   }
     155           0 :   const uint64_t h = hash ? hash.value() : random_.random();
     156             : 
     157           0 :   const uint32_t priority =
     158           0 :       LoadBalancerBase::choosePriority(h, *healthy_per_priority_load_, *degraded_per_priority_load_)
     159           0 :           .first;
     160           0 :   const auto& per_priority_state = (*per_priority_state_)[priority];
     161           0 :   if (per_priority_state->global_panic_) {
     162           0 :     stats_.lb_healthy_panic_.inc();
     163           0 :   }
     164             : 
     165           0 :   const uint32_t max_attempts = context ? context->hostSelectionRetryCount() + 1 : 1;
     166           0 :   for (uint32_t i = 0; i < max_attempts; ++i) {
     167           0 :     host = per_priority_state->current_lb_->chooseHost(h, i);
     168             : 
     169             :     // If host selection failed or the host is accepted by the filter, return.
     170             :     // Otherwise, try again.
     171           0 :     if (!host || !context || !context->shouldSelectAnotherHost(*host)) {
     172           0 :       return host;
     173           0 :     }
     174           0 :   }
     175           0 :   return host;
     176           0 : }
     177             : 
     178           0 : LoadBalancerPtr ThreadAwareLoadBalancerBase::LoadBalancerFactoryImpl::create(LoadBalancerParams) {
     179           0 :   auto lb = std::make_unique<LoadBalancerImpl>(stats_, random_);
     180             : 
     181             :   // We must protect current_lb_ via a RW lock since it is accessed and written to by multiple
     182             :   // threads. All complex processing has already been precalculated however.
     183           0 :   absl::ReaderMutexLock lock(&mutex_);
     184           0 :   lb->healthy_per_priority_load_ = healthy_per_priority_load_;
     185           0 :   lb->degraded_per_priority_load_ = degraded_per_priority_load_;
     186           0 :   lb->per_priority_state_ = per_priority_state_;
     187           0 :   return lb;
     188           0 : }
     189             : 
     190             : double ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::hostOverloadFactor(
     191           0 :     const Host& host, double weight) const {
     192             :   // TODO(scheler): This will not work if rq_active cluster stat is disabled, need to detect
     193             :   // and alert the user if that's the case.
     194             : 
     195           0 :   const uint32_t overall_active = host.cluster().trafficStats()->upstream_rq_active_.value();
     196           0 :   const uint32_t host_active = host.stats().rq_active_.value();
     197             : 
     198           0 :   const uint32_t total_slots = ((overall_active + 1) * hash_balance_factor_ + 99) / 100;
     199           0 :   const uint32_t slots =
     200           0 :       std::max(static_cast<uint32_t>(std::ceil(total_slots * weight)), static_cast<uint32_t>(1));
     201             : 
     202           0 :   if (host.stats().rq_active_.value() > slots) {
     203           0 :     ENVOY_LOG_MISC(
     204           0 :         debug,
     205           0 :         "ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::chooseHost: "
     206           0 :         "host {} overloaded; overall_active {}, host_weight {}, host_active {} > slots {}",
     207           0 :         host.address()->asString(), overall_active, weight, host_active, slots);
     208           0 :   }
     209           0 :   return static_cast<double>(host.stats().rq_active_.value()) / slots;
     210           0 : }
     211             : 
// Bounded-load consistent hashing host selection: takes the underlying hashing LB's pick and,
// if that host is over its capacity bound, probes alternative hosts in a hash-seeded random
// order until an under-capacity host is found (falling back to the least-overloaded host seen).
HostConstSharedPtr
ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::chooseHost(uint64_t hash,
                                                                        uint32_t attempt) const {

  // This is implemented based on the method described in the paper
  // https://arxiv.org/abs/1608.01350. For the specified `hash_balance_factor`, requests to any
  // upstream host are capped at `hash_balance_factor/100` times the average number of requests
  // across the cluster. When a request arrives for an upstream host that is currently serving at
  // its max capacity, linear probing is used to identify an eligible host. Further, the linear
  // probe is implemented using a random jump on hosts ring/table to identify the eligible host
  // (this technique is as described in the paper https://arxiv.org/abs/1908.08762 - the random jump
  // avoids the cascading overflow effect when choosing the next host on the ring/table).
  //
  // If weights are specified on the hosts, they are respected.
  //
  // This is an O(N) algorithm, unlike other load balancers. Using a lower `hash_balance_factor`
  // results in more hosts being probed, so use a higher value if you require better performance.

  if (normalized_host_weights_.empty()) {
    return nullptr;
  }

  // First choice comes from the wrapped consistent-hashing LB (ring hash / maglev).
  HostConstSharedPtr host = hashing_lb_ptr_->chooseHost(hash, attempt);
  if (host == nullptr) {
    return nullptr;
  }
  // Fast path: the hashed pick is within its capacity bound (overload factor <= 1), use it.
  const double weight = normalized_host_weights_map_.at(host);
  double overload_factor = hostOverloadFactor(*host, weight);
  if (overload_factor <= 1.0) {
    ENVOY_LOG_MISC(debug,
                   "ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::chooseHost: "
                   "selected host #{} (attempt:1)",
                   host->address()->asString());
    return host;
  }

  // When a host is overloaded, we choose the next host in a random manner rather than picking the
  // next one in the ring. The random sequence is seeded by the hash, so the same input gets the
  // same sequence of hosts all the time.
  const uint32_t num_hosts = normalized_host_weights_.size();
  auto host_index = std::vector<uint32_t>(num_hosts);
  for (uint32_t i = 0; i < num_hosts; i++) {
    host_index[i] = i;
  }

  // Not using Random::RandomGenerator as it does not take a seed. Seeded RNG is a requirement
  // here as we need the same shuffle sequence for the same hash every time.
  // Further, not using std::default_random_engine and std::uniform_int_distribution as they
  // are not consistent across Linux and Windows platforms.
  const uint64_t seed = hash;
  std::mt19937 random(seed);

  // generates a random number in the range [0,k) uniformly.
  // Uses rejection sampling to avoid modulo bias; the loop always terminates for k >= 1
  // (here k = num_hosts - i >= 1) since the quotient is eventually < k.
  auto uniform_int = [](std::mt19937& random, uint32_t k) -> uint32_t {
    uint32_t x = k;
    while (x >= k) {
      x = random() / ((static_cast<uint64_t>(random.max()) + 1u) / k);
    }
    return x;
  };

  // Track the best fallback in case every probed host is over its bound.
  HostConstSharedPtr alt_host, least_overloaded_host = host;
  double least_overload_factor = overload_factor;
  for (uint32_t i = 0; i < num_hosts; i++) {
    // The random shuffle algorithm (incremental Fisher-Yates): position i receives a uniformly
    // random element from the not-yet-visited suffix [i, num_hosts).
    const uint32_t j = uniform_int(random, num_hosts - i);
    std::swap(host_index[i], host_index[i + j]);

    const uint32_t k = host_index[i];
    alt_host = normalized_host_weights_[k].first;
    // Skip the original pick; its overload factor is already known to exceed 1.
    if (alt_host == host) {
      continue;
    }

    const double alt_host_weight = normalized_host_weights_[k].second;
    overload_factor = hostOverloadFactor(*alt_host, alt_host_weight);

    // First under-capacity host in the probe sequence wins.
    if (overload_factor <= 1.0) {
      ENVOY_LOG_MISC(debug,
                     "ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::chooseHost: "
                     "selected host #{}:{} (attempt:{})",
                     k, alt_host->address()->asString(), i + 2);
      return alt_host;
    }

    if (least_overload_factor > overload_factor) {
      least_overloaded_host = alt_host;
      least_overload_factor = overload_factor;
    }
  }

  // All hosts were over their bound; return the least-overloaded one seen.
  return least_overloaded_host;
}
     305             : 
     306             : } // namespace Upstream
     307             : } // namespace Envoy

Generated by: LCOV version 1.15