1
#include "source/extensions/load_balancing_policies/common/load_balancer_impl.h"
2

            
3
#include <atomic>
4
#include <bitset>
5
#include <cstdint>
6
#include <map>
7
#include <memory>
8
#include <string>
9
#include <vector>
10

            
11
#include "envoy/config/cluster/v3/cluster.pb.h"
12
#include "envoy/config/core/v3/base.pb.h"
13
#include "envoy/runtime/runtime.h"
14
#include "envoy/upstream/upstream.h"
15

            
16
#include "source/common/common/assert.h"
17
#include "source/common/common/logger.h"
18
#include "source/common/protobuf/utility.h"
19
#include "source/common/runtime/runtime_features.h"
20

            
21
#include "absl/container/fixed_array.h"
22

            
23
namespace Envoy {
24
namespace Upstream {
25

            
26
namespace {
27
static const std::string RuntimeZoneEnabled = "upstream.zone_routing.enabled";
28
static const std::string RuntimeMinClusterSize = "upstream.zone_routing.min_cluster_size";
29
static const std::string RuntimeForceLocalZoneMinSize =
30
    "upstream.zone_routing.force_local_zone.min_size";
31
static const std::string RuntimePanicThreshold = "upstream.healthy_panic_threshold";
32

            
33
// Returns true if the weights of all the hosts in the HostVector are equal.
34
267934
bool hostWeightsAreEqual(const HostVector& hosts) {
35
267935
  if (hosts.size() <= 1) {
36
265701
    return true;
37
265701
  }
38
2234
  const uint32_t weight = hosts[0]->weight();
39
106291
  for (size_t i = 1; i < hosts.size(); ++i) {
40
104199
    if (hosts[i]->weight() != weight) {
41
142
      return false;
42
142
    }
43
104199
  }
44
2092
  return true;
45
2234
}
46

            
47
} // namespace
48

            
49
std::pair<int32_t, size_t> distributeLoad(PriorityLoad& per_priority_load,
50
                                          const PriorityAvailability& per_priority_availability,
51
69125
                                          size_t total_load, size_t normalized_total_availability) {
52
69125
  int32_t first_available_priority = -1;
53
140917
  for (size_t i = 0; i < per_priority_availability.get().size(); ++i) {
54
71797
    if (first_available_priority < 0 && per_priority_availability.get()[i] > 0) {
55
34674
      first_available_priority = i;
56
34674
    }
57
    // Now assign as much load as possible to the high priority levels and cease assigning load
58
    // when total_load runs out.
59
71792
    per_priority_load.get()[i] = std::min<uint32_t>(
60
71792
        total_load, per_priority_availability.get()[i] * 100 / normalized_total_availability);
61
71792
    total_load -= per_priority_load.get()[i];
62
71792
  }
63

            
64
69125
  return {first_available_priority, total_load};
65
69125
}
66

            
67
std::pair<uint32_t, LoadBalancerBase::HostAvailability>
68
LoadBalancerBase::choosePriority(uint64_t hash, const HealthyLoad& healthy_per_priority_load,
69
3448138
                                 const DegradedLoad& degraded_per_priority_load) {
70
3448138
  hash = hash % 100 + 1; // 1-100
71
3448138
  uint32_t aggregate_percentage_load = 0;
72
  // As with tryChooseLocalLocalityHosts, this can be refactored for efficiency
73
  // but O(N) is good enough for now given the expected number of priorities is
74
  // small.
75

            
76
  // We first attempt to select a priority based on healthy availability.
77
4473933
  for (size_t priority = 0; priority < healthy_per_priority_load.get().size(); ++priority) {
78
4473453
    aggregate_percentage_load += healthy_per_priority_load.get()[priority];
79
4473453
    if (hash <= aggregate_percentage_load) {
80
3447658
      return {static_cast<uint32_t>(priority), HostAvailability::Healthy};
81
3447658
    }
82
4473453
  }
83

            
84
  // If no priorities were selected due to health, we'll select a priority based degraded
85
  // availability.
86
490
  for (size_t priority = 0; priority < degraded_per_priority_load.get().size(); ++priority) {
87
490
    aggregate_percentage_load += degraded_per_priority_load.get()[priority];
88
490
    if (hash <= aggregate_percentage_load) {
89
480
      return {static_cast<uint32_t>(priority), HostAvailability::Degraded};
90
480
    }
91
490
  }
92

            
93
  // The percentages should always add up to 100 but we have to have a return for the compiler.
94
  IS_ENVOY_BUG("unexpected load error");
95
  return {0, HostAvailability::Healthy};
96
480
}
97

            
98
// Seeds the per-priority load/health/degraded state from the current contents of the
// priority set, then registers callbacks so the state stays in sync as hosts change.
// Two callback strategies exist behind a runtime feature flag: the coalescing path
// only records dirty priorities on priority updates and defers the (expensive)
// recalculation to a single member-update callback; the legacy path recalculates
// eagerly on every priority update.
LoadBalancerBase::LoadBalancerBase(const PrioritySet& priority_set, ClusterLbStats& stats,
                                   Runtime::Loader& runtime, Random::RandomGenerator& random,
                                   uint32_t healthy_panic_threshold)
    : stats_(stats), runtime_(runtime), random_(random),
      default_healthy_panic_percent_(healthy_panic_threshold), priority_set_(priority_set) {
  // Build initial state for every priority level that already exists.
  for (auto& host_set : priority_set_.hostSetsPerPriority()) {
    recalculatePerPriorityState(host_set->priority(), priority_set_, per_priority_load_,
                                per_priority_health_, per_priority_degraded_, total_healthy_hosts_);
  }
  // Recalculate panic mode for all levels.
  recalculatePerPriorityPanic();

  if (Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.coalesce_lb_rebuilds_on_batch_update")) {
    // Coalescing path: priority updates only mark the level dirty; the batched
    // member-update callback performs one recalculation pass for all dirty levels.
    priority_update_cb_ = priority_set_.addPriorityUpdateCb(
        [this](uint32_t priority, const HostVector&, const HostVector&) {
          dirty_priorities_.insert(priority);
        });
    member_update_cb_ = priority_set_.addMemberUpdateCb(
        [this](const HostVector&, const HostVector&) { processDirtyPriorities(); });
  } else {
    // Legacy path: recalculate state, panic mode, and drop any stashed random
    // values immediately on every priority update.
    priority_update_cb_ = priority_set_.addPriorityUpdateCb(
        [this](uint32_t priority, const HostVector&, const HostVector&) {
          recalculatePerPriorityState(priority, priority_set_, per_priority_load_,
                                      per_priority_health_, per_priority_degraded_,
                                      total_healthy_hosts_);
          recalculatePerPriorityPanic();
          stashed_random_.clear();
        });
  }
}
129

            
130
35104
void LoadBalancerBase::processDirtyPriorities() {
131
35104
  if (dirty_priorities_.empty()) {
132
605
    return;
133
605
  }
134
34504
  for (uint32_t priority : dirty_priorities_) {
135
34501
    recalculatePerPriorityState(priority, priority_set_, per_priority_load_, per_priority_health_,
136
34501
                                per_priority_degraded_, total_healthy_hosts_);
137
34501
  }
138
34499
  dirty_priorities_.clear();
139
34499
  recalculatePerPriorityPanic();
140
34499
  stashed_random_.clear();
141
34499
}
142

            
143
// The following cases are handled by
// recalculatePerPriorityState and recalculatePerPriorityPanic methods (normalized total health is
// sum of all priorities' health values and capped at 100).
// - normalized total health is = 100%. It means there are enough healthy hosts to handle the load.
//   Do not enter panic mode, even if a specific priority has low number of healthy hosts.
// - normalized total health is < 100%. There are not enough healthy hosts to handle the load.
// Continue distributing the load among priority sets, but turn on panic mode for a given priority
//   if # of healthy hosts in priority set is low.
// - all host sets are in panic mode. Situation called TotalPanic. Load distribution is
//   calculated based on the number of hosts in each priority regardless of their health.
// - all hosts in all priorities are down (normalized total health is 0%). If panic
//   threshold > 0% the cluster is in TotalPanic (see above). If panic threshold == 0
//   then priorities are not in panic, but there are no healthy hosts to route to.
//   In this case just mark P=0 as recipient of 100% of the traffic (nothing will be routed
//   to P=0 anyways as there are no healthy hosts there).
void LoadBalancerBase::recalculatePerPriorityState(uint32_t priority,
                                                   const PrioritySet& priority_set,
                                                   HealthyAndDegradedLoad& per_priority_load,
                                                   HealthyAvailability& per_priority_health,
                                                   DegradedAvailability& per_priority_degraded,
                                                   uint32_t& total_healthy_hosts) {
  // Ensure all output vectors have one slot per priority level before indexing into them.
  per_priority_load.healthy_priority_load_.get().resize(priority_set.hostSetsPerPriority().size());
  per_priority_load.degraded_priority_load_.get().resize(priority_set.hostSetsPerPriority().size());
  per_priority_health.get().resize(priority_set.hostSetsPerPriority().size());
  per_priority_degraded.get().resize(priority_set.hostSetsPerPriority().size());
  total_healthy_hosts = 0;

  // Determine the health of the newly modified priority level.
  // Health ranges from 0-100, and is the ratio of healthy/degraded hosts to total hosts, modified
  // by the overprovisioning factor.
  HostSet& host_set = *priority_set.hostSetsPerPriority()[priority];
  per_priority_health.get()[priority] = 0;
  per_priority_degraded.get()[priority] = 0;
  // Excluded hosts do not count towards the level's size.
  const auto host_count = host_set.hosts().size() - host_set.excludedHosts().size();

  if (host_count > 0) {
    uint64_t healthy_weight = 0;
    uint64_t degraded_weight = 0;
    uint64_t total_weight = 0;
    if (host_set.weightedPriorityHealth()) {
      // Weighted mode: availability ratios are computed over host weights
      // rather than host counts.
      for (const auto& host : host_set.healthyHosts()) {
        healthy_weight += host->weight();
      }

      for (const auto& host : host_set.degradedHosts()) {
        degraded_weight += host->weight();
      }

      for (const auto& host : host_set.hosts()) {
        total_weight += host->weight();
      }

      // Remove excluded hosts' weight from the denominator, mirroring the
      // host_count computation above.
      uint64_t excluded_weight = 0;
      for (const auto& host : host_set.excludedHosts()) {
        excluded_weight += host->weight();
      }
      ASSERT(total_weight >= excluded_weight);
      total_weight -= excluded_weight;
    } else {
      // Unweighted mode: each host counts as 1.
      healthy_weight = host_set.healthyHosts().size();
      degraded_weight = host_set.degradedHosts().size();
      total_weight = host_count;
    }
    // Each priority level's health is ratio of healthy hosts to total number of hosts in a
    // priority multiplied by overprovisioning factor of 1.4 and capped at 100%. It means that if
    // all hosts are healthy that priority's health is 100%*1.4=140% and is capped at 100% which
    // results in 100%. If 80% of hosts are healthy, that priority's health is still 100%
    // (80%*1.4=112% and capped at 100%).
    per_priority_health.get()[priority] =
        std::min<uint32_t>(100,
                           // NOLINTNEXTLINE(clang-analyzer-core.DivideZero)
                           (host_set.overprovisioningFactor() * healthy_weight / total_weight));

    // We perform the same computation for degraded hosts.
    per_priority_degraded.get()[priority] = std::min<uint32_t>(
        100, (host_set.overprovisioningFactor() * degraded_weight / total_weight));

    ENVOY_LOG(trace,
              "recalculated priority state: priority level {}, healthy weight {}, total weight {}, "
              "overprovision factor {}, healthy result {}, degraded result {}",
              priority, healthy_weight, total_weight, host_set.overprovisioningFactor(),
              per_priority_health.get()[priority], per_priority_degraded.get()[priority]);
  }

  // Now that we've updated health for the changed priority level, we need to calculate percentage
  // load for all priority levels.

  // First, determine if the load needs to be scaled relative to availability (healthy + degraded).
  // For example if there are 3 host sets with 10% / 20% / 10% health and 20% / 10% / 0% degraded
  // they will get 16% / 28% / 14% load to healthy hosts and 28% / 14% / 0% load to degraded hosts
  // to ensure total load adds up to 100. Note the first healthy priority is receiving 2% additional
  // load due to rounding.
  //
  // Sum of priority levels' health and degraded values may exceed 100, so it is capped at 100 and
  // referred as normalized total availability.
  const uint32_t normalized_total_availability =
      calculateNormalizedTotalAvailability(per_priority_health, per_priority_degraded);
  if (normalized_total_availability == 0) {
    // Everything is terrible. There is nothing to calculate here.
    // Let recalculatePerPriorityPanic and recalculateLoadInTotalPanic deal with
    // load calculation.
    return;
  }

  // We start of with a total load of 100 and distribute it between priorities based on
  // availability. We first attempt to distribute this load to healthy priorities based on healthy
  // availability.
  const auto first_healthy_and_remaining =
      distributeLoad(per_priority_load.healthy_priority_load_, per_priority_health, 100,
                     normalized_total_availability);

  // Using the remaining load after allocating load to healthy priorities, distribute it based on
  // degraded availability.
  const auto remaining_load_for_degraded = first_healthy_and_remaining.second;
  const auto first_degraded_and_remaining =
      distributeLoad(per_priority_load.degraded_priority_load_, per_priority_degraded,
                     remaining_load_for_degraded, normalized_total_availability);

  // Anything that remains should just be rounding errors, so allocate that to the first available
  // priority, either as healthy or degraded.
  const auto remaining_load = first_degraded_and_remaining.second;
  if (remaining_load != 0) {
    const auto first_healthy = first_healthy_and_remaining.first;
    const auto first_degraded = first_degraded_and_remaining.first;
    ASSERT(first_healthy != -1 || first_degraded != -1);

    // Attempt to allocate the remainder to the first healthy priority first. If no such priority
    // exist, allocate to the first degraded priority.
    ASSERT(remaining_load < per_priority_load.healthy_priority_load_.get().size() +
                                per_priority_load.degraded_priority_load_.get().size());
    if (first_healthy != -1) {
      per_priority_load.healthy_priority_load_.get()[first_healthy] += remaining_load;
    } else {
      per_priority_load.degraded_priority_load_.get()[first_degraded] += remaining_load;
    }
  }

  // The allocated load between healthy and degraded should be exactly 100.
  ASSERT(100 == std::accumulate(per_priority_load.healthy_priority_load_.get().begin(),
                                per_priority_load.healthy_priority_load_.get().end(), 0) +
                    std::accumulate(per_priority_load.degraded_priority_load_.get().begin(),
                                    per_priority_load.degraded_priority_load_.get().end(), 0));

  // Refresh the aggregate healthy-host count across all priority levels.
  for (auto& host_set : priority_set.hostSetsPerPriority()) {
    total_healthy_hosts += host_set->healthyHosts().size();
  }
}
290

            
291
// Method iterates through priority levels and turns on/off panic mode.
292
68635
void LoadBalancerBase::recalculatePerPriorityPanic() {
293
68635
  per_priority_panic_.resize(priority_set_.hostSetsPerPriority().size());
294

            
295
68635
  const uint32_t normalized_total_availability =
296
68635
      calculateNormalizedTotalAvailability(per_priority_health_, per_priority_degraded_);
297

            
298
68635
  const uint64_t panic_threshold = std::min<uint64_t>(
299
68635
      100, runtime_.snapshot().getInteger(RuntimePanicThreshold, default_healthy_panic_percent_));
300

            
301
  // This is corner case when panic is disabled and there is no hosts available.
302
  // LoadBalancerBase::choosePriority method expects that the sum of
303
  // load percentages always adds up to 100.
304
  // To satisfy that requirement 100% is assigned to P=0.
305
  // In reality no traffic will be routed to P=0 priority, because
306
  // the panic mode is disabled and LoadBalancer will try to find
307
  // a healthy node and none is available.
308
68635
  if (panic_threshold == 0 && normalized_total_availability == 0) {
309
108
    per_priority_load_.healthy_priority_load_.get()[0] = 100;
310
108
    return;
311
108
  }
312

            
313
68527
  bool total_panic = true;
314
138327
  for (size_t i = 0; i < per_priority_health_.get().size(); ++i) {
315
    // For each level check if it should run in panic mode. Never set panic mode if
316
    // normalized total health is 100%, even when individual priority level has very low # of
317
    // healthy hosts.
318
69800
    const HostSet& priority_host_set = *priority_set_.hostSetsPerPriority()[i];
319
69800
    per_priority_panic_[i] =
320
69800
        (normalized_total_availability == 100 ? false : isHostSetInPanic(priority_host_set));
321
69800
    total_panic = total_panic && per_priority_panic_[i];
322
69800
  }
323

            
324
  // If all priority levels are in panic mode, load distribution
325
  // is done differently.
326
68527
  if (total_panic) {
327
34216
    recalculateLoadInTotalPanic();
328
34216
  }
329
68527
}
330

            
331
// recalculateLoadInTotalPanic method is called when all priority levels
332
// are in panic mode. The load distribution is done NOT based on number
333
// of healthy hosts in the priority, but based on number of hosts
334
// in each priority regardless of its health.
335
34216
void LoadBalancerBase::recalculateLoadInTotalPanic() {
336
  // First calculate total number of hosts across all priorities regardless
337
  // whether they are healthy or not.
338
34216
  const uint32_t total_hosts_count =
339
34216
      std::accumulate(priority_set_.hostSetsPerPriority().begin(),
340
34216
                      priority_set_.hostSetsPerPriority().end(), static_cast<size_t>(0),
341
34566
                      [](size_t acc, const std::unique_ptr<Envoy::Upstream::HostSet>& host_set) {
342
34566
                        return acc + host_set->hosts().size();
343
34566
                      });
344

            
345
34216
  if (0 == total_hosts_count) {
346
    // Backend is empty, but load must be distributed somewhere.
347
33979
    per_priority_load_.healthy_priority_load_.get()[0] = 100;
348
33979
    return;
349
33979
  }
350

            
351
  // Now iterate through all priority levels and calculate how much
352
  // load is supposed to go to each priority. In panic mode the calculation
353
  // is based not on the number of healthy hosts but based on the number of
354
  // total hosts in the priority.
355
237
  uint32_t total_load = 100;
356
237
  int32_t first_noempty = -1;
357
649
  for (size_t i = 0; i < per_priority_panic_.size(); i++) {
358
412
    const HostSet& host_set = *priority_set_.hostSetsPerPriority()[i];
359
412
    const auto hosts_num = host_set.hosts().size();
360

            
361
412
    if ((-1 == first_noempty) && (0 != hosts_num)) {
362
237
      first_noempty = i;
363
237
    }
364
412
    const uint32_t priority_load = 100 * hosts_num / total_hosts_count;
365
412
    per_priority_load_.healthy_priority_load_.get()[i] = priority_load;
366
412
    per_priority_load_.degraded_priority_load_.get()[i] = 0;
367
412
    total_load -= priority_load;
368
412
  }
369

            
370
  // Add the remaining load to the first not empty load.
371
237
  per_priority_load_.healthy_priority_load_.get()[first_noempty] += total_load;
372

            
373
  // The total load should come up to 100%.
374
237
  ASSERT(100 == std::accumulate(per_priority_load_.healthy_priority_load_.get().begin(),
375
237
                                per_priority_load_.healthy_priority_load_.get().end(), 0));
376
237
}
377

            
378
std::pair<HostSet&, LoadBalancerBase::HostAvailability>
379
3267480
LoadBalancerBase::chooseHostSet(LoadBalancerContext* context, uint64_t hash) const {
380
3267480
  if (context) {
381
66114
    const auto priority_loads = context->determinePriorityLoad(
382
66114
        priority_set_, per_priority_load_, Upstream::RetryPriority::defaultPriorityMapping);
383
66114
    const auto priority_and_source = choosePriority(hash, priority_loads.healthy_priority_load_,
384
66114
                                                    priority_loads.degraded_priority_load_);
385
66114
    return {*priority_set_.hostSetsPerPriority()[priority_and_source.first],
386
66114
            priority_and_source.second};
387
66114
  }
388

            
389
3201366
  const auto priority_and_source = choosePriority(hash, per_priority_load_.healthy_priority_load_,
390
3201366
                                                  per_priority_load_.degraded_priority_load_);
391
3201366
  return {*priority_set_.hostSetsPerPriority()[priority_and_source.first],
392
3201366
          priority_and_source.second};
393
3267480
}
394

            
395
3267448
uint64_t LoadBalancerBase::random(bool peeking) {
396
3267448
  if (peeking) {
397
133
    stashed_random_.push_back(random_.random());
398
133
    return stashed_random_.back();
399
3267315
  } else {
400
3267315
    if (!stashed_random_.empty()) {
401
92
      auto random = stashed_random_.front();
402
92
      stashed_random_.pop_front();
403
92
      return random;
404
3267226
    } else {
405
3267223
      return random_.random();
406
3267223
    }
407
3267315
  }
408
3267448
}
409

            
410
// Builds zone-aware state on top of LoadBalancerBase: resolves the zone-aware
// configuration knobs (min cluster size, force-local-zone, routing percentage,
// locality basis, panic behavior), sizes the per-priority locality state, and
// wires update callbacks that keep the locality routing structures and (when
// enabled) the locality weighted-round-robin schedulers in sync with host
// changes in both the upstream and local priority sets.
ZoneAwareLoadBalancerBase::ZoneAwareLoadBalancerBase(
    const PrioritySet& priority_set, const PrioritySet* local_priority_set, ClusterLbStats& stats,
    Runtime::Loader& runtime, Random::RandomGenerator& random, uint32_t healthy_panic_threshold,
    const absl::optional<LocalityLbConfig> locality_config)
    : LoadBalancerBase(priority_set, stats, runtime, random, healthy_panic_threshold),
      local_priority_set_(local_priority_set),
      // 6 is the default min cluster size when no config is supplied.
      min_cluster_size_(locality_config.has_value()
                            ? PROTOBUF_GET_WRAPPED_OR_DEFAULT(
                                  locality_config->zone_aware_lb_config(), min_cluster_size, 6U)
                            : 6U),
      force_local_zone_min_size_([&]() -> absl::optional<uint32_t> {
        // Check runtime value first
        if (auto rt = runtime_.snapshot().getInteger(RuntimeForceLocalZoneMinSize, 0); rt > 0) {
          return static_cast<uint32_t>(rt);
        }

        // ForceLocalZone proto field supersedes deprecated ForceLocalityDirectRouting
        if (locality_config.has_value()) {
          if (locality_config->zone_aware_lb_config().has_force_local_zone()) {
            return PROTOBUF_GET_WRAPPED_OR_DEFAULT(
                locality_config->zone_aware_lb_config().force_local_zone(), min_size, 1U);
          }
          if (locality_config->zone_aware_lb_config().force_locality_direct_routing()) {
            return 1U;
          }
        }
        // nullopt means force-local-zone behavior is disabled.
        return absl::nullopt;
      }()),
      // Percentage of traffic subject to zone-aware routing; defaults to 100%.
      routing_enabled_(locality_config.has_value()
                           ? PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
                                 locality_config->zone_aware_lb_config(), routing_enabled, 100, 100)
                           : 100),
      locality_basis_(locality_config.has_value()
                          ? locality_config->zone_aware_lb_config().locality_basis()
                          : LocalityLbConfig::ZoneAwareLbConfig::HEALTHY_HOSTS_NUM),
      fail_traffic_on_panic_(locality_config.has_value()
                                 ? locality_config->zone_aware_lb_config().fail_traffic_on_panic()
                                 : false),
      locality_weighted_balancing_(locality_config.has_value() &&
                                   locality_config->has_locality_weighted_lb_config()) {
  ASSERT(!priority_set.hostSetsPerPriority().empty());
  resizePerPriorityState();
  // Build the initial locality WRR schedulers if locality weighted balancing is on.
  if (locality_weighted_balancing_) {
    for (uint32_t priority = 0; priority < priority_set_.hostSetsPerPriority().size(); ++priority) {
      rebuildLocalityWrrForPriority(priority);
    }
  }

  if (Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.coalesce_lb_rebuilds_on_batch_update")) {
    // Coalescing path: priority updates mark levels dirty; the member-update
    // callback performs one batched rebuild of locality structures/WRR state.
    priority_update_cb_ = priority_set_.addPriorityUpdateCb(
        [this](uint32_t priority, const HostVector&, const HostVector&) {
          dirty_priorities_.insert(priority);
        });
    member_update_cb_ =
        priority_set_.addMemberUpdateCb([this](const HostVector&, const HostVector&) {
          resizePerPriorityState();
          // Locality routing structures are only maintained for P=0.
          bool p0_changed = dirty_priorities_.contains(0);
          if (local_priority_set_ && p0_changed) {
            regenerateLocalityRoutingStructures();
          }
          if (locality_weighted_balancing_) {
            for (uint32_t priority : dirty_priorities_) {
              rebuildLocalityWrrForPriority(priority);
            }
          }
          dirty_priorities_.clear();
        });
  } else {
    // Legacy path: rebuild eagerly on every priority update.
    priority_update_cb_ = priority_set_.addPriorityUpdateCb(
        [this](uint32_t priority, const HostVector&, const HostVector&) {
          resizePerPriorityState();
          if (local_priority_set_ && priority == 0) {
            regenerateLocalityRoutingStructures();
          }
          if (locality_weighted_balancing_) {
            rebuildLocalityWrrForPriority(priority);
          }
        });
  }
  if (local_priority_set_) {
    // Multiple priorities are unsupported for local priority sets.
    // In order to support priorities correctly, one would have to make some assumptions about
    // routing (all local Envoys fail over at the same time) and use all priorities when computing
    // the locality routing structure.
    ASSERT(local_priority_set_->hostSetsPerPriority().size() == 1);
    local_priority_set_member_update_cb_handle_ = local_priority_set_->addPriorityUpdateCb(
        [this](uint32_t priority, const HostVector&, const HostVector&) {
          ASSERT(priority == 0);
          // If the set of local Envoys changes, regenerate routing for P=0 as it does priority
          // based routing.
          regenerateLocalityRoutingStructures();
        });
  }
}
505

            
506
65
void ZoneAwareLoadBalancerBase::rebuildLocalityWrrForPriority(uint32_t priority) {
507
65
  ASSERT(priority < priority_set_.hostSetsPerPriority().size());
508
65
  auto& host_set = *priority_set_.hostSetsPerPriority()[priority];
509
65
  per_priority_state_[priority]->locality_wrr_ =
510
65
      std::make_unique<LocalityWrr>(host_set, random_.random());
511
65
}
512

            
513
198
// Recomputes the P=0 locality routing state: decides between NoLocalityRouting,
// LocalityDirect (all traffic stays in the local locality) and LocalityResidual
// (a computed fraction stays local, the rest is spread over localities with
// spare capacity via the accumulated residual_capacity_ buckets).
void ZoneAwareLoadBalancerBase::regenerateLocalityRoutingStructures() {
  ASSERT(local_priority_set_);
  stats_.lb_recalculate_zone_structures_.inc();
  // resizePerPriorityState should ensure these stay in sync.
  ASSERT(per_priority_state_.size() == priority_set_.hostSetsPerPriority().size());

  // We only do locality routing for P=0
  uint32_t priority = 0;
  PerPriorityState& state = *per_priority_state_[priority];
  // Do not perform any calculations if we cannot perform locality routing based on non runtime
  // params.
  if (earlyExitNonLocalityRouting()) {
    state.locality_routing_state_ = LocalityRoutingState::NoLocalityRouting;
    return;
  }
  HostSet& host_set = *priority_set_.hostSetsPerPriority()[priority];
  const HostsPerLocality& upstreamHostsPerLocality = host_set.healthyHostsPerLocality();
  const size_t num_upstream_localities = upstreamHostsPerLocality.get().size();
  ASSERT(num_upstream_localities >= 2);

  // It is worth noting that all of the percentages calculated are orthogonal from
  // how much load this priority level receives, percentageLoad(priority).
  //
  // If the host sets are such that 20% of load is handled locally and 80% is residual, and then
  // half the hosts in all host sets go unhealthy, this priority set will
  // still send half of the incoming load to the local locality and 80% to residual.
  //
  // Basically, fairness across localities within a priority is guaranteed. Fairness across
  // localities across priorities is not.
  const HostsPerLocality& localHostsPerLocality = localHostSet().healthyHostsPerLocality();
  auto locality_percentages =
      calculateLocalityPercentages(localHostsPerLocality, upstreamHostsPerLocality);

  if (upstreamHostsPerLocality.hasLocalLocality()) {
    // If we have lower percent of hosts in the local cluster in the same locality,
    // we can push all of the requests directly to upstream cluster in the same locality.
    if ((locality_percentages[0].upstream_percentage > 0 &&
         locality_percentages[0].upstream_percentage >= locality_percentages[0].local_percentage) ||
        // When force_local_zone is enabled, always use LocalityDirect routing if there are enough
        // healthy upstreams in the local locality as determined by force_local_zone_min_size is
        // met.
        (force_local_zone_min_size_.has_value() &&
         upstreamHostsPerLocality.get()[0].size() >= *force_local_zone_min_size_)) {
      state.locality_routing_state_ = LocalityRoutingState::LocalityDirect;
      return;
    }
  }

  state.locality_routing_state_ = LocalityRoutingState::LocalityResidual;

  // If we cannot route all requests to the same locality, calculate what percentage can be routed.
  // For example, if local percentage is 20% and upstream is 10%
  // we can route only 50% of requests directly.
  // Local percent can be 0% if there are no upstream hosts in the local locality.
  // Note: percentages are in units of 1/10000, so 10000 == 100%.
  state.local_percent_to_route_ =
      upstreamHostsPerLocality.hasLocalLocality() && locality_percentages[0].local_percentage > 0
          ? locality_percentages[0].upstream_percentage * 10000 /
                locality_percentages[0].local_percentage
          : 0;

  // Local locality does not have additional capacity (we have already routed what we could).
  // Now we need to figure out how much traffic we can route cross locality and to which exact
  // locality we should route. Percentage of requests routed cross locality to a specific locality
  // needed be proportional to the residual capacity upstream locality has.
  //
  // residual_capacity contains capacity left in a given locality, we keep accumulating residual
  // capacity to make search for sampled value easier.
  // For example, if we have the following upstream and local percentage:
  // local_percentage: 40000 40000 20000
  // upstream_percentage: 25000 50000 25000
  // Residual capacity would look like: 0 10000 5000. Now we need to sample proportionally to
  // bucket sizes (residual capacity). For simplicity of finding where specific
  // sampled value is, we accumulate values in residual capacity. This is what it will look like:
  // residual_capacity: 0 10000 15000
  // Now to find a locality to route (bucket) we could simply iterate over residual_capacity
  // searching where sampled value is placed.
  state.residual_capacity_.resize(num_upstream_localities);
  for (uint64_t i = 0; i < num_upstream_localities; ++i) {
    // Each bucket starts at the previous bucket's accumulated value.
    uint64_t last_residual_capacity = i > 0 ? state.residual_capacity_[i - 1] : 0;
    LocalityPercentages this_locality_percentages = locality_percentages[i];
    if (i == 0 && upstreamHostsPerLocality.hasLocalLocality()) {
      // This is a local locality, we have already routed what we could.
      state.residual_capacity_[i] = last_residual_capacity;
      continue;
    }

    // Only route to the localities that have additional capacity.
    if (this_locality_percentages.upstream_percentage >
        this_locality_percentages.local_percentage) {
      state.residual_capacity_[i] = last_residual_capacity +
                                    this_locality_percentages.upstream_percentage -
                                    this_locality_percentages.local_percentage;
    } else {
      // Locality with index "i" does not have residual capacity, but we keep accumulating previous
      // values to make search easier on the next step.
      state.residual_capacity_[i] = last_residual_capacity;
    }
  }
}
612

            
613
67252
void ZoneAwareLoadBalancerBase::resizePerPriorityState() {
614
67252
  const uint32_t size = priority_set_.hostSetsPerPriority().size();
615
101072
  while (per_priority_state_.size() < size) {
616
    // Note for P!=0, PerPriorityState is created with NoLocalityRouting and never changed.
617
33820
    per_priority_state_.push_back(std::make_unique<PerPriorityState>());
618
33820
  }
619
67252
}
620

            
621
198
bool ZoneAwareLoadBalancerBase::earlyExitNonLocalityRouting() {
622
  // We only do locality routing for P=0.
623
198
  HostSet& host_set = *priority_set_.hostSetsPerPriority()[0];
624
198
  if (host_set.healthyHostsPerLocality().get().size() < 2) {
625
111
    return true;
626
111
  }
627

            
628
  // Do not perform locality routing if there are too few local localities for zone routing to have
629
  // an effect. Skipped when ForceLocalZone is enabled.
630
87
  if (!force_local_zone_min_size_.has_value() &&
631
87
      localHostSet().hostsPerLocality().get().size() < 2) {
632
1
    return true;
633
1
  }
634

            
635
  // Do not perform locality routing if the local cluster doesn't have any hosts in the current
636
  // envoy's local locality. This breaks our assumptions about the local cluster being correctly
637
  // configured, so we don't have enough information to perform locality routing. Note: If other
638
  // envoys do exist according to the local cluster, they will still be able to perform locality
639
  // routing correctly. This will not cause a traffic imbalance because other envoys will not know
640
  // about the current one, so they will not factor it into locality routing calculations.
641
86
  if (!localHostSet().hostsPerLocality().hasLocalLocality() ||
642
86
      localHostSet().hostsPerLocality().get()[0].empty()) {
643
2
    stats_.lb_local_cluster_not_ok_.inc();
644
2
    return true;
645
2
  }
646

            
647
  // Do not perform locality routing for small clusters.
648
84
  const uint64_t min_cluster_size =
649
84
      runtime_.snapshot().getInteger(RuntimeMinClusterSize, min_cluster_size_);
650
84
  if (host_set.healthyHosts().size() < min_cluster_size) {
651
1
    stats_.lb_zone_cluster_too_small_.inc();
652
1
    return true;
653
1
  }
654

            
655
83
  return false;
656
84
}
657

            
658
3267303
// Picks a host, retrying selection when the context's host filter rejects the pick.
// The number of attempts is bounded by the context's retry count (plus the initial
// attempt); without a context exactly one attempt is made. If every attempt is
// rejected, the last pick is returned anyway.
HostSelectionResponse ZoneAwareLoadBalancerBase::chooseHost(LoadBalancerContext* context) {
  const size_t attempt_budget = context != nullptr ? context->hostSelectionRetryCount() + 1 : 1;
  HostConstSharedPtr candidate;

  size_t attempt = 0;
  while (attempt++ < attempt_budget) {
    candidate = chooseHostOnce(context);

    // Stop when selection failed outright or the filter accepts the host.
    // Note: in the future we might want to allow retrying when chooseHostOnce returns nullptr.
    const bool retry = candidate && context && context->shouldSelectAnotherHost(*candidate);
    if (!retry) {
      return candidate;
    }
  }

  // All attempts were rejected; fall back to the most recent pick.
  return candidate;
}
676

            
677
34730
bool LoadBalancerBase::isHostSetInPanic(const HostSet& host_set) const {
678
34730
  uint64_t global_panic_threshold = std::min<uint64_t>(
679
34730
      100, runtime_.snapshot().getInteger(RuntimePanicThreshold, default_healthy_panic_percent_));
680
34730
  const auto host_count = host_set.hosts().size() - host_set.excludedHosts().size();
681
34730
  double healthy_percent =
682
34730
      host_count == 0 ? 0.0 : 100.0 * host_set.healthyHosts().size() / host_count;
683

            
684
34730
  double degraded_percent =
685
34730
      host_count == 0 ? 0.0 : 100.0 * host_set.degradedHosts().size() / host_count;
686
  // If the % of healthy hosts in the cluster is less than our panic threshold, we use all hosts.
687
34730
  if ((healthy_percent + degraded_percent) < global_panic_threshold) {
688
34586
    return true;
689
34586
  }
690

            
691
144
  return false;
692
34730
}
693

            
694
// Computes, for each upstream locality group, the percentage of local capacity and of
// upstream capacity that the locality represents, in fixed point (10000 == 100%).
// The weight of a locality is either the sum of its hosts' weights
// (HEALTHY_HOSTS_WEIGHT) or its host count (HEALTHY_HOSTS_NUM), per locality_basis_.
// The returned array is indexed identically to upstream_hosts_per_locality.get().
absl::FixedArray<ZoneAwareLoadBalancerBase::LocalityPercentages>
ZoneAwareLoadBalancerBase::calculateLocalityPercentages(
    const HostsPerLocality& local_hosts_per_locality,
    const HostsPerLocality& upstream_hosts_per_locality) {
  using LocalityWeightMap = absl::flat_hash_map<envoy::config::core::v3::Locality, uint64_t,
                                                LocalityHash, LocalityEqualTo>;

  // Accumulates the weight of every locality group into `weights` and returns the total.
  // If there is no entry in the map for a given locality, it is assumed to have 0 hosts.
  // (Previously this logic was duplicated for the local and upstream host sets.)
  const auto accumulate_weights = [this](const auto& hosts_per_locality,
                                         LocalityWeightMap& weights) -> uint64_t {
    uint64_t total_weight = 0;
    for (const auto& locality_hosts : hosts_per_locality) {
      uint64_t locality_weight = 0;
      switch (locality_basis_) {
      // If locality_basis_ is set to HEALTHY_HOSTS_WEIGHT, it uses the host's weight to
      // calculate the locality percentage.
      case LocalityLbConfig::ZoneAwareLbConfig::HEALTHY_HOSTS_WEIGHT:
        for (const auto& host : locality_hosts) {
          locality_weight += host->weight();
        }
        break;
      // By default it uses the number of healthy hosts in the locality.
      case LocalityLbConfig::ZoneAwareLbConfig::HEALTHY_HOSTS_NUM:
        locality_weight = locality_hosts.size();
        break;
      default:
        PANIC_DUE_TO_CORRUPT_ENUM;
      }
      total_weight += locality_weight;
      if (!locality_hosts.empty()) {
        weights.emplace(locality_hosts[0]->locality(), locality_weight);
      }
    }
    return total_weight;
  };

  LocalityWeightMap local_weights;
  const uint64_t total_local_weight =
      accumulate_weights(local_hosts_per_locality.get(), local_weights);
  LocalityWeightMap upstream_weights;
  const uint64_t total_upstream_weight =
      accumulate_weights(upstream_hosts_per_locality.get(), upstream_weights);

  absl::FixedArray<LocalityPercentages> percentages(upstream_hosts_per_locality.get().size());
  for (uint32_t i = 0; i < upstream_hosts_per_locality.get().size(); ++i) {
    const auto& upstream_hosts = upstream_hosts_per_locality.get()[i];
    if (upstream_hosts.empty()) {
      // If there are no upstream hosts in a given locality, the upstream percentage is 0.
      // We can't determine the locality of this group, so we can't find the corresponding local
      // count. However, if there are no upstream hosts in a locality, the local percentage doesn't
      // matter.
      percentages[i] = LocalityPercentages{0, 0};
      continue;
    }
    const auto& locality = upstream_hosts[0]->locality();

    const auto local_weight_it = local_weights.find(locality);
    const uint64_t local_weight =
        local_weight_it == local_weights.end() ? 0 : local_weight_it->second;
    const auto upstream_weight_it = upstream_weights.find(locality);
    const uint64_t upstream_weight =
        upstream_weight_it == upstream_weights.end() ? 0 : upstream_weight_it->second;

    // Fixed-point percentage where 10000 == 100%; integer division truncates, which is why
    // a rounding fallback exists in tryChooseLocalLocalityHosts().
    const uint64_t local_percentage =
        total_local_weight > 0 ? 10000ULL * local_weight / total_local_weight : 0;
    const uint64_t upstream_percentage =
        total_upstream_weight > 0 ? 10000ULL * upstream_weight / total_upstream_weight : 0;

    percentages[i] = LocalityPercentages{local_percentage, upstream_percentage};
  }

  return percentages;
}
781

            
782
97
// Selects the index (within healthyHostsPerLocality) of the locality to route the current
// pick to, assuming locality routing is active for this priority. Returns 0 (the local
// locality) when the pick can be served locally; otherwise samples a non-local locality
// proportionally to its residual capacity.
uint32_t ZoneAwareLoadBalancerBase::tryChooseLocalLocalityHosts(const HostSet& host_set) const {
  PerPriorityState& state = *per_priority_state_[host_set.priority()];
  ASSERT(state.locality_routing_state_ != LocalityRoutingState::NoLocalityRouting);

  // At this point it's guaranteed to be at least 2 localities in the upstream host set.
  const size_t number_of_localities = host_set.healthyHostsPerLocality().get().size();
  ASSERT(number_of_localities >= 2U);

  // Try to push all of the requests to the same locality if possible.
  if (state.locality_routing_state_ == LocalityRoutingState::LocalityDirect) {
    ASSERT(host_set.healthyHostsPerLocality().hasLocalLocality());
    stats_.lb_zone_routing_all_directly_.inc();
    return 0;
  }

  ASSERT(state.locality_routing_state_ == LocalityRoutingState::LocalityResidual);
  ASSERT(host_set.healthyHostsPerLocality().hasLocalLocality() ||
         state.local_percent_to_route_ == 0);

  // If we cannot route all requests to the same locality, we already calculated how much we can
  // push to the local locality, check if we can push to local locality on current iteration.
  // local_percent_to_route_ is in fixed point where 10000 == 100%.
  if (random_.random() % 10000 < state.local_percent_to_route_) {
    stats_.lb_zone_routing_sampled_.inc();
    return 0;
  }

  // At this point we must route cross locality as we cannot route to the local locality.
  stats_.lb_zone_routing_cross_zone_.inc();

  // This is *extremely* unlikely but possible due to rounding errors when calculating
  // locality percentages. In this case just select random locality.
  if (state.residual_capacity_[number_of_localities - 1] == 0) {
    stats_.lb_zone_no_capacity_left_.inc();
    return random_.random() % number_of_localities;
  }

  // Random sampling to select specific locality for cross locality traffic based on the
  // additional capacity in localities. residual_capacity_ is a running prefix sum, so the
  // last element is the total residual capacity and each locality owns one bucket of it.
  uint64_t threshold = random_.random() % state.residual_capacity_[number_of_localities - 1];

  // This potentially can be optimized to be O(log(N)) where N is the number of localities.
  // Linear scan should be faster for smaller N, in most of the scenarios N will be small.
  //
  // Bucket 1: [0, state.residual_capacity_[0] - 1]
  // Bucket 2: [state.residual_capacity_[0], state.residual_capacity_[1] - 1]
  // ...
  // Bucket N: [state.residual_capacity_[N-2], state.residual_capacity_[N-1] - 1]
  // The loop terminates because threshold < residual_capacity_[N-1] (checked above).
  int i = 0;
  while (threshold >= state.residual_capacity_[i]) {
    i++;
  }

  return i;
}
836

            
837
// Determines which set of hosts (priority + availability + optional locality) this pick
// should draw from. Returns absl::nullopt only when traffic must be failed (a panic
// condition combined with fail_traffic_on_panic_, or a source type lookup failing).
absl::optional<ZoneAwareLoadBalancerBase::HostsSource>
ZoneAwareLoadBalancerBase::hostSourceToUse(LoadBalancerContext* context, uint64_t hash) const {
  auto host_set_and_source = chooseHostSet(context, hash);

  // The second argument tells us which availability we should target from the selected host set.
  const auto host_availability = host_set_and_source.second;
  auto& host_set = host_set_and_source.first;
  HostsSource hosts_source;
  hosts_source.priority_ = host_set.priority();

  // If the selected host set has insufficient healthy hosts, return all hosts (unless we should
  // fail traffic on panic, in which case return no host).
  if (per_priority_panic_[hosts_source.priority_]) {
    stats_.lb_healthy_panic_.inc();
    if (fail_traffic_on_panic_) {
      return absl::nullopt;
    } else {
      hosts_source.source_type_ = HostsSource::SourceType::AllHosts;
      return hosts_source;
    }
  }

  // If we're doing locality weighted balancing, pick locality.
  //
  // The chooseDegradedLocality or chooseHealthyLocality may return valid locality index
  // when the locality_weighted_lb_config is set or load balancing policy extension is used.
  // This if statement is to make sure we only do locality weighted balancing when the
  // locality_weighted_lb_config is set explicitly even the hostSourceToUse is called in the
  // load balancing policy extensions.
  if (locality_weighted_balancing_) {
    absl::optional<uint32_t> locality;
    if (host_availability == HostAvailability::Degraded) {
      locality = chooseDegradedLocality(host_set);
    } else {
      locality = chooseHealthyLocality(host_set);
    }

    if (locality.has_value()) {
      auto source_type = localitySourceType(host_availability);
      if (!source_type) {
        return absl::nullopt;
      }
      hosts_source.source_type_ = source_type.value();
      hosts_source.locality_index_ = locality.value();
      return hosts_source;
    }
    // No locality chosen: fall through to the non-weighted paths below.
  }

  // If we've latched that we can't do locality-based routing, return healthy or degraded hosts
  // for the selected host set.
  if (per_priority_state_[host_set.priority()]->locality_routing_state_ ==
      LocalityRoutingState::NoLocalityRouting) {
    auto source_type = sourceType(host_availability);
    if (!source_type) {
      return absl::nullopt;
    }
    hosts_source.source_type_ = source_type.value();
    return hosts_source;
  }

  // Determine if the load balancer should do zone based routing for this pick.
  if (!runtime_.snapshot().featureEnabled(RuntimeZoneEnabled, routing_enabled_)) {
    auto source_type = sourceType(host_availability);
    if (!source_type) {
      return absl::nullopt;
    }
    hosts_source.source_type_ = source_type.value();
    return hosts_source;
  }

  // Zone routing needs a trustworthy view of the local cluster; a local panic means
  // the view is not trustworthy.
  if (isHostSetInPanic(localHostSet())) {
    stats_.lb_local_cluster_not_ok_.inc();
    // If the local Envoy instances are in global panic, and we should not fail traffic, do
    // not do locality based routing.
    if (fail_traffic_on_panic_) {
      return absl::nullopt;
    } else {
      auto source_type = sourceType(host_availability);
      if (!source_type) {
        return absl::nullopt;
      }
      hosts_source.source_type_ = source_type.value();
      return hosts_source;
    }
  }

  // Zone aware routing is enabled and safe: pick the concrete locality for this pick.
  auto source_type = localitySourceType(host_availability);
  if (!source_type) {
    return absl::nullopt;
  }
  hosts_source.source_type_ = source_type.value();
  hosts_source.locality_index_ = tryChooseLocalLocalityHosts(host_set);
  return hosts_source;
}
931

            
932
2666457
// Resolves a HostsSource descriptor to the concrete host vector it denotes within the
// host set at the requested priority.
const HostVector& ZoneAwareLoadBalancerBase::hostSourceToHosts(HostsSource hosts_source) const {
  const HostSet& host_set = *priority_set_.hostSetsPerPriority()[hosts_source.priority_];
  using SourceType = HostsSource::SourceType;
  switch (hosts_source.source_type_) {
  case SourceType::AllHosts:
    return host_set.hosts();
  case SourceType::HealthyHosts:
    return host_set.healthyHosts();
  case SourceType::DegradedHosts:
    return host_set.degradedHosts();
  case SourceType::LocalityHealthyHosts:
    return host_set.healthyHostsPerLocality().get()[hosts_source.locality_index_];
  case SourceType::LocalityDegradedHosts:
    return host_set.degradedHostsPerLocality().get()[hosts_source.locality_index_];
  }
  // All enum values are handled above; reaching here means a corrupt enum value.
  PANIC_DUE_TO_CORRUPT_ENUM;
}
948

            
949
// Constructs the EDF-based load balancer. Slow start settings are derived from the
// optional slow_start_config: a zero slow_start_window_ means slow start is disabled
// (see isSlowStartEnabled()). min_weight_percent defaults to 10% (stored as 0.1).
EdfLoadBalancerBase::EdfLoadBalancerBase(
    const PrioritySet& priority_set, const PrioritySet* local_priority_set, ClusterLbStats& stats,
    Runtime::Loader& runtime, Random::RandomGenerator& random, uint32_t healthy_panic_threshold,
    const absl::optional<LocalityLbConfig> locality_config,
    const absl::optional<SlowStartConfig> slow_start_config, TimeSource& time_source)
    : ZoneAwareLoadBalancerBase(priority_set, local_priority_set, stats, runtime, random,
                                healthy_panic_threshold, locality_config),
      // Random seed used for the EDF schedulers' randomized starting point.
      seed_(random_.random()),
      slow_start_window_(slow_start_config.has_value()
                             ? std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
                                   slow_start_config.value().slow_start_window()))
                             : std::chrono::milliseconds(0)),
      aggression_runtime_(
          slow_start_config.has_value() && slow_start_config.value().has_aggression()
              ? absl::optional<Runtime::Double>({slow_start_config.value().aggression(), runtime})
              : absl::nullopt),
      time_source_(time_source), latest_host_added_time_(time_source_.monotonicTime()),
      slow_start_min_weight_percent_(slow_start_config.has_value()
                                         ? PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(
                                               slow_start_config.value(), min_weight_percent, 10) /
                                               100.0
                                         : 0.1) {
  // We fully recompute the schedulers for a given host set here on membership change, which is
  // consistent with what other LB implementations do (e.g. thread aware).
  // The downside of a full recompute is that time complexity is O(n * log n),
  // so we will need to do better at delta tracking to scale (see
  // https://github.com/envoyproxy/envoy/issues/2874).

  if (Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.coalesce_lb_rebuilds_on_batch_update")) {
    // Coalesced mode: priority updates only mark priorities dirty; the rebuild work is
    // deferred to the member update callback so a batch update triggers one refresh per
    // priority instead of one per change.
    priority_update_cb_ = priority_set.addPriorityUpdateCb(
        [this](uint32_t priority, const HostVector&, const HostVector&) {
          dirty_priorities_.insert(priority);
        });
    member_update_cb_ =
        priority_set.addMemberUpdateCb([this](const HostVector& hosts_added, const HostVector&) {
          for (uint32_t priority : dirty_priorities_) {
            refresh(priority);
          }
          dirty_priorities_.clear();
          if (isSlowStartEnabled()) {
            recalculateHostsInSlowStart(hosts_added);
          }
        });
  } else {
    // Legacy mode: refresh immediately on every priority update.
    priority_update_cb_ = priority_set.addPriorityUpdateCb(
        [this](uint32_t priority, const HostVector&, const HostVector&) { refresh(priority); });
    member_update_cb_ =
        priority_set.addMemberUpdateCb([this](const HostVector& hosts_added, const HostVector&) {
          if (isSlowStartEnabled()) {
            recalculateHostsInSlowStart(hosts_added);
          }
        });
  }
}
33270
void EdfLoadBalancerBase::initialize() {
66706
  for (uint32_t priority = 0; priority < priority_set_.hostSetsPerPriority().size(); ++priority) {
33436
    refresh(priority);
33436
  }
33270
}
415
// Refreshes slow start bookkeeping for the given hosts: stamps a HC pass time on healthy
// hosts that lack one and advances latest_host_added_time_, which noHostsAreInSlowStart()
// uses to cheaply detect when the slow start window has drained.
void EdfLoadBalancerBase::recalculateHostsInSlowStart(const HostVector& hosts) {
  // TODO(nezdolik): linear scan can be improved with using flat hash set for hosts in slow start.
  for (const auto& host : hosts) {
    auto current_time = time_source_.monotonicTime();
    // Host enters slow start if only it has transitioned into healthy state.
    if (host->coarseHealth() == Upstream::Host::Health::Healthy) {
      auto host_last_hc_pass_time =
          host->lastHcPassTime() ? host->lastHcPassTime().value() : current_time;
      auto in_healthy_state_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
          current_time - host_last_hc_pass_time);
      // If there is no active HC enabled or HC has not run, start slow start window from current
      // time.
      if (!host->lastHcPassTime()) {
        host->setLastHcPassTime(std::move(current_time));
      }
      // Check if host existence time is within slow start window; if so, remember the most
      // recent entry time so the "any host in slow start" check stays accurate.
      if (host_last_hc_pass_time > latest_host_added_time_ &&
          in_healthy_state_duration <= slow_start_window_) {
        latest_host_added_time_ = host_last_hc_pass_time;
      }
    }
  }
}
67145
// Rebuilds the scheduler (EDF or unweighted fast path) for every HostsSource belonging to
// the given priority. Invoked from initialize() and from host set update callbacks.
void EdfLoadBalancerBase::refresh(uint32_t priority) {
  // Ensure that priority is within hostSetsPerPriority.
  if (priority >= priority_set_.hostSetsPerPriority().size()) {
    return;
  }
  // Rebuilds the scheduler entry for one (source, hosts) pair.
  const auto add_hosts_source = [this](HostsSource source, const HostVector& hosts) {
    // Nuke existing scheduler if it exists.
    auto& scheduler = scheduler_[source] = Scheduler{};
    refreshHostSource(source);
    if (isSlowStartEnabled()) {
      recalculateHostsInSlowStart(hosts);
    }
    // Check if the original host weights are equal and no hosts are in slow start mode, in that
    // case EDF creation is skipped. When all original weights are equal and no hosts are in slow
    // start mode we can rely on unweighted host pick to do optimal round robin and least-loaded
    // host selection with lower memory and CPU overhead.
    if (hostWeightsAreEqual(hosts) && noHostsAreInSlowStart()) {
      // Skip edf creation.
      return;
    }
    // If there are no hosts or a single one, there is no need for an EDF scheduler
    // (thus lowering memory and CPU overhead), as the (possibly) single host
    // will be the one always selected by the scheduler.
    if (hosts.size() <= 1) {
      return;
    }
    // Populate the scheduler with the host list with a randomized starting point.
    // TODO(mattklein123): We must build the EDF schedule even if all of the hosts are currently
    // weighted 1. This is because currently we don't refresh host sets if only weights change.
    // We should probably change this to refresh at all times. See the comment in
    // BaseDynamicClusterImpl::updateDynamicHostList about this.
    scheduler.edf_ = std::make_unique<EdfScheduler<Host>>(EdfScheduler<Host>::createWithPicks(
        hosts,
        // We use a fixed weight here. While the weight may change without
        // notification, this will only be stale until this host is next picked,
        // at which point it is reinserted into the EdfScheduler with its new
        // weight in chooseHost().
        [this](const Host& host) { return hostWeight(host); }, seed_));
  };
  // Populate EdfSchedulers for each valid HostsSource value for the host set at this priority.
  const auto& host_set = priority_set_.hostSetsPerPriority()[priority];
  add_hosts_source(HostsSource(priority, HostsSource::SourceType::AllHosts), host_set->hosts());
  add_hosts_source(HostsSource(priority, HostsSource::SourceType::HealthyHosts),
                   host_set->healthyHosts());
  add_hosts_source(HostsSource(priority, HostsSource::SourceType::DegradedHosts),
                   host_set->degradedHosts());
  // Per-locality sources get one scheduler per locality index.
  for (uint32_t locality_index = 0;
       locality_index < host_set->healthyHostsPerLocality().get().size(); ++locality_index) {
    add_hosts_source(
        HostsSource(priority, HostsSource::SourceType::LocalityHealthyHosts, locality_index),
        host_set->healthyHostsPerLocality().get()[locality_index]);
  }
  for (uint32_t locality_index = 0;
       locality_index < host_set->degradedHostsPerLocality().get().size(); ++locality_index) {
    add_hosts_source(
        HostsSource(priority, HostsSource::SourceType::LocalityDegradedHosts, locality_index),
        host_set->degradedHostsPerLocality().get()[locality_index]);
  }
}
1171117
bool EdfLoadBalancerBase::isSlowStartEnabled() const {
1171117
  return slow_start_window_ > std::chrono::milliseconds(0);
1171117
}
869555
bool EdfLoadBalancerBase::noHostsAreInSlowStart() const {
869556
  if (!isSlowStartEnabled()) {
868503
    return true;
868503
  }
1053
  auto current_time = time_source_.monotonicTime();
1053
  if (std::chrono::duration_cast<std::chrono::milliseconds>(
1053
          current_time - latest_host_added_time_) <= slow_start_window_) {
1022
    return false;
1022
  }
31
  return true;
1053
}
151
// Speculatively peeks the next host for preconnecting, without consuming the pick.
HostConstSharedPtr EdfLoadBalancerBase::peekAnotherHost(LoadBalancerContext* context) {
  // Stop prefetching once enough random values are already stashed relative to the
  // healthy host count.
  if (tooManyPreconnects(stashed_random_.size(), total_healthy_hosts_)) {
    return nullptr;
  }
  const absl::optional<HostsSource> source = hostSourceToUse(context, random(true));
  if (!source.has_value()) {
    return nullptr;
  }
  // refresh() guarantees a scheduler entry for every source hostSourceToUse() can return.
  auto sched_it = scheduler_.find(*source);
  ASSERT(sched_it != scheduler_.end());
  auto& sched = sched_it->second;
  // As has been commented in both EdfLoadBalancerBase::refresh and
  // BaseDynamicClusterImpl::updateDynamicHostList, we must do a runtime pivot here to
  // determine whether to use EDF or do unweighted (fast) selection. EDF is non-null iff
  // the original weights of 2 or more hosts differ.
  if (sched.edf_ == nullptr) {
    const HostVector& candidates = hostSourceToHosts(*source);
    if (candidates.empty()) {
      return nullptr;
    }
    return unweightedHostPeek(candidates, *source);
  }
  return sched.edf_->peekAgain([this](const Host& host) { return hostWeight(host); });
}
3250615
// Performs one host selection attempt: resolves the hosts source, then picks either via
// the EDF scheduler (weighted) or the unweighted fast path.
HostConstSharedPtr EdfLoadBalancerBase::chooseHostOnce(LoadBalancerContext* context) {
  const absl::optional<HostsSource> source = hostSourceToUse(context, random(false));
  if (!source.has_value()) {
    return nullptr;
  }
  // refresh() guarantees a scheduler entry for every source hostSourceToUse() can return.
  auto sched_it = scheduler_.find(*source);
  ASSERT(sched_it != scheduler_.end());
  auto& sched = sched_it->second;
  // As has been commented in both EdfLoadBalancerBase::refresh and
  // BaseDynamicClusterImpl::updateDynamicHostList, we must do a runtime pivot here to
  // determine whether to use EDF or do unweighted (fast) selection. EDF is non-null iff
  // the original weights of 2 or more hosts differ.
  if (sched.edf_ == nullptr) {
    const HostVector& candidates = hostSourceToHosts(*source);
    if (candidates.empty()) {
      return nullptr;
    }
    return unweightedHostPick(candidates, *source);
  }
  return sched.edf_->pickAndAdd([this](const Host& host) { return hostWeight(host); });
}
namespace {
518
// Applies the slow start aggression exponent to a time factor in (0, 1]. With a neutral
// aggression of 1.0, or a factor already saturated at 1.0, the factor passes through
// unchanged; otherwise the factor is raised to the power 1/aggression.
double applyAggressionFactor(double time_factor, double aggression) {
  if (aggression != 1.0 && time_factor != 1.0) {
    return std::pow(time_factor, 1.0 / aggression);
  }
  return time_factor;
}
} // namespace
716
// Scales a host's weight down while it is inside its slow start window, ramping the
// weight up linearly (shaped by the aggression exponent) and never below
// slow_start_min_weight_percent_. Hosts outside the window, unhealthy hosts, or hosts
// without a recorded HC pass time keep their weight unchanged.
double EdfLoadBalancerBase::applySlowStartFactor(double host_weight, const Host& host) const {
  // We can reliably apply slow start weight only if `last_hc_pass_time` in host has been populated
  // either by active HC or by `member_update_cb_` in `EdfLoadBalancerBase`.
  if (host.lastHcPassTime() && host.coarseHealth() == Upstream::Host::Health::Healthy) {
    auto in_healthy_state_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
        time_source_.monotonicTime() - host.lastHcPassTime().value());
    if (in_healthy_state_duration < slow_start_window_) {
      // Aggression may be overridden at runtime; fall back to the neutral 1.0 on
      // invalid (non-positive or NaN) values.
      double aggression =
          aggression_runtime_ != absl::nullopt ? aggression_runtime_.value().value() : 1.0;
      if (aggression <= 0.0 || std::isnan(aggression)) {
        ENVOY_LOG_EVERY_POW_2(error, "Invalid runtime value provided for aggression parameter, "
                                     "aggression cannot be less than 0.0");
        aggression = 1.0;
      }
      ASSERT(aggression > 0.0);
      // Fraction of the slow start window elapsed, clamped below at 1ms so the factor
      // is never zero.
      auto time_factor = static_cast<double>(std::max(std::chrono::milliseconds(1).count(),
                                                      in_healthy_state_duration.count())) /
                         slow_start_window_.count();
      return host_weight * std::max(applyAggressionFactor(time_factor, aggression),
                                    slow_start_min_weight_percent_);
    } else {
      return host_weight;
    }
  } else {
    return host_weight;
  }
}
} // namespace Upstream
} // namespace Envoy