Line data Source code
1 : #include "load_balancer_impl.h"
2 : #include "source/common/upstream/load_balancer_impl.h"
3 :
4 : #include <atomic>
5 : #include <bitset>
6 : #include <cstdint>
7 : #include <map>
8 : #include <memory>
9 : #include <string>
10 : #include <vector>
11 :
12 : #include "envoy/config/cluster/v3/cluster.pb.h"
13 : #include "envoy/config/core/v3/base.pb.h"
14 : #include "envoy/runtime/runtime.h"
15 : #include "envoy/upstream/upstream.h"
16 :
17 : #include "source/common/common/assert.h"
18 : #include "source/common/common/logger.h"
19 : #include "source/common/protobuf/utility.h"
20 : #include "source/common/runtime/runtime_features.h"
21 :
22 : #include "absl/container/fixed_array.h"
23 :
24 : namespace Envoy {
25 : namespace Upstream {
26 :
27 : namespace {
28 : static const std::string RuntimeZoneEnabled = "upstream.zone_routing.enabled";
29 : static const std::string RuntimeMinClusterSize = "upstream.zone_routing.min_cluster_size";
30 : static const std::string RuntimePanicThreshold = "upstream.healthy_panic_threshold";
31 :
32 499 : bool tooManyPreconnects(size_t num_preconnect_picks, uint32_t healthy_hosts) {
33 : // Currently we only allow the number of preconnected connections to equal the
34 : // number of healthy hosts.
35 499 : return num_preconnect_picks >= healthy_hosts;
36 499 : }
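 : // Illustrative example (hypothetical values): with 3 healthy hosts, calls with
 : // num_preconnect_picks of 0, 1 or 2 return false, while 3 or more returns true, capping the
 : // number of preconnected connections at the number of healthy hosts.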
37 :
38 : // Distributes load between priorities based on the per priority availability and the normalized
39 : // total availability. Load is assigned to each priority according to how available each priority is
40 : // adjusted for the normalized total availability.
41 : //
42 : // @param per_priority_load vector of loads that should be populated.
43 : // @param per_priority_availability the percentage availability of each priority, used to determine
44 : // how much load each priority can handle.
45 : // @param total_load the amount of load that may be distributed. Will be updated with the amount of
46 : // load remaining after distribution.
47 : // @param normalized_total_availability the total availability, up to a max of 100. Used to
48 : // scale the load when the total availability is less than 100%.
49 : // @return the first available priority and the remaining load
50 : std::pair<int32_t, size_t> distributeLoad(PriorityLoad& per_priority_load,
51 : const PriorityAvailability& per_priority_availability,
52 938 : size_t total_load, size_t normalized_total_availability) {
53 938 : int32_t first_available_priority = -1;
54 2212 : for (size_t i = 0; i < per_priority_availability.get().size(); ++i) {
55 1274 : if (first_available_priority < 0 && per_priority_availability.get()[i] > 0) {
56 493 : first_available_priority = i;
57 493 : }
58 : // Now assign as much load as possible to the high priority levels and cease assigning load
59 : // when total_load runs out.
60 1274 : per_priority_load.get()[i] = std::min<uint32_t>(
61 1274 : total_load, per_priority_availability.get()[i] * 100 / normalized_total_availability);
62 1274 : total_load -= per_priority_load.get()[i];
63 1274 : }
64 :
65 938 : return {first_available_priority, total_load};
66 938 : }
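 : // Illustrative example (hypothetical values): with per-priority availability {70, 30, 0},
 : // total_load = 100 and normalized_total_availability = 100, the loop assigns 70 to P0 and 30
 : // to P1, leaving P2 at 0, and returns {0, 0} (first available priority 0, no remaining load).
 : // With availability {70, 50} the second entry is capped by the remaining total_load, so P0
 : // gets 70 and P1 gets min(30, 50) = 30.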
67 :
68 : // Returns true if the weights of all the hosts in the HostVector are equal.
69 4782 : bool hostWeightsAreEqual(const HostVector& hosts) {
70 4782 : if (hosts.size() <= 1) {
71 4157 : return true;
72 4157 : }
73 625 : const uint32_t weight = hosts[0]->weight();
74 14050 : for (size_t i = 1; i < hosts.size(); ++i) {
75 13805 : if (hosts[i]->weight() != weight) {
76 380 : return false;
77 380 : }
78 13805 : }
79 245 : return true;
80 625 : }
81 :
82 : } // namespace
83 :
84 : absl::optional<envoy::extensions::load_balancing_policies::common::v3::LocalityLbConfig>
85 : LoadBalancerConfigHelper::localityLbConfigFromCommonLbConfig(
86 613 : const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config) {
87 :
88 613 : if (common_config.has_locality_weighted_lb_config()) {
89 25 : envoy::extensions::load_balancing_policies::common::v3::LocalityLbConfig locality_lb_config;
90 25 : locality_lb_config.mutable_locality_weighted_lb_config();
91 25 : return locality_lb_config;
92 588 : } else if (common_config.has_zone_aware_lb_config()) {
93 18 : envoy::extensions::load_balancing_policies::common::v3::LocalityLbConfig locality_lb_config;
94 18 : auto& zone_aware_lb_config = *locality_lb_config.mutable_zone_aware_lb_config();
95 :
96 18 : const auto& legacy_zone_aware_lb_config = common_config.zone_aware_lb_config();
97 18 : if (legacy_zone_aware_lb_config.has_routing_enabled()) {
98 2 : *zone_aware_lb_config.mutable_routing_enabled() =
99 2 : legacy_zone_aware_lb_config.routing_enabled();
100 2 : }
101 18 : if (legacy_zone_aware_lb_config.has_min_cluster_size()) {
102 1 : *zone_aware_lb_config.mutable_min_cluster_size() =
103 1 : legacy_zone_aware_lb_config.min_cluster_size();
104 1 : }
105 18 : zone_aware_lb_config.set_fail_traffic_on_panic(
106 18 : legacy_zone_aware_lb_config.fail_traffic_on_panic());
107 :
108 18 : return locality_lb_config;
109 18 : }
110 :
111 570 : return {};
112 613 : }
113 :
114 : std::pair<uint32_t, LoadBalancerBase::HostAvailability>
115 : LoadBalancerBase::choosePriority(uint64_t hash, const HealthyLoad& healthy_per_priority_load,
116 977 : const DegradedLoad& degraded_per_priority_load) {
117 977 : hash = hash % 100 + 1; // 1-100
118 977 : uint32_t aggregate_percentage_load = 0;
119 : // As with tryChooseLocalLocalityHosts, this can be refactored for efficiency
120 : // but O(N) is good enough for now given the expected number of priorities is
121 : // small.
122 :
123 : // We first attempt to select a priority based on healthy availability.
124 1194 : for (size_t priority = 0; priority < healthy_per_priority_load.get().size(); ++priority) {
125 1155 : aggregate_percentage_load += healthy_per_priority_load.get()[priority];
126 1155 : if (hash <= aggregate_percentage_load) {
127 938 : return {static_cast<uint32_t>(priority), HostAvailability::Healthy};
128 938 : }
129 1155 : }
130 :
131 : // If no priorities were selected due to health, we'll select a priority based on degraded
132 : // availability.
133 39 : for (size_t priority = 0; priority < degraded_per_priority_load.get().size(); ++priority) {
134 39 : aggregate_percentage_load += degraded_per_priority_load.get()[priority];
135 39 : if (hash <= aggregate_percentage_load) {
136 39 : return {static_cast<uint32_t>(priority), HostAvailability::Degraded};
137 39 : }
138 39 : }
139 :
140 : // The percentages should always add up to 100 but we have to have a return for the compiler.
141 0 : IS_ENVOY_BUG("unexpected load error");
142 0 : return {0, HostAvailability::Healthy};
143 39 : }
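 : // Illustrative example (hypothetical values): with healthy load {60, 40, 0} and a hash that
 : // maps to 75, the loop accumulates 60 (75 > 60) and then 100 (75 <= 100), so priority 1 is
 : // chosen as Healthy. With healthy load {0, 0, 0} and degraded load {100, 0, 0}, the first
 : // loop falls through and the second selects priority 0 as Degraded.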
144 :
145 : LoadBalancerBase::LoadBalancerBase(const PrioritySet& priority_set, ClusterLbStats& stats,
146 : Runtime::Loader& runtime, Random::RandomGenerator& random,
147 : uint32_t healthy_panic_threshold)
148 : : stats_(stats), runtime_(runtime), random_(random),
149 613 : default_healthy_panic_percent_(healthy_panic_threshold), priority_set_(priority_set) {
150 888 : for (auto& host_set : priority_set_.hostSetsPerPriority()) {
151 888 : recalculatePerPriorityState(host_set->priority(), priority_set_, per_priority_load_,
152 888 : per_priority_health_, per_priority_degraded_, total_healthy_hosts_);
153 888 : }
154 : // Recalculate panic mode for all levels.
155 613 : recalculatePerPriorityPanic();
156 :
157 613 : priority_update_cb_ = priority_set_.addPriorityUpdateCb(
158 635 : [this](uint32_t priority, const HostVector&, const HostVector&) -> void {
159 620 : recalculatePerPriorityState(priority, priority_set_, per_priority_load_,
160 620 : per_priority_health_, per_priority_degraded_,
161 620 : total_healthy_hosts_);
162 620 : recalculatePerPriorityPanic();
163 620 : stashed_random_.clear();
164 620 : });
165 613 : }
166 :
167 : // The following cases are handled by
168 : // recalculatePerPriorityState and recalculatePerPriorityPanic methods (normalized total health is
169 : // the sum of all priorities' health values, capped at 100).
170 : // - normalized total health is 100%. It means there are enough healthy hosts to handle the load.
171 : // Do not enter panic mode, even if a specific priority has low number of healthy hosts.
172 : // - normalized total health is < 100%. There are not enough healthy hosts to handle the load.
173 : // Continue distributing the load among priority sets, but turn on panic mode for a given priority
174 : // if # of healthy hosts in priority set is low.
175 : // - all host sets are in panic mode. This situation is called TotalPanic. Load distribution is
176 : // calculated based on the number of hosts in each priority regardless of their health.
177 : // - all hosts in all priorities are down (normalized total health is 0%). If panic
178 : // threshold > 0% the cluster is in TotalPanic (see above). If panic threshold == 0
179 : // then priorities are not in panic, but there are no healthy hosts to route to.
180 : // In this case just mark P=0 as recipient of 100% of the traffic (nothing will be routed
181 : // to P=0 anyway as there are no healthy hosts there).
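 : // Illustrative example (hypothetical values): two priorities with computed health 60 and 30
 : // give a normalized total health of 90 (< 100), so the load is scaled up: P0 receives
 : // 60 * 100 / 90 = 66 and P1 receives 30 * 100 / 90 = 33, with the remaining 1% of rounding
 : // error assigned to the first available priority, yielding {67, 33}.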
182 : void LoadBalancerBase::recalculatePerPriorityState(uint32_t priority,
183 : const PrioritySet& priority_set,
184 : HealthyAndDegradedLoad& per_priority_load,
185 : HealthyAvailability& per_priority_health,
186 : DegradedAvailability& per_priority_degraded,
187 1508 : uint32_t& total_healthy_hosts) {
188 1508 : per_priority_load.healthy_priority_load_.get().resize(priority_set.hostSetsPerPriority().size());
189 1508 : per_priority_load.degraded_priority_load_.get().resize(priority_set.hostSetsPerPriority().size());
190 1508 : per_priority_health.get().resize(priority_set.hostSetsPerPriority().size());
191 1508 : per_priority_degraded.get().resize(priority_set.hostSetsPerPriority().size());
192 1508 : total_healthy_hosts = 0;
193 :
194 : // Determine the health of the newly modified priority level.
195 : // Health ranges from 0-100, and is the ratio of healthy/degraded hosts to total hosts, modified
196 : // by the overprovisioning factor.
197 1508 : HostSet& host_set = *priority_set.hostSetsPerPriority()[priority];
198 1508 : per_priority_health.get()[priority] = 0;
199 1508 : per_priority_degraded.get()[priority] = 0;
200 1508 : const auto host_count = host_set.hosts().size() - host_set.excludedHosts().size();
201 :
202 1508 : if (host_count > 0) {
203 1082 : uint64_t healthy_weight = 0;
204 1082 : uint64_t degraded_weight = 0;
205 1082 : uint64_t total_weight = 0;
206 1082 : if (host_set.weightedPriorityHealth()) {
207 0 : for (const auto& host : host_set.healthyHosts()) {
208 0 : healthy_weight += host->weight();
209 0 : }
210 :
211 0 : for (const auto& host : host_set.degradedHosts()) {
212 0 : degraded_weight += host->weight();
213 0 : }
214 :
215 0 : for (const auto& host : host_set.hosts()) {
216 0 : total_weight += host->weight();
217 0 : }
218 :
219 0 : uint64_t excluded_weight = 0;
220 0 : for (const auto& host : host_set.excludedHosts()) {
221 0 : excluded_weight += host->weight();
222 0 : }
223 0 : ASSERT(total_weight >= excluded_weight);
224 0 : total_weight -= excluded_weight;
225 1082 : } else {
226 1082 : healthy_weight = host_set.healthyHosts().size();
227 1082 : degraded_weight = host_set.degradedHosts().size();
228 1082 : total_weight = host_count;
229 1082 : }
230 : // Each priority level's health is the ratio of healthy hosts to the total number of hosts in a
231 : // priority, multiplied by the overprovisioning factor of 1.4 and capped at 100%. It means that if
232 : // all hosts are healthy that priority's health is 100%*1.4=140% and is capped at 100% which
233 : // results in 100%. If 80% of hosts are healthy, that priority's health is still 100%
234 : // (80%*1.4=112% and capped at 100%).
235 1082 : per_priority_health.get()[priority] =
236 1082 : std::min<uint32_t>(100,
237 : // NOLINTNEXTLINE(clang-analyzer-core.DivideZero)
238 1082 : (host_set.overprovisioningFactor() * healthy_weight / total_weight));
239 :
240 : // We perform the same computation for degraded hosts.
241 1082 : per_priority_degraded.get()[priority] = std::min<uint32_t>(
242 1082 : 100, (host_set.overprovisioningFactor() * degraded_weight / total_weight));
243 :
244 1082 : ENVOY_LOG(trace,
245 1082 : "recalculated priority state: priority level {}, healthy weight {}, total weight {}, "
246 1082 : "overprovision factor {}, healthy result {}, degraded result {}",
247 1082 : priority, healthy_weight, total_weight, host_set.overprovisioningFactor(),
248 1082 : per_priority_health.get()[priority], per_priority_degraded.get()[priority]);
249 1082 : }
250 :
251 : // Now that we've updated health for the changed priority level, we need to calculate percentage
252 : // load for all priority levels.
253 :
254 : // First, determine if the load needs to be scaled relative to availability (healthy + degraded).
255 : // For example if there are 3 host sets with 10% / 20% / 10% health and 20% / 10% / 0% degraded
256 : // they will get 16% / 28% / 14% load to healthy hosts and 28% / 14% / 0% load to degraded hosts
257 : // to ensure total load adds up to 100. Note the first healthy priority is receiving 2% additional
258 : // load due to rounding.
259 : //
260 : // Sum of priority levels' health and degraded values may exceed 100, so it is capped at 100 and
261 : // referred to as the normalized total availability.
262 1508 : const uint32_t normalized_total_availability =
263 1508 : calculateNormalizedTotalAvailability(per_priority_health, per_priority_degraded);
264 1508 : if (normalized_total_availability == 0) {
265 : // Everything is terrible. There is nothing to calculate here.
266 : // Let recalculatePerPriorityPanic and recalculateLoadInTotalPanic deal with
267 : // load calculation.
268 1039 : return;
269 1039 : }
270 :
271 : // We start off with a total load of 100 and distribute it between priorities based on
272 : // availability. We first attempt to distribute this load to healthy priorities based on healthy
273 : // availability.
274 469 : const auto first_healthy_and_remaining =
275 469 : distributeLoad(per_priority_load.healthy_priority_load_, per_priority_health, 100,
276 469 : normalized_total_availability);
277 :
278 : // Using the remaining load after allocating load to healthy priorities, distribute it based on
279 : // degraded availability.
280 469 : const auto remaining_load_for_degraded = first_healthy_and_remaining.second;
281 469 : const auto first_degraded_and_remaining =
282 469 : distributeLoad(per_priority_load.degraded_priority_load_, per_priority_degraded,
283 469 : remaining_load_for_degraded, normalized_total_availability);
284 :
285 : // Anything that remains should just be rounding errors, so allocate that to the first available
286 : // priority, either as healthy or degraded.
287 469 : const auto remaining_load = first_degraded_and_remaining.second;
288 469 : if (remaining_load != 0) {
289 12 : const auto first_healthy = first_healthy_and_remaining.first;
290 12 : const auto first_degraded = first_degraded_and_remaining.first;
291 12 : ASSERT(first_healthy != -1 || first_degraded != -1);
292 :
293 : // Attempt to allocate the remainder to the first healthy priority. If no such priority
294 : // exists, allocate to the first degraded priority.
295 12 : ASSERT(remaining_load < per_priority_load.healthy_priority_load_.get().size() +
296 12 : per_priority_load.degraded_priority_load_.get().size());
297 12 : if (first_healthy != -1) {
298 12 : per_priority_load.healthy_priority_load_.get()[first_healthy] += remaining_load;
299 12 : } else {
300 0 : per_priority_load.degraded_priority_load_.get()[first_degraded] += remaining_load;
301 0 : }
302 12 : }
303 :
304 : // The allocated load between healthy and degraded should be exactly 100.
305 469 : ASSERT(100 == std::accumulate(per_priority_load.healthy_priority_load_.get().begin(),
306 469 : per_priority_load.healthy_priority_load_.get().end(), 0) +
307 469 : std::accumulate(per_priority_load.degraded_priority_load_.get().begin(),
308 469 : per_priority_load.degraded_priority_load_.get().end(), 0));
309 :
310 637 : for (auto& host_set : priority_set.hostSetsPerPriority()) {
311 637 : total_healthy_hosts += host_set->healthyHosts().size();
312 637 : }
313 469 : }
314 :
315 : // This method iterates through priority levels and turns panic mode on or off.
316 1233 : void LoadBalancerBase::recalculatePerPriorityPanic() {
317 1233 : per_priority_panic_.resize(priority_set_.hostSetsPerPriority().size());
318 :
319 1233 : const uint32_t normalized_total_availability =
320 1233 : calculateNormalizedTotalAvailability(per_priority_health_, per_priority_degraded_);
321 :
322 1233 : const uint64_t panic_threshold = std::min<uint64_t>(
323 1233 : 100, runtime_.snapshot().getInteger(RuntimePanicThreshold, default_healthy_panic_percent_));
324 :
325 : // This is a corner case when panic is disabled and there are no hosts available.
326 : // LoadBalancerBase::choosePriority method expects that the sum of
327 : // load percentages always adds up to 100.
328 : // To satisfy that requirement 100% is assigned to P=0.
329 : // In reality no traffic will be routed to P=0, because panic mode is
330 : // disabled and the load balancer will try to find a healthy host and
331 : // none is available.
332 1233 : if (panic_threshold == 0 && normalized_total_availability == 0) {
333 46 : per_priority_load_.healthy_priority_load_.get()[0] = 100;
334 46 : return;
335 46 : }
336 :
337 1187 : bool total_panic = true;
338 2923 : for (size_t i = 0; i < per_priority_health_.get().size(); ++i) {
339 : // For each level check if it should run in panic mode. Never set panic mode if
340 : // normalized total health is 100%, even when an individual priority level has a very low
341 : // number of healthy hosts.
342 1736 : const HostSet& priority_host_set = *priority_set_.hostSetsPerPriority()[i];
343 1736 : per_priority_panic_[i] =
344 1736 : (normalized_total_availability == 100 ? false : isHostSetInPanic(priority_host_set));
345 1736 : total_panic = total_panic && per_priority_panic_[i];
346 1736 : }
347 :
348 : // If all priority levels are in panic mode, load distribution
349 : // is done differently.
350 1187 : if (total_panic) {
351 752 : recalculateLoadInTotalPanic();
352 752 : }
353 1187 : }
354 :
355 : // recalculateLoadInTotalPanic method is called when all priority levels
356 : // are in panic mode. The load distribution is done NOT based on the number
357 : // of healthy hosts in the priority, but on the number of hosts
358 : // in each priority regardless of their health.
359 752 : void LoadBalancerBase::recalculateLoadInTotalPanic() {
360 : // First calculate the total number of hosts across all priorities, regardless
361 : // of whether they are healthy or not.
362 752 : const uint32_t total_hosts_count =
363 752 : std::accumulate(priority_set_.hostSetsPerPriority().begin(),
364 752 : priority_set_.hostSetsPerPriority().end(), static_cast<size_t>(0),
365 1168 : [](size_t acc, const std::unique_ptr<Envoy::Upstream::HostSet>& host_set) {
366 1168 : return acc + host_set->hosts().size();
367 1168 : });
368 :
369 752 : if (0 == total_hosts_count) {
370 : // Backend is empty, but load must be distributed somewhere.
371 312 : per_priority_load_.healthy_priority_load_.get()[0] = 100;
372 312 : return;
373 312 : }
374 :
375 : // Now iterate through all priority levels and calculate how much
376 : // load is supposed to go to each priority. In panic mode the calculation
377 : // is based not on the number of healthy hosts but on the total number of
378 : // hosts in the priority.
379 440 : uint32_t total_load = 100;
380 440 : int32_t first_noempty = -1;
381 1290 : for (size_t i = 0; i < per_priority_panic_.size(); i++) {
382 850 : const HostSet& host_set = *priority_set_.hostSetsPerPriority()[i];
383 850 : const auto hosts_num = host_set.hosts().size();
384 :
385 850 : if ((-1 == first_noempty) && (0 != hosts_num)) {
386 440 : first_noempty = i;
387 440 : }
388 850 : const uint32_t priority_load = 100 * hosts_num / total_hosts_count;
389 850 : per_priority_load_.healthy_priority_load_.get()[i] = priority_load;
390 850 : per_priority_load_.degraded_priority_load_.get()[i] = 0;
391 850 : total_load -= priority_load;
392 850 : }
393 :
394 : // Add the remaining load to the first non-empty priority.
395 440 : per_priority_load_.healthy_priority_load_.get()[first_noempty] += total_load;
396 :
397 : // The total load should add up to 100%.
398 440 : ASSERT(100 == std::accumulate(per_priority_load_.healthy_priority_load_.get().begin(),
399 440 : per_priority_load_.healthy_priority_load_.get().end(), 0));
400 440 : }
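 : // Illustrative example (hypothetical values): with 2 hosts in P0 and 1 host in P1, healthy or
 : // not, total_hosts_count is 3, so P0 gets 100 * 2 / 3 = 66 and P1 gets 100 * 1 / 3 = 33; the
 : // remaining 1% is added to the first non-empty priority, yielding {67, 33}.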
401 :
402 : std::pair<HostSet&, LoadBalancerBase::HostAvailability>
403 977 : LoadBalancerBase::chooseHostSet(LoadBalancerContext* context, uint64_t hash) const {
404 977 : if (context) {
405 251 : const auto priority_loads = context->determinePriorityLoad(
406 251 : priority_set_, per_priority_load_, Upstream::RetryPriority::defaultPriorityMapping);
407 251 : const auto priority_and_source = choosePriority(hash, priority_loads.healthy_priority_load_,
408 251 : priority_loads.degraded_priority_load_);
409 251 : return {*priority_set_.hostSetsPerPriority()[priority_and_source.first],
410 251 : priority_and_source.second};
411 251 : }
412 :
413 726 : const auto priority_and_source = choosePriority(hash, per_priority_load_.healthy_priority_load_,
414 726 : per_priority_load_.degraded_priority_load_);
415 726 : return {*priority_set_.hostSetsPerPriority()[priority_and_source.first],
416 726 : priority_and_source.second};
417 977 : }
418 :
419 : ZoneAwareLoadBalancerBase::ZoneAwareLoadBalancerBase(
420 : const PrioritySet& priority_set, const PrioritySet* local_priority_set, ClusterLbStats& stats,
421 : Runtime::Loader& runtime, Random::RandomGenerator& random, uint32_t healthy_panic_threshold,
422 : const absl::optional<LocalityLbConfig> locality_config)
423 : : LoadBalancerBase(priority_set, stats, runtime, random, healthy_panic_threshold),
424 : local_priority_set_(local_priority_set),
425 : min_cluster_size_(locality_config.has_value()
426 : ? PROTOBUF_GET_WRAPPED_OR_DEFAULT(
427 : locality_config->zone_aware_lb_config(), min_cluster_size, 6U)
428 : : 6U),
429 : routing_enabled_(locality_config.has_value()
430 : ? PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
431 : locality_config->zone_aware_lb_config(), routing_enabled, 100, 100)
432 : : 100),
433 : fail_traffic_on_panic_(locality_config.has_value()
434 : ? locality_config->zone_aware_lb_config().fail_traffic_on_panic()
435 : : false),
436 : use_new_locality_routing_(Runtime::runtimeFeatureEnabled(
437 : "envoy.reloadable_features.locality_routing_use_new_routing_logic")),
438 : locality_weighted_balancing_(locality_config.has_value() &&
439 613 : locality_config->has_locality_weighted_lb_config()) {
440 613 : ASSERT(!priority_set.hostSetsPerPriority().empty());
441 613 : resizePerPriorityState();
442 613 : priority_update_cb_ = priority_set_.addPriorityUpdateCb(
443 635 : [this](uint32_t priority, const HostVector&, const HostVector&) -> void {
444 : // Make sure per_priority_state_ is as large as priority_set_.hostSetsPerPriority()
445 620 : resizePerPriorityState();
446 : // If P=0 changes, regenerate locality routing structures. Locality based routing is
447 : // disabled at all other levels.
448 620 : if (local_priority_set_ && priority == 0) {
449 82 : if (use_new_locality_routing_) {
450 82 : regenerateLocalityRoutingStructuresNew();
451 82 : } else {
452 0 : regenerateLocalityRoutingStructures();
453 0 : }
454 82 : }
455 620 : });
456 613 : if (local_priority_set_) {
457 : // Multiple priorities are unsupported for local priority sets.
458 : // In order to support priorities correctly, one would have to make some assumptions about
459 : // routing (all local Envoys fail over at the same time) and use all priorities when computing
460 : // the locality routing structure.
461 82 : ASSERT(local_priority_set_->hostSetsPerPriority().size() == 1);
462 82 : local_priority_set_member_update_cb_handle_ = local_priority_set_->addPriorityUpdateCb(
463 83 : [this](uint32_t priority, const HostVector&, const HostVector&) -> void {
464 82 : ASSERT(priority == 0);
465 : // If the set of local Envoys changes, regenerate routing for P=0 as it is the only
466 : // priority that performs locality based routing.
467 82 : if (use_new_locality_routing_) {
468 82 : regenerateLocalityRoutingStructuresNew();
469 82 : } else {
470 0 : regenerateLocalityRoutingStructures();
471 0 : }
472 82 : });
473 82 : }
474 613 : }
475 :
476 164 : void ZoneAwareLoadBalancerBase::regenerateLocalityRoutingStructuresNew() {
477 164 : ASSERT(local_priority_set_);
478 164 : stats_.lb_recalculate_zone_structures_.inc();
479 : // resizePerPriorityState should ensure these stay in sync.
480 164 : ASSERT(per_priority_state_.size() == priority_set_.hostSetsPerPriority().size());
481 :
482 : // We only do locality routing for P=0
483 164 : uint32_t priority = 0;
484 164 : PerPriorityState& state = *per_priority_state_[priority];
485 : // Do not perform any calculations if we cannot perform locality routing based on non-runtime
486 : // params.
487 164 : if (earlyExitNonLocalityRoutingNew()) {
488 164 : state.locality_routing_state_ = LocalityRoutingState::NoLocalityRouting;
489 164 : return;
490 164 : }
491 0 : HostSet& host_set = *priority_set_.hostSetsPerPriority()[priority];
492 0 : const HostsPerLocality& upstreamHostsPerLocality = host_set.healthyHostsPerLocality();
493 0 : const size_t num_upstream_localities = upstreamHostsPerLocality.get().size();
494 0 : ASSERT(num_upstream_localities >= 2);
495 :
496 : // It is worth noting that all of the percentages calculated are orthogonal to
497 : // how much load this priority level receives, percentageLoad(priority).
498 : //
499 : // If the host sets are such that 20% of load is handled locally and 80% is residual, and then
500 : // half the hosts in all host sets go unhealthy, this priority set will
501 : // still send 20% of the incoming load to the local locality and 80% to residual.
502 : //
503 : // Basically, fairness across localities within a priority is guaranteed. Fairness across
504 : // localities across priorities is not.
505 0 : const HostsPerLocality& localHostsPerLocality = localHostSet().healthyHostsPerLocality();
506 0 : auto locality_percentages =
507 0 : calculateLocalityPercentagesNew(localHostsPerLocality, upstreamHostsPerLocality);
508 :
509 : // If the upstream cluster has an equal or higher percentage of hosts in the local locality than
510 : // the local cluster does, we can push all of the requests directly to the upstream hosts there.
511 0 : if (upstreamHostsPerLocality.hasLocalLocality() &&
512 0 : locality_percentages[0].upstream_percentage > 0 &&
513 0 : locality_percentages[0].upstream_percentage >= locality_percentages[0].local_percentage) {
514 0 : state.locality_routing_state_ = LocalityRoutingState::LocalityDirect;
515 0 : return;
516 0 : }
517 :
518 0 : state.locality_routing_state_ = LocalityRoutingState::LocalityResidual;
519 :
520 : // If we cannot route all requests to the same locality, calculate what percentage can be routed.
521 : // For example, if local percentage is 20% and upstream is 10%
522 : // we can route only 50% of requests directly.
523 : // Local percent can be 0% if there are no upstream hosts in the local locality.
524 0 : state.local_percent_to_route_ =
525 0 : upstreamHostsPerLocality.hasLocalLocality() && locality_percentages[0].local_percentage > 0
526 0 : ? locality_percentages[0].upstream_percentage * 10000 /
527 0 : locality_percentages[0].local_percentage
528 0 : : 0;
529 :
530 : // Local locality does not have additional capacity (we have already routed what we could).
531 : // Now we need to figure out how much traffic we can route cross locality and to which exact
532 : // locality we should route. The percentage of requests routed cross locality to a specific
533 : // locality needs to be proportional to the residual capacity that upstream locality has.
534 : //
535 : // residual_capacity contains capacity left in a given locality, we keep accumulating residual
536 : // capacity to make search for sampled value easier.
537 : // For example, if we have the following upstream and local percentage:
538 : // local_percentage: 40000 40000 20000
539 : // upstream_percentage: 25000 50000 25000
540 : // Residual capacity would look like: 0 10000 5000. Now we need to sample proportionally to
541 : // bucket sizes (residual capacity). For simplicity of finding where specific
542 : // sampled value is, we accumulate values in residual capacity. This is what it will look like:
543 : // residual_capacity: 0 10000 15000
544 : // Now to find a locality to route (bucket) we could simply iterate over residual_capacity
545 : // searching where sampled value is placed.
546 0 : state.residual_capacity_.resize(num_upstream_localities);
547 0 : for (uint64_t i = 0; i < num_upstream_localities; ++i) {
548 0 : uint64_t last_residual_capacity = i > 0 ? state.residual_capacity_[i - 1] : 0;
549 0 : LocalityPercentages this_locality_percentages = locality_percentages[i];
550 0 : if (i == 0 && upstreamHostsPerLocality.hasLocalLocality()) {
551 : // This is a local locality, we have already routed what we could.
552 0 : state.residual_capacity_[i] = last_residual_capacity;
553 0 : continue;
554 0 : }
555 :
556 : // Only route to the localities that have additional capacity.
557 0 : if (this_locality_percentages.upstream_percentage >
558 0 : this_locality_percentages.local_percentage) {
559 0 : state.residual_capacity_[i] = last_residual_capacity +
560 0 : this_locality_percentages.upstream_percentage -
561 0 : this_locality_percentages.local_percentage;
562 0 : } else {
563 : // Locality with index "i" does not have residual capacity, but we keep accumulating previous
564 : // values to make search easier on the next step.
565 0 : state.residual_capacity_[i] = last_residual_capacity;
566 0 : }
567 0 : }
568 0 : }
569 :
570 0 : void ZoneAwareLoadBalancerBase::regenerateLocalityRoutingStructures() {
571 0 : ASSERT(local_priority_set_);
572 0 : stats_.lb_recalculate_zone_structures_.inc();
573 : // resizePerPriorityState should ensure these stay in sync.
574 0 : ASSERT(per_priority_state_.size() == priority_set_.hostSetsPerPriority().size());
575 :
576 : // We only do locality routing for P=0
577 0 : uint32_t priority = 0;
578 0 : PerPriorityState& state = *per_priority_state_[priority];
579 : // Do not perform any calculations if we cannot perform locality routing based on non-runtime
580 : // params.
581 0 : if (earlyExitNonLocalityRouting()) {
582 0 : state.locality_routing_state_ = LocalityRoutingState::NoLocalityRouting;
583 0 : return;
584 0 : }
585 0 : HostSet& host_set = *priority_set_.hostSetsPerPriority()[priority];
586 0 : ASSERT(host_set.healthyHostsPerLocality().hasLocalLocality());
587 0 : const size_t num_localities = host_set.healthyHostsPerLocality().get().size();
588 0 : ASSERT(num_localities > 0);
589 :
590 : // It is worth noting that all of the percentages calculated are orthogonal to
591 : // how much load this priority level receives, percentageLoad(priority).
592 : //
593 : // If the host sets are such that 20% of load is handled locally and 80% is residual, and then
594 : // half the hosts in all host sets go unhealthy, this priority set will
595 : // still send 20% of the incoming load to the local locality and 80% to residual.
596 : //
597 : // Basically, fairness across localities within a priority is guaranteed. Fairness across
598 : // localities across priorities is not.
599 0 : absl::FixedArray<uint64_t> local_percentage(num_localities);
600 0 : calculateLocalityPercentage(localHostSet().healthyHostsPerLocality(), local_percentage.begin());
601 0 : absl::FixedArray<uint64_t> upstream_percentage(num_localities);
602 0 : calculateLocalityPercentage(host_set.healthyHostsPerLocality(), upstream_percentage.begin());
603 :
604 : // If the upstream cluster has an equal or higher percentage of hosts in the local locality than
605 : // the local cluster does, we can push all of the requests directly to the upstream hosts there.
606 0 : if (upstream_percentage[0] >= local_percentage[0]) {
607 0 : state.locality_routing_state_ = LocalityRoutingState::LocalityDirect;
608 0 : return;
609 0 : }
610 :
611 0 : state.locality_routing_state_ = LocalityRoutingState::LocalityResidual;
612 :
613 : // If we cannot route all requests to the same locality, calculate what percentage can be routed.
614 : // For example, if local percentage is 20% and upstream is 10%
615 : // we can route only 50% of requests directly.
616 0 : state.local_percent_to_route_ = upstream_percentage[0] * 10000 / local_percentage[0];
617 :
618 : // Local locality does not have additional capacity (we have already routed what we could).
619 : // Now we need to figure out how much traffic we can route cross locality and to which exact
620 : // locality we should route. The percentage of requests routed cross locality to a specific
621 : // locality needs to be proportional to the residual capacity that upstream locality has.
622 : //
623 : // residual_capacity contains capacity left in a given locality, we keep accumulating residual
624 : // capacity to make search for sampled value easier.
625 : // For example, if we have the following upstream and local percentage:
626 : // local_percentage: 40000 40000 20000
627 : // upstream_percentage: 25000 50000 25000
628 : // Residual capacity would look like: 0 10000 5000. Now we need to sample proportionally to
629 : // bucket sizes (residual capacity). For simplicity of finding where specific
630 : // sampled value is, we accumulate values in residual capacity. This is what it will look like:
631 : // residual_capacity: 0 10000 15000
632 : // Now to find a locality to route (bucket) we could simply iterate over residual_capacity
633 : // searching where sampled value is placed.
634 0 : state.residual_capacity_.resize(num_localities);
635 :
636 : // Local locality (index 0) does not have residual capacity as we have routed all we could.
637 0 : state.residual_capacity_[0] = 0;
638 0 : for (size_t i = 1; i < num_localities; ++i) {
639 : // Only route to the localities that have additional capacity.
640 0 : if (upstream_percentage[i] > local_percentage[i]) {
641 0 : state.residual_capacity_[i] =
642 0 : state.residual_capacity_[i - 1] + upstream_percentage[i] - local_percentage[i];
643 0 : } else {
644 : // Locality with index "i" does not have residual capacity, but we keep accumulating previous
645 : // values to make search easier on the next step.
646 0 : state.residual_capacity_[i] = state.residual_capacity_[i - 1];
647 0 : }
648 0 : }
649 0 : }
650 :
651 1233 : void ZoneAwareLoadBalancerBase::resizePerPriorityState() {
652 1233 : const uint32_t size = priority_set_.hostSetsPerPriority().size();
653 2121 : while (per_priority_state_.size() < size) {
654 : // Note for P!=0, PerPriorityState is created with NoLocalityRouting and never changed.
655 888 : per_priority_state_.push_back(std::make_unique<PerPriorityState>());
656 888 : }
657 1233 : }
658 :
659 164 : bool ZoneAwareLoadBalancerBase::earlyExitNonLocalityRoutingNew() {
660 : // We only do locality routing for P=0.
661 164 : HostSet& host_set = *priority_set_.hostSetsPerPriority()[0];
662 164 : if (host_set.healthyHostsPerLocality().get().size() < 2) {
663 0 : return true;
664 0 : }
665 :
666 : // Do not perform locality routing if there are too few local localities for zone routing to have
667 : // an effect.
668 164 : if (localHostSet().hostsPerLocality().get().size() < 2) {
669 164 : return true;
670 164 : }
671 :
672 : // Do not perform locality routing if the local cluster doesn't have any hosts in the current
673 : // envoy's local locality. This breaks our assumptions about the local cluster being correctly
674 : // configured, so we don't have enough information to perform locality routing. Note: If other
675 : // envoys do exist according to the local cluster, they will still be able to perform locality
676 : // routing correctly. This will not cause a traffic imbalance because other envoys will not know
677 : // about the current one, so they will not factor it into locality routing calculations.
678 0 : if (!localHostSet().hostsPerLocality().hasLocalLocality() ||
679 0 : localHostSet().hostsPerLocality().get()[0].empty()) {
680 0 : stats_.lb_local_cluster_not_ok_.inc();
681 0 : return true;
682 0 : }
683 :
684 : // If the runtime guard is not enabled, keep the old behavior of not performing locality routing
685 : // if the number of localities in the local cluster is different from the number of localities
686 : // in the upstream cluster.
687 : // The lb_zone_number_differs stat is only relevant if the runtime guard is disabled,
688 : // so it is only incremented in that case.
689 0 : if (!Runtime::runtimeFeatureEnabled(
690 0 : "envoy.reloadable_features.enable_zone_routing_different_zone_counts") &&
691 0 : host_set.healthyHostsPerLocality().get().size() !=
692 0 : localHostSet().healthyHostsPerLocality().get().size()) {
693 0 : stats_.lb_zone_number_differs_.inc();
694 0 : return true;
695 0 : }
696 :
697 : // Do not perform locality routing for small clusters.
698 0 : const uint64_t min_cluster_size =
699 0 : runtime_.snapshot().getInteger(RuntimeMinClusterSize, min_cluster_size_);
700 0 : if (host_set.healthyHosts().size() < min_cluster_size) {
701 0 : stats_.lb_zone_cluster_too_small_.inc();
702 0 : return true;
703 0 : }
704 :
705 0 : return false;
706 0 : }
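 : // Illustrative example: assuming the default min_cluster_size of 6 from the constructor above,
 : // an upstream cluster with only 5 healthy hosts increments lb_zone_cluster_too_small_ and
 : // locality routing is skipped.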
707 :
708 0 : bool ZoneAwareLoadBalancerBase::earlyExitNonLocalityRouting() {
709 : // We only do locality routing for P=0.
710 0 : HostSet& host_set = *priority_set_.hostSetsPerPriority()[0];
711 0 : if (host_set.healthyHostsPerLocality().get().size() < 2) {
712 0 : return true;
713 0 : }
714 :
715 : // lb_local_cluster_not_ok is bumped for "Local host set is not set or it is
716 : // panic mode for local cluster".
717 0 : if (!host_set.healthyHostsPerLocality().hasLocalLocality() ||
718 0 : host_set.healthyHostsPerLocality().get()[0].empty()) {
719 0 : stats_.lb_local_cluster_not_ok_.inc();
720 0 : return true;
721 0 : }
722 :
723 : // The local and upstream clusters should have the same number of localities.
724 0 : if (host_set.healthyHostsPerLocality().get().size() !=
725 0 : localHostSet().healthyHostsPerLocality().get().size()) {
726 0 : stats_.lb_zone_number_differs_.inc();
727 0 : return true;
728 0 : }
729 :
730 : // Do not perform locality routing for small clusters.
731 0 : const uint64_t min_cluster_size =
732 0 : runtime_.snapshot().getInteger(RuntimeMinClusterSize, min_cluster_size_);
733 0 : if (host_set.healthyHosts().size() < min_cluster_size) {
734 0 : stats_.lb_zone_cluster_too_small_.inc();
735 0 : return true;
736 0 : }
737 :
738 0 : return false;
739 0 : }
740 :
741 779 : HostConstSharedPtr ZoneAwareLoadBalancerBase::chooseHost(LoadBalancerContext* context) {
742 779 : HostConstSharedPtr host;
743 :
744 779 : const size_t max_attempts = context ? context->hostSelectionRetryCount() + 1 : 1;
745 779 : for (size_t i = 0; i < max_attempts; ++i) {
746 779 : host = chooseHostOnce(context);
747 :
748 : // If host selection failed or the host is accepted by the filter, return.
749 : // Otherwise, try again.
750 : // Note: in the future we might want to allow retrying when chooseHostOnce returns nullptr.
751 779 : if (!host || !context || !context->shouldSelectAnotherHost(*host)) {
752 779 : return host;
753 779 : }
754 779 : }
755 :
756 : // If we didn't find anything, return the last host.
757 0 : return host;
758 779 : }
759 :
760 1248 : bool LoadBalancerBase::isHostSetInPanic(const HostSet& host_set) const {
761 1248 : uint64_t global_panic_threshold = std::min<uint64_t>(
762 1248 : 100, runtime_.snapshot().getInteger(RuntimePanicThreshold, default_healthy_panic_percent_));
763 1248 : const auto host_count = host_set.hosts().size() - host_set.excludedHosts().size();
764 1248 : double healthy_percent =
765 1248 : host_count == 0 ? 0.0 : 100.0 * host_set.healthyHosts().size() / host_count;
766 :
767 1248 : double degraded_percent =
768 1248 : host_count == 0 ? 0.0 : 100.0 * host_set.degradedHosts().size() / host_count;
769 : // If the % of healthy hosts in the cluster is less than our panic threshold, we use all hosts.
770 1248 : if ((healthy_percent + degraded_percent) < global_panic_threshold) {
771 1206 : return true;
772 1206 : }
773 :
774 42 : return false;
775 1248 : }
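 : // Illustrative example (hypothetical values): assuming a 50% panic threshold, a host set with
 : // 10 hosts of which 4 are healthy and none degraded has 40% availability, which is below the
 : // threshold, so the set is considered to be in panic and all of its hosts are used.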
776 :
777 : absl::FixedArray<ZoneAwareLoadBalancerBase::LocalityPercentages>
778 : ZoneAwareLoadBalancerBase::calculateLocalityPercentagesNew(
779 : const HostsPerLocality& local_hosts_per_locality,
780 0 : const HostsPerLocality& upstream_hosts_per_locality) {
781 0 : uint64_t total_local_hosts = 0;
782 0 : std::map<envoy::config::core::v3::Locality, uint64_t, LocalityLess> local_counts;
783 0 : for (const auto& locality_hosts : local_hosts_per_locality.get()) {
784 0 : total_local_hosts += locality_hosts.size();
785 : // If there is no entry in the map for a given locality, it is assumed to have 0 hosts.
786 0 : if (!locality_hosts.empty()) {
787 0 : local_counts.insert(std::make_pair(locality_hosts[0]->locality(), locality_hosts.size()));
788 0 : }
789 0 : }
790 0 : uint64_t total_upstream_hosts = 0;
791 0 : for (const auto& locality_hosts : upstream_hosts_per_locality.get()) {
792 0 : total_upstream_hosts += locality_hosts.size();
793 0 : }
794 :
795 0 : absl::FixedArray<LocalityPercentages> percentages(upstream_hosts_per_locality.get().size());
796 0 : for (uint32_t i = 0; i < upstream_hosts_per_locality.get().size(); ++i) {
797 0 : const auto& upstream_hosts = upstream_hosts_per_locality.get()[i];
798 0 : if (upstream_hosts.empty()) {
799 : // If there are no upstream hosts in a given locality, the upstream percentage is 0.
800 : // We can't determine the locality of this group, so we can't find the corresponding local
801 : // count. However, if there are no upstream hosts in a locality, the local percentage doesn't
802 : // matter.
803 0 : percentages[i] = LocalityPercentages{0, 0};
804 0 : continue;
805 0 : }
806 0 : const auto& locality = upstream_hosts[0]->locality();
807 :
808 0 : const auto& local_count_it = local_counts.find(locality);
809 0 : const uint64_t local_count = local_count_it == local_counts.end() ? 0 : local_count_it->second;
810 :
811 0 : const uint64_t local_percentage =
812 0 : total_local_hosts > 0 ? 10000ULL * local_count / total_local_hosts : 0;
813 0 : const uint64_t upstream_percentage =
814 0 : total_upstream_hosts > 0 ? 10000ULL * upstream_hosts.size() / total_upstream_hosts : 0;
815 :
816 0 : percentages[i] = LocalityPercentages{local_percentage, upstream_percentage};
817 0 : }
818 :
819 0 : return percentages;
820 0 : }
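 : // Illustrative example (hypothetical values): percentages are expressed such that 10000 means
 : // 100%. If a locality holds 2 of 8 local hosts and 3 of 6 upstream hosts, its entry becomes
 : // {local_percentage: 2500, upstream_percentage: 5000}.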
821 :
822 : void ZoneAwareLoadBalancerBase::calculateLocalityPercentage(
823 0 : const HostsPerLocality& hosts_per_locality, uint64_t* ret) {
824 0 : uint64_t total_hosts = 0;
825 0 : for (const auto& locality_hosts : hosts_per_locality.get()) {
826 0 : total_hosts += locality_hosts.size();
827 0 : }
828 :
829 : // TODO(snowp): Should we ignore excluded hosts here too?
830 :
831 0 : size_t i = 0;
832 0 : for (const auto& locality_hosts : hosts_per_locality.get()) {
833 0 : ret[i++] = total_hosts > 0 ? 10000ULL * locality_hosts.size() / total_hosts : 0;
834 0 : }
835 0 : }
836 :
837 0 : uint32_t ZoneAwareLoadBalancerBase::tryChooseLocalLocalityHosts(const HostSet& host_set) const {
838 0 : PerPriorityState& state = *per_priority_state_[host_set.priority()];
839 0 : ASSERT(state.locality_routing_state_ != LocalityRoutingState::NoLocalityRouting);
840 :
841 : // At this point it is guaranteed that there are at least 2 localities in the upstream host set.
842 0 : const size_t number_of_localities = host_set.healthyHostsPerLocality().get().size();
843 0 : ASSERT(number_of_localities >= 2U);
844 :
845 : // Try to push all of the requests to the same locality if possible.
846 0 : if (state.locality_routing_state_ == LocalityRoutingState::LocalityDirect) {
847 0 : ASSERT(host_set.healthyHostsPerLocality().hasLocalLocality());
848 0 : stats_.lb_zone_routing_all_directly_.inc();
849 0 : return 0;
850 0 : }
851 :
852 0 : ASSERT(state.locality_routing_state_ == LocalityRoutingState::LocalityResidual);
853 0 : ASSERT(host_set.healthyHostsPerLocality().hasLocalLocality() ||
854 0 : state.local_percent_to_route_ == 0);
855 :
856 : // If we cannot route all requests to the same locality, we have already calculated how much we
857 : // can push to the local locality; check if we can push to the local locality on this iteration.
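 : // Illustrative example (hypothetical value): if local_percent_to_route_ is 5000, roughly half
 : // of the random() % 10000 samples fall below it, so about 50% of requests are routed to the
 : // local locality here.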
858 0 : if (random_.random() % 10000 < state.local_percent_to_route_) {
859 0 : stats_.lb_zone_routing_sampled_.inc();
860 0 : return 0;
861 0 : }
862 :
863 : // At this point we must route cross locality as we cannot route to the local locality.
864 0 : stats_.lb_zone_routing_cross_zone_.inc();
865 :
866 : // This is *extremely* unlikely but possible due to rounding errors when calculating
867 : // locality percentages. In this case just select a random locality.
868 0 : if (state.residual_capacity_[number_of_localities - 1] == 0) {
869 0 : stats_.lb_zone_no_capacity_left_.inc();
870 0 : return random_.random() % number_of_localities;
871 0 : }
872 :
873 : // Random sampling to select a specific locality for cross-locality traffic based on the
874 : // additional capacity in each locality.
875 0 : uint64_t threshold = random_.random() % state.residual_capacity_[number_of_localities - 1];
876 :
877 : // This potentially can be optimized to be O(log(N)) where N is the number of localities.
878 : // A linear scan should be faster for small N, and in most scenarios N will be small.
879 : //
880 : // Bucket 1: [0, state.residual_capacity_[0] - 1]
881 : // Bucket 2: [state.residual_capacity_[0], state.residual_capacity_[1] - 1]
882 : // ...
883 : // Bucket N: [state.residual_capacity_[N-2], state.residual_capacity_[N-1] - 1]
884 0 : int i = 0;
885 0 : while (threshold >= state.residual_capacity_[i]) {
886 0 : i++;
887 0 : }
888 :
889 0 : return i;
890 0 : }
891 :
892 : absl::optional<ZoneAwareLoadBalancerBase::HostsSource>
893 977 : ZoneAwareLoadBalancerBase::hostSourceToUse(LoadBalancerContext* context, uint64_t hash) const {
894 977 : auto host_set_and_source = chooseHostSet(context, hash);
895 :
896 : // The second argument tells us which availability we should target from the selected host set.
897 977 : const auto host_availability = host_set_and_source.second;
898 977 : auto& host_set = host_set_and_source.first;
899 977 : HostsSource hosts_source;
900 977 : hosts_source.priority_ = host_set.priority();
901 :
902 : // If the selected host set has insufficient healthy hosts, return all hosts (unless we should
903 : // fail traffic on panic, in which case return no host).
904 977 : if (per_priority_panic_[hosts_source.priority_]) {
905 366 : stats_.lb_healthy_panic_.inc();
906 366 : if (fail_traffic_on_panic_) {
907 8 : return absl::nullopt;
908 358 : } else {
909 358 : hosts_source.source_type_ = HostsSource::SourceType::AllHosts;
910 358 : return hosts_source;
911 358 : }
912 366 : }
913 :
914 : // If we're doing locality weighted balancing, pick locality.
915 : //
916 : // chooseDegradedLocality or chooseHealthyLocality may return a valid locality index
917 : // when the locality_weighted_lb_config is set or a load balancing policy extension is used.
918 : // This if statement makes sure we only do locality weighted balancing when the
919 : // locality_weighted_lb_config is set explicitly, even when hostSourceToUse is called from
920 : // the load balancing policy extensions.
921 611 : if (locality_weighted_balancing_) {
922 19 : absl::optional<uint32_t> locality;
923 19 : if (host_availability == HostAvailability::Degraded) {
924 0 : locality = host_set.chooseDegradedLocality();
925 19 : } else {
926 19 : locality = host_set.chooseHealthyLocality();
927 19 : }
928 :
929 19 : if (locality.has_value()) {
930 0 : auto source_type = localitySourceType(host_availability);
931 0 : if (!source_type) {
932 0 : return absl::nullopt;
933 0 : }
934 0 : hosts_source.source_type_ = source_type.value();
935 0 : hosts_source.locality_index_ = locality.value();
936 0 : return hosts_source;
937 0 : }
938 19 : }
939 :
940 : // If we've latched that we can't do locality-based routing, return healthy or degraded hosts
941 : // for the selected host set.
942 611 : if (per_priority_state_[host_set.priority()]->locality_routing_state_ ==
943 611 : LocalityRoutingState::NoLocalityRouting) {
944 611 : auto source_type = sourceType(host_availability);
945 611 : if (!source_type) {
946 0 : return absl::nullopt;
947 0 : }
948 611 : hosts_source.source_type_ = source_type.value();
949 611 : return hosts_source;
950 611 : }
951 :
952 : // Determine if the load balancer should do zone based routing for this pick.
953 0 : if (!runtime_.snapshot().featureEnabled(RuntimeZoneEnabled, routing_enabled_)) {
954 0 : auto source_type = sourceType(host_availability);
955 0 : if (!source_type) {
956 0 : return absl::nullopt;
957 0 : }
958 0 : hosts_source.source_type_ = source_type.value();
959 0 : return hosts_source;
960 0 : }
961 :
962 0 : if (isHostSetInPanic(localHostSet())) {
963 0 : stats_.lb_local_cluster_not_ok_.inc();
964 : // If the local Envoy instances are in global panic, and we should not fail traffic, do
965 : // not do locality based routing.
966 0 : if (fail_traffic_on_panic_) {
967 0 : return absl::nullopt;
968 0 : } else {
969 0 : auto source_type = sourceType(host_availability);
970 0 : if (!source_type) {
971 0 : return absl::nullopt;
972 0 : }
973 0 : hosts_source.source_type_ = source_type.value();
974 0 : return hosts_source;
975 0 : }
976 0 : }
977 :
978 0 : auto source_type = localitySourceType(host_availability);
979 0 : if (!source_type) {
980 0 : return absl::nullopt;
981 0 : }
982 0 : hosts_source.source_type_ = source_type.value();
983 0 : hosts_source.locality_index_ = tryChooseLocalLocalityHosts(host_set);
984 0 : return hosts_source;
985 0 : }
986 :
987 824 : const HostVector& ZoneAwareLoadBalancerBase::hostSourceToHosts(HostsSource hosts_source) const {
988 824 : const HostSet& host_set = *priority_set_.hostSetsPerPriority()[hosts_source.priority_];
989 824 : switch (hosts_source.source_type_) {
990 267 : case HostsSource::SourceType::AllHosts:
991 267 : return host_set.hosts();
992 534 : case HostsSource::SourceType::HealthyHosts:
993 534 : return host_set.healthyHosts();
994 23 : case HostsSource::SourceType::DegradedHosts:
995 23 : return host_set.degradedHosts();
996 0 : case HostsSource::SourceType::LocalityHealthyHosts:
997 0 : return host_set.healthyHostsPerLocality().get()[hosts_source.locality_index_];
998 0 : case HostsSource::SourceType::LocalityDegradedHosts:
999 0 : return host_set.degradedHostsPerLocality().get()[hosts_source.locality_index_];
1000 824 : }
1001 0 : PANIC_DUE_TO_CORRUPT_ENUM;
1002 0 : }
1003 :
1004 : EdfLoadBalancerBase::EdfLoadBalancerBase(
1005 : const PrioritySet& priority_set, const PrioritySet* local_priority_set, ClusterLbStats& stats,
1006 : Runtime::Loader& runtime, Random::RandomGenerator& random, uint32_t healthy_panic_threshold,
1007 : const absl::optional<LocalityLbConfig> locality_config,
1008 : const absl::optional<SlowStartConfig> slow_start_config, TimeSource& time_source)
1009 : : ZoneAwareLoadBalancerBase(priority_set, local_priority_set, stats, runtime, random,
1010 : healthy_panic_threshold, locality_config),
1011 : seed_(random_.random()),
1012 : slow_start_window_(slow_start_config.has_value()
1013 : ? std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
1014 : slow_start_config.value().slow_start_window()))
1015 : : std::chrono::milliseconds(0)),
1016 : aggression_runtime_(
1017 : slow_start_config.has_value() && slow_start_config.value().has_aggression()
1018 : ? absl::optional<Runtime::Double>({slow_start_config.value().aggression(), runtime})
1019 : : absl::nullopt),
1020 : time_source_(time_source), latest_host_added_time_(time_source_.monotonicTime()),
1021 : slow_start_min_weight_percent_(slow_start_config.has_value()
1022 : ? PROTOBUF_PERCENT_TO_DOUBLE_OR_DEFAULT(
1023 : slow_start_config.value(), min_weight_percent, 10) /
1024 : 100.0
1025 477 : : 0.1) {
1026 : // We fully recompute the schedulers for a given host set here on membership change, which is
1027 : // consistent with what other LB implementations do (e.g. thread aware).
1028 : // The downside of a full recompute is that time complexity is O(n * log n),
1029 : // so we will need to do better at delta tracking to scale (see
1030 : // https://github.com/envoyproxy/envoy/issues/2874).
1031 477 : priority_update_cb_ = priority_set.addPriorityUpdateCb(
1032 479 : [this](uint32_t priority, const HostVector&, const HostVector&) { refresh(priority); });
1033 477 : member_update_cb_ = priority_set.addMemberUpdateCb(
1034 479 : [this](const HostVector& hosts_added, const HostVector&) -> void {
1035 464 : if (isSlowStartEnabled()) {
1036 1 : recalculateHostsInSlowStart(hosts_added);
1037 1 : }
1038 464 : });
1039 477 : }
1040 :
1041 476 : void EdfLoadBalancerBase::initialize() {
1042 1086 : for (uint32_t priority = 0; priority < priority_set_.hostSetsPerPriority().size(); ++priority) {
1043 610 : refresh(priority);
1044 610 : }
1045 476 : }
1046 :
1047 100 : void EdfLoadBalancerBase::recalculateHostsInSlowStart(const HostVector& hosts) {
1048 : // TODO(nezdolik): linear scan can be improved by using a flat hash set for hosts in slow start.
1049 2938 : for (const auto& host : hosts) {
1050 2938 : auto current_time = time_source_.monotonicTime();
1051 : // A host enters slow start only if it has transitioned into a healthy state.
1052 2938 : if (host->coarseHealth() == Upstream::Host::Health::Healthy) {
1053 2938 : auto host_last_hc_pass_time =
1054 2938 : host->lastHcPassTime() ? host->lastHcPassTime().value() : current_time;
1055 2938 : auto in_healthy_state_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
1056 2938 : current_time - host_last_hc_pass_time);
1057 : // If there is no active HC enabled or HC has not run, start slow start window from current
1058 : // time.
1059 2938 : if (!host->lastHcPassTime()) {
1060 196 : host->setLastHcPassTime(std::move(current_time));
1061 196 : }
1062 : // Check if host existence time is within slow start window.
1063 2938 : if (host_last_hc_pass_time > latest_host_added_time_ &&
1064 2938 : in_healthy_state_duration <= slow_start_window_) {
1065 0 : latest_host_added_time_ = host_last_hc_pass_time;
1066 0 : }
1067 2938 : }
1068 2938 : }
1069 100 : }
1070 :
1071 1074 : void EdfLoadBalancerBase::refresh(uint32_t priority) {
1072 4782 : const auto add_hosts_source = [this](HostsSource source, const HostVector& hosts) {
1073 : // Nuke existing scheduler if it exists.
1074 4782 : auto& scheduler = scheduler_[source] = Scheduler{};
1075 4782 : refreshHostSource(source);
1076 4782 : if (isSlowStartEnabled()) {
1077 99 : recalculateHostsInSlowStart(hosts);
1078 99 : }
1079 :
1080 : // Check if the original host weights are equal and no hosts are in slow start mode; in that
1081 : // case EDF creation is skipped. When all original weights are equal and no hosts are in slow
1082 : // start mode, we can rely on the unweighted host pick to do optimal round robin and
1083 : // least-loaded host selection with lower memory and CPU overhead.
1084 4782 : if (hostWeightsAreEqual(hosts) && noHostsAreInSlowStart()) {
1085 : // Skip edf creation.
1086 4311 : return;
1087 4311 : }
1088 471 : scheduler.edf_ = std::make_unique<EdfScheduler<const Host>>();
1089 :
1090 : // Populate scheduler with host list.
1091 : // TODO(mattklein123): We must build the EDF schedule even if all of the hosts are currently
1092 : // weighted 1. This is because currently we don't refresh host sets if only weights change.
1093 : // We should probably change this to refresh at all times. See the comment in
1094 : // BaseDynamicClusterImpl::updateDynamicHostList about this.
1095 642439 : for (const auto& host : hosts) {
1096 : // We use a fixed weight here. While the weight may change without
1097 : // notification, this will only be stale until this host is next picked,
1098 : // at which point it is reinserted into the EdfScheduler with its new
1099 : // weight in chooseHost().
1100 642439 : scheduler.edf_->add(hostWeight(*host), host);
1101 642439 : }
1102 :
1103 : // Cycle through hosts to achieve the intended offset behavior.
1104 : // TODO(htuch): Consider how we can avoid biasing towards earlier hosts in the schedule across
1105 : // refreshes for the weighted case.
1106 471 : if (!hosts.empty()) {
1107 525749 : for (uint32_t i = 0; i < seed_ % hosts.size(); ++i) {
1108 525346 : auto host =
1109 525346 : scheduler.edf_->pickAndAdd([this](const Host& host) { return hostWeight(host); });
1110 525346 : }
1111 403 : }
1112 471 : };
1113 : // Populate EdfSchedulers for each valid HostsSource value for the host set at this priority.
1114 1074 : const auto& host_set = priority_set_.hostSetsPerPriority()[priority];
1115 1074 : add_hosts_source(HostsSource(priority, HostsSource::SourceType::AllHosts), host_set->hosts());
1116 1074 : add_hosts_source(HostsSource(priority, HostsSource::SourceType::HealthyHosts),
1117 1074 : host_set->healthyHosts());
1118 1074 : add_hosts_source(HostsSource(priority, HostsSource::SourceType::DegradedHosts),
1119 1074 : host_set->degradedHosts());
1120 1074 : for (uint32_t locality_index = 0;
1121 1854 : locality_index < host_set->healthyHostsPerLocality().get().size(); ++locality_index) {
1122 780 : add_hosts_source(
1123 780 : HostsSource(priority, HostsSource::SourceType::LocalityHealthyHosts, locality_index),
1124 780 : host_set->healthyHostsPerLocality().get()[locality_index]);
1125 780 : }
1126 1074 : for (uint32_t locality_index = 0;
1127 1854 : locality_index < host_set->degradedHostsPerLocality().get().size(); ++locality_index) {
1128 780 : add_hosts_source(
1129 780 : HostsSource(priority, HostsSource::SourceType::LocalityDegradedHosts, locality_index),
1130 780 : host_set->degradedHostsPerLocality().get()[locality_index]);
1131 780 : }
1132 1074 : }
1133 :
1134 1177562 : bool EdfLoadBalancerBase::isSlowStartEnabled() const {
1135 1177562 : return slow_start_window_ > std::chrono::milliseconds(0);
1136 1177562 : }
1137 :
1138 1172316 : bool EdfLoadBalancerBase::noHostsAreInSlowStart() const {
1139 1172316 : if (!isSlowStartEnabled()) {
1140 1168349 : return true;
1141 1168349 : }
1142 3967 : auto current_time = time_source_.monotonicTime();
1143 3967 : if (std::chrono::duration_cast<std::chrono::milliseconds>(
1144 3967 : current_time - latest_host_added_time_) <= slow_start_window_) {
1145 3967 : return false;
1146 3967 : }
1147 0 : return true;
1148 3967 : }
1149 :
1150 242 : HostConstSharedPtr EdfLoadBalancerBase::peekAnotherHost(LoadBalancerContext* context) {
1151 242 : if (tooManyPreconnects(stashed_random_.size(), total_healthy_hosts_)) {
1152 137 : return nullptr;
1153 137 : }
1154 :
1155 105 : const absl::optional<HostsSource> hosts_source = hostSourceToUse(context, random(true));
1156 105 : if (!hosts_source) {
1157 0 : return nullptr;
1158 0 : }
1159 :
1160 105 : auto scheduler_it = scheduler_.find(*hosts_source);
1161 : // We should always have a scheduler for any return value from
1162 : // hostSourceToUse() via the construction in refresh().
1163 105 : ASSERT(scheduler_it != scheduler_.end());
1164 105 : auto& scheduler = scheduler_it->second;
1165 :
1166 : // As has been commented in both EdfLoadBalancerBase::refresh and
1167 : // BaseDynamicClusterImpl::updateDynamicHostList, we must do a runtime pivot here to determine
1168 : // whether to use EDF or do unweighted (fast) selection. EDF is non-null iff the original
1169 : // weights of two or more hosts differ or any host is still in slow start.
1170 105 : if (scheduler.edf_ != nullptr) {
1171 20 : return scheduler.edf_->peekAgain([this](const Host& host) { return hostWeight(host); });
1172 85 : } else {
1173 85 : const HostVector& hosts_to_use = hostSourceToHosts(*hosts_source);
1174 85 : if (hosts_to_use.empty()) {
1175 0 : return nullptr;
1176 0 : }
1177 85 : return unweightedHostPeek(hosts_to_use, *hosts_source);
1178 85 : }
1179 105 : }
1180 :
1181 521 : HostConstSharedPtr EdfLoadBalancerBase::chooseHostOnce(LoadBalancerContext* context) {
1182 521 : const absl::optional<HostsSource> hosts_source = hostSourceToUse(context, random(false));
1183 521 : if (!hosts_source) {
1184 5 : return nullptr;
1185 5 : }
1186 516 : auto scheduler_it = scheduler_.find(*hosts_source);
1187 : // We should always have a scheduler for any return value from
1188 : // hostSourceToUse() via the construction in refresh().
1189 516 : ASSERT(scheduler_it != scheduler_.end());
1190 516 : auto& scheduler = scheduler_it->second;
1191 :
1192 : // As has been commented in both EdfLoadBalancerBase::refresh and
1193 : // BaseDynamicClusterImpl::updateDynamicHostList, we must do a runtime pivot here to determine
1194 : // whether to use EDF or do unweighted (fast) selection. EDF is non-null iff the original
1195 : // weights of two or more hosts differ or any host is still in slow start.
1196 516 : if (scheduler.edf_ != nullptr) {
1197 125 : auto host = scheduler.edf_->pickAndAdd([this](const Host& host) { return hostWeight(host); });
1198 125 : return host;
1199 453 : } else {
1200 391 : const HostVector& hosts_to_use = hostSourceToHosts(*hosts_source);
1201 391 : if (hosts_to_use.empty()) {
1202 4 : return nullptr;
1203 4 : }
1204 387 : return unweightedHostPick(hosts_to_use, *hosts_source);
1205 391 : }
1206 516 : }
1207 :
1208 : namespace {
1209 3876 : double applyAggressionFactor(double time_factor, double aggression) {
1210 3876 : if (aggression == 1.0 || time_factor == 1.0) {
1211 3876 : return time_factor;
1212 3876 : } else {
1213 0 : return std::pow(time_factor, 1.0 / aggression);
1214 0 : }
1215 3876 : }
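: // Worked example (illustrative values only): with time_factor = 0.25 and aggression = 2.0 the
: // result is pow(0.25, 1.0 / 2.0) = 0.5, so a higher aggression ramps a new host up faster early
: // in the slow start window; with aggression = 1.0 the ramp stays linear and the result is 0.25.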
1216 : } // namespace
1217 :
1218 3876 : double EdfLoadBalancerBase::applySlowStartFactor(double host_weight, const Host& host) const {
1219 : // We can reliably apply the slow start weight only if `last_hc_pass_time` on the host has been
1220 : // populated, either by active HC or by `member_update_cb_` in `EdfLoadBalancerBase`.
1221 3876 : if (host.lastHcPassTime() && host.coarseHealth() == Upstream::Host::Health::Healthy) {
1222 3876 : auto in_healthy_state_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
1223 3876 : time_source_.monotonicTime() - host.lastHcPassTime().value());
1224 3876 : if (in_healthy_state_duration < slow_start_window_) {
1225 3876 : double aggression =
1226 3876 : aggression_runtime_ != absl::nullopt ? aggression_runtime_.value().value() : 1.0;
1227 3876 : if (aggression <= 0.0 || std::isnan(aggression)) {
1228 0 : ENVOY_LOG_EVERY_POW_2(error, "Invalid runtime value provided for aggression parameter, "
1229 0 : "aggression must be greater than 0.0");
1230 0 : aggression = 1.0;
1231 0 : }
1232 :
1233 3876 : ASSERT(aggression > 0.0);
1234 3876 : auto time_factor = static_cast<double>(std::max(std::chrono::milliseconds(1).count(),
1235 3876 : in_healthy_state_duration.count())) /
1236 3876 : slow_start_window_.count();
1237 3876 : return host_weight * std::max(applyAggressionFactor(time_factor, aggression),
1238 3876 : slow_start_min_weight_percent_);
1239 3876 : } else {
1240 0 : return host_weight;
1241 0 : }
1242 3876 : } else {
1243 0 : return host_weight;
1244 0 : }
1245 3876 : }
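: // Worked example (illustrative values only): with slow_start_window_ = 60s, a host that passed
: // HC 15s ago, aggression = 1.0 and slow_start_min_weight_percent_ = 0.1:
: //   time_factor = 15000ms / 60000ms = 0.25
: //   factor      = std::max(applyAggressionFactor(0.25, 1.0), 0.1) = 0.25
: //   result      = host_weight * 0.25
: // The host therefore carries a quarter of its configured weight at this point and ramps up to
: // full weight as the slow start window elapses.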
1246 :
1247 1040370 : double LeastRequestLoadBalancer::hostWeight(const Host& host) const {
1248 : // This method is called to calculate the dynamic weight as follows whenever the load balancing
1249 : // weights are not all equal:
1250 : //
1251 : // `weight = load_balancing_weight / (active_requests + 1)^active_request_bias`
1252 : //
1253 : // `active_request_bias` can be configured via runtime and its value is cached in
1254 : // `active_request_bias_` to avoid having to do a runtime lookup each time a host weight is
1255 : // calculated.
1256 : //
1257 : // When `active_request_bias == 0.0` we behave like `RoundRobinLoadBalancer` and return the
1258 : // host weight without considering the number of active requests at the time we do the pick.
1259 : //
1260 : // When `active_request_bias > 0.0` we scale the host weight by the number of active
1261 : // requests at the time we do the pick. We always add 1 to avoid division by 0.
1262 : //
1263 : // It might be possible to do better by picking two hosts off of the schedule, and selecting the
1264 : // one with fewer active requests at the time of selection.
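: //
: // Worked example (illustrative values only): a host with load_balancing_weight = 100 and
: // 3 active requests (so active_request_value = 4) yields:
: //   active_request_bias = 0.0: weight = 100          (round robin behavior)
: //   active_request_bias = 1.0: weight = 100 / 4   = 25
: //   active_request_bias = 2.0: weight = 100 / 4^2 = 6.25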
1265 :
1266 1040370 : double host_weight = static_cast<double>(host.weight());
1267 :
1268 : // If the number of active requests is already the maximum uint64_t value, adding 1 would
1269 : // overflow it and cause a divide by zero. This won't happen in normal operation, but the
1270 : // check prevents fuzz test failures.
1271 1040370 : const uint64_t active_request_value =
1272 1040370 : host.stats().rq_active_.value() != std::numeric_limits<uint64_t>::max()
1273 1040370 : ? host.stats().rq_active_.value() + 1
1274 1040370 : : host.stats().rq_active_.value();
1275 :
1276 1040370 : if (active_request_bias_ == 1.0) {
1277 1023509 : host_weight = static_cast<double>(host.weight()) / active_request_value;
1278 1023509 : } else if (active_request_bias_ != 0.0) {
1279 270 : host_weight =
1280 270 : static_cast<double>(host.weight()) / std::pow(active_request_value, active_request_bias_);
1281 270 : }
1282 :
1283 1040370 : if (!noHostsAreInSlowStart()) {
1284 3540 : return applySlowStartFactor(host_weight, host);
1285 1036830 : } else {
1286 1036830 : return host_weight;
1287 1036830 : }
1288 1040370 : }
1289 :
1290 : HostConstSharedPtr LeastRequestLoadBalancer::unweightedHostPeek(const HostVector&,
1291 25 : const HostsSource&) {
1292 : // LeastRequestLoadBalancer cannot do deterministic preconnecting, because
1293 : // another thread might select the least-requested host between preconnect and
1294 : // host pick, invalidating the rq_active comparison.
1295 25 : return nullptr;
1296 25 : }
1297 :
1298 : HostConstSharedPtr LeastRequestLoadBalancer::unweightedHostPick(const HostVector& hosts_to_use,
1299 47 : const HostsSource&) {
1300 47 : HostSharedPtr candidate_host = nullptr;
1301 :
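: // This loop implements "power of N choices" sampling (P2C when choice_count_ == 2): sample
: // choice_count_ hosts uniformly at random and keep the one with the fewest active requests.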
1302 141 : for (uint32_t choice_idx = 0; choice_idx < choice_count_; ++choice_idx) {
1303 94 : const int rand_idx = random_.random() % hosts_to_use.size();
1304 94 : const HostSharedPtr& sampled_host = hosts_to_use[rand_idx];
1305 :
1306 94 : if (candidate_host == nullptr) {
1307 :
1308 : // Make a first choice to start the comparisons.
1309 47 : candidate_host = sampled_host;
1310 47 : continue;
1311 47 : }
1312 :
1313 47 : const auto candidate_active_rq = candidate_host->stats().rq_active_.value();
1314 47 : const auto sampled_active_rq = sampled_host->stats().rq_active_.value();
1315 47 : if (sampled_active_rq < candidate_active_rq) {
1316 8 : candidate_host = sampled_host;
1317 8 : }
1318 47 : }
1319 :
1320 47 : return candidate_host;
1321 47 : }
1322 :
1323 257 : HostConstSharedPtr RandomLoadBalancer::peekAnotherHost(LoadBalancerContext* context) {
1324 257 : if (tooManyPreconnects(stashed_random_.size(), total_healthy_hosts_)) {
1325 164 : return nullptr;
1326 164 : }
1327 93 : return peekOrChoose(context, true);
1328 257 : }
1329 :
1330 258 : HostConstSharedPtr RandomLoadBalancer::chooseHostOnce(LoadBalancerContext* context) {
1331 258 : return peekOrChoose(context, false);
1332 258 : }
1333 :
1334 351 : HostConstSharedPtr RandomLoadBalancer::peekOrChoose(LoadBalancerContext* context, bool peek) {
1335 351 : uint64_t random_hash = random(peek);
1336 351 : const absl::optional<HostsSource> hosts_source = hostSourceToUse(context, random_hash);
1337 351 : if (!hosts_source) {
1338 3 : return nullptr;
1339 3 : }
1340 :
1341 348 : const HostVector& hosts_to_use = hostSourceToHosts(*hosts_source);
1342 348 : if (hosts_to_use.empty()) {
1343 14 : return nullptr;
1344 14 : }
1345 :
1346 334 : return hosts_to_use[random_hash % hosts_to_use.size()];
1347 348 : }
1348 :
1349 : } // namespace Upstream
1350 : } // namespace Envoy