#include "source/common/upstream/thread_aware_lb_impl.h"

#include <memory>
#include <random>

namespace Envoy {
namespace Upstream {

// TODO(mergeconflict): Adjust locality weights for partial availability, as is done in
// HostSetImpl::effectiveLocalityWeight.
namespace {

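// Computes a normalized weight for each host in `hosts`: the host's configured weight, divided
// by the locality's total host weight and scaled by `normalized_locality_weight`. For example,
// hosts weighted {1, 2, 1} in a locality with normalized weight 0.5 receive normalized weights
// {0.125, 0.25, 0.125}. The running min/max across all normalized weights are also updated.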
void normalizeHostWeights(const HostVector& hosts, double normalized_locality_weight,
                          NormalizedHostWeightVector& normalized_host_weights,
                          double& min_normalized_weight, double& max_normalized_weight) {
  // The sum should be at most the uint32_t max value, so we can validate it by accumulating
  // into a uint64_t and making sure there was no overflow.
  uint64_t sum = 0;
  for (const auto& host : hosts) {
    sum += host->weight();
    if (sum > std::numeric_limits<uint32_t>::max()) {
      throwEnvoyExceptionOrPanic(
          fmt::format("The sum of weights of all upstream hosts in a locality exceeds {}",
                      std::numeric_limits<uint32_t>::max()));
    }
  }

  for (const auto& host : hosts) {
    const double weight = host->weight() * normalized_locality_weight / sum;
    normalized_host_weights.push_back({host, weight});
    min_normalized_weight = std::min(min_normalized_weight, weight);
    max_normalized_weight = std::max(max_normalized_weight, weight);
  }
}

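// Normalizes weights across all localities at a priority level: each locality's weight is
// divided by the sum of all locality weights, and the quotient is handed to
// normalizeHostWeights() for the hosts of that locality.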
void normalizeLocalityWeights(const HostsPerLocality& hosts_per_locality,
                              const LocalityWeights& locality_weights,
                              NormalizedHostWeightVector& normalized_host_weights,
                              double& min_normalized_weight, double& max_normalized_weight) {
  ASSERT(locality_weights.size() == hosts_per_locality.get().size());

  // The sum should be at most the uint32_t max value, so we can validate it by accumulating
  // into a uint64_t and making sure there was no overflow.
  uint64_t sum = 0;
  for (const auto weight : locality_weights) {
    sum += weight;
    if (sum > std::numeric_limits<uint32_t>::max()) {
      throwEnvoyExceptionOrPanic(
          fmt::format("The sum of weights of all localities at the same priority exceeds {}",
                      std::numeric_limits<uint32_t>::max()));
    }
  }

  // Locality weights (unlike host weights) may be 0. If _all_ locality weights were 0, bail out.
  if (sum == 0) {
    return;
  }

  // Compute normalized weights for all hosts in each locality. If a locality was assigned zero
  // weight, all hosts in that locality will be skipped.
  for (LocalityWeights::size_type i = 0; i < locality_weights.size(); ++i) {
    if (locality_weights[i] != 0) {
      const HostVector& hosts = hosts_per_locality.get()[i];
      const double normalized_locality_weight = static_cast<double>(locality_weights[i]) / sum;
      normalizeHostWeights(hosts, normalized_locality_weight, normalized_host_weights,
                           min_normalized_weight, max_normalized_weight);
    }
  }
}

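// Entry point for weight normalization. When locality-weighted balancing is disabled or no
// locality weights are configured, the (healthy) hosts are normalized as one flat set;
// otherwise weights are normalized per locality. In panic mode, all hosts are used instead of
// only the healthy ones.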
void normalizeWeights(const HostSet& host_set, bool in_panic,
                      NormalizedHostWeightVector& normalized_host_weights,
                      double& min_normalized_weight, double& max_normalized_weight,
                      bool locality_weighted_balancing) {
  if (!locality_weighted_balancing || host_set.localityWeights() == nullptr ||
      host_set.localityWeights()->empty()) {
    // If we're not dealing with locality weights, just normalize weights for the flat set of
    // hosts.
    const auto& hosts = in_panic ? host_set.hosts() : host_set.healthyHosts();
    normalizeHostWeights(hosts, 1.0, normalized_host_weights, min_normalized_weight,
                         max_normalized_weight);
  } else {
    // Otherwise, normalize weights across all localities.
    const auto& hosts_per_locality =
        in_panic ? host_set.hostsPerLocality() : host_set.healthyHostsPerLocality();
    normalizeLocalityWeights(hosts_per_locality, *(host_set.localityWeights()),
                             normalized_host_weights, min_normalized_weight,
                             max_normalized_weight);
  }
}

} // namespace

void ThreadAwareLoadBalancerBase::initialize() {
  // TODO(mattklein123): In the future, once initialized and the initial LB is built, it would be
  // better to use a background thread for computing LB updates. This has the substantial benefit
  // that if the LB computation thread falls behind, host set updates can be trivially collapsed.
  // I will look into doing this in a follow up. Doing everything on a background thread would
  // heavily complicate initialization, as the load balancer would need its own initialized
  // callback. I think the synchronous/asynchronous split is probably the best option.
  priority_update_cb_ = priority_set_.addPriorityUpdateCb(
      [this](uint32_t, const HostVector&, const HostVector&) -> void { refresh(); });

  refresh();
}

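// Rebuilds the per-priority state (panic flag, normalized host weights, and the per-priority
// load balancer itself) from the current priority set, then publishes it to the factory for
// worker threads.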
void ThreadAwareLoadBalancerBase::refresh() {
  auto per_priority_state_vector = std::make_shared<std::vector<PerPriorityStatePtr>>(
      priority_set_.hostSetsPerPriority().size());
  auto healthy_per_priority_load =
      std::make_shared<HealthyLoad>(per_priority_load_.healthy_priority_load_);
  auto degraded_per_priority_load =
      std::make_shared<DegradedLoad>(per_priority_load_.degraded_priority_load_);

  for (const auto& host_set : priority_set_.hostSetsPerPriority()) {
    const uint32_t priority = host_set->priority();
    (*per_priority_state_vector)[priority] = std::make_unique<PerPriorityState>();
    const auto& per_priority_state = (*per_priority_state_vector)[priority];
    // Copy the panic flag from LoadBalancerBase. It is recalculated whenever the host set or the
    // hosts' health changes.
    per_priority_state->global_panic_ = per_priority_panic_[priority];

    // Normalize host and locality weights such that the sum of all normalized weights is 1. The
    // min/max trackers are seeded at 1.0 and 0.0 respectively, so the first normalized weight
    // updates both.
    NormalizedHostWeightVector normalized_host_weights;
    double min_normalized_weight = 1.0;
    double max_normalized_weight = 0.0;
    normalizeWeights(*host_set, per_priority_state->global_panic_, normalized_host_weights,
                     min_normalized_weight, max_normalized_weight, locality_weighted_balancing_);
    per_priority_state->current_lb_ = createLoadBalancer(
        std::move(normalized_host_weights), min_normalized_weight, max_normalized_weight);
  }

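  // Publish the new state to the factory under the writer lock. Worker threads copy these
  // shared pointers the next time they call create().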
  {
    absl::WriterMutexLock lock(&factory_->mutex_);
    factory_->healthy_per_priority_load_ = healthy_per_priority_load;
    factory_->degraded_per_priority_load_ = degraded_per_priority_load;
    factory_->per_priority_state_ = per_priority_state_vector;
  }
}

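// Worker-thread host selection: picks a priority from the hash, then delegates to that
// priority's precomputed load balancer, retrying up to hostSelectionRetryCount() extra times
// while the context rejects the selected host.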
HostConstSharedPtr
ThreadAwareLoadBalancerBase::LoadBalancerImpl::chooseHost(LoadBalancerContext* context) {
  // Make sure we correctly return nullptr for any early chooseHost() calls.
  if (per_priority_state_ == nullptr) {
    return nullptr;
  }

  HostConstSharedPtr host;

  // If there is no hash in the context, just choose a random value (this effectively becomes
  // the random LB, but it won't crash if someone configures it this way).
  // computeHashKey() may be computed on demand, so get it only once.
  absl::optional<uint64_t> hash;
  if (context) {
    hash = context->computeHashKey();
  }
  const uint64_t h = hash ? hash.value() : random_.random();

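  // Deterministically map the hash onto the per-priority healthy/degraded load distribution, so
  // the same hash always lands on the same priority for a given load split.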
  const uint32_t priority =
      LoadBalancerBase::choosePriority(h, *healthy_per_priority_load_,
                                       *degraded_per_priority_load_)
          .first;
  const auto& per_priority_state = (*per_priority_state_)[priority];
  if (per_priority_state->global_panic_) {
    stats_.lb_healthy_panic_.inc();
  }

  const uint32_t max_attempts = context ? context->hostSelectionRetryCount() + 1 : 1;
  for (uint32_t i = 0; i < max_attempts; ++i) {
    host = per_priority_state->current_lb_->chooseHost(h, i);

    // If host selection failed or the host is accepted by the filter, return.
    // Otherwise, try again.
    if (!host || !context || !context->shouldSelectAnotherHost(*host)) {
      return host;
    }
  }
  return host;
}

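// Called on each worker thread to build a load balancer backed by the factory's current shared
// state.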
LoadBalancerPtr ThreadAwareLoadBalancerBase::LoadBalancerFactoryImpl::create(LoadBalancerParams) {
  auto lb = std::make_unique<LoadBalancerImpl>(stats_, random_);

  // The factory's shared state must be protected by a RW lock since it is written by refresh()
  // on the main thread and read here by worker threads. All complex processing has already been
  // precalculated, however, so only shared pointers are copied under the lock.
  absl::ReaderMutexLock lock(&mutex_);
  lb->healthy_per_priority_load_ = healthy_per_priority_load_;
  lb->degraded_per_priority_load_ = degraded_per_priority_load_;
  lb->per_priority_state_ = per_priority_state_;
  return lb;
}

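// Returns the ratio of the host's active request count to its bounded-load slot allowance; a
// result greater than 1.0 means the host is over its cap.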
double ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::hostOverloadFactor(
    const Host& host, double weight) const {
  // TODO(scheler): This will not work if the rq_active cluster stat is disabled; we need to
  // detect that case and alert the user.

  const uint32_t overall_active = host.cluster().trafficStats()->upstream_rq_active_.value();
  const uint32_t host_active = host.stats().rq_active_.value();

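  // total_slots is ceil((overall_active + 1) * hash_balance_factor_ / 100) in integer
  // arithmetic; each host is then entitled to the ceiling of its weighted share of those slots,
  // but never fewer than one. For example, with hash_balance_factor_ = 150 and 9 requests
  // active cluster-wide, total_slots = ceil(10 * 1.5) = 15, and a host with normalized weight
  // 0.2 gets 3 slots.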
  const uint32_t total_slots = ((overall_active + 1) * hash_balance_factor_ + 99) / 100;
  const uint32_t slots =
      std::max(static_cast<uint32_t>(std::ceil(total_slots * weight)), static_cast<uint32_t>(1));

  if (host_active > slots) {
    ENVOY_LOG_MISC(
        debug,
        "ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::chooseHost: "
        "host {} overloaded; overall_active {}, host_weight {}, host_active {} > slots {}",
        host.address()->asString(), overall_active, weight, host_active, slots);
  }
  return static_cast<double>(host_active) / slots;
}

HostConstSharedPtr
ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::chooseHost(uint64_t hash,
                                                                        uint32_t attempt) const {

  // This is implemented based on the method described in the paper
  // https://arxiv.org/abs/1608.01350. For the specified `hash_balance_factor`, requests to any
  // upstream host are capped at `hash_balance_factor/100` times the average number of requests
  // across the cluster. When a request arrives for an upstream host that is currently serving
  // at its max capacity, linear probing is used to identify an eligible host. Further, the
  // linear probe is implemented using a random jump on the hosts ring/table to identify the
  // eligible host (this technique is described in the paper
  // https://arxiv.org/abs/1908.08762; the random jump avoids the cascading overflow effect of
  // always choosing the next host on the ring/table).
  //
  // If weights are specified on the hosts, they are respected.
  //
  // This is an O(N) algorithm, unlike other load balancers. Using a lower `hash_balance_factor`
  // results in more hosts being probed, so use a higher value if you require better performance.
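  //
  // For example, with `hash_balance_factor` = 125, no host is sent more than 1.25 times its
  // weighted fair share of the cluster's active requests before the probe moves on to another
  // host.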

  if (normalized_host_weights_.empty()) {
    return nullptr;
  }

  HostConstSharedPtr host = hashing_lb_ptr_->chooseHost(hash, attempt);
  if (host == nullptr) {
    return nullptr;
  }
  const double weight = normalized_host_weights_map_.at(host);
  double overload_factor = hostOverloadFactor(*host, weight);
  if (overload_factor <= 1.0) {
    ENVOY_LOG_MISC(debug,
                   "ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::chooseHost: "
                   "selected host #{} (attempt:1)",
                   host->address()->asString());
    return host;
  }

  // When a host is overloaded, we choose the next host in a random manner rather than picking
  // the next one in the ring. The random sequence is seeded by the hash, so the same input gets
  // the same sequence of hosts every time.
  const uint32_t num_hosts = normalized_host_weights_.size();
  auto host_index = std::vector<uint32_t>(num_hosts);
  for (uint32_t i = 0; i < num_hosts; i++) {
    host_index[i] = i;
  }

  // Not using Random::RandomGenerator as it does not take a seed; a seeded RNG is a requirement
  // here because we need the same shuffle sequence for the same hash every time. Further, not
  // using std::default_random_engine and std::uniform_int_distribution as their output is not
  // consistent across Linux and Windows platforms, whereas std::mt19937 is fully specified by
  // the C++ standard.
  const uint64_t seed = hash;
  std::mt19937 random(seed);

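  // Rejection sampling is used below: the generator's range is split into k equal buckets, and
  // draws falling past the last full bucket are retried, which avoids the modulo bias a plain
  // `random() % k` would introduce.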
  // Generates a random number in the range [0, k) uniformly.
  auto uniform_int = [](std::mt19937& random, uint32_t k) -> uint32_t {
    uint32_t x = k;
    while (x >= k) {
      x = random() / ((static_cast<uint64_t>(random.max()) + 1u) / k);
    }
    return x;
  };

  HostConstSharedPtr alt_host, least_overloaded_host = host;
  double least_overload_factor = overload_factor;
  for (uint32_t i = 0; i < num_hosts; i++) {
    // Incremental Fisher-Yates shuffle: swap a uniformly chosen element from the unvisited
    // suffix into position i, so each iteration probes a new host in random order.
    const uint32_t j = uniform_int(random, num_hosts - i);
    std::swap(host_index[i], host_index[i + j]);

    const uint32_t k = host_index[i];
    alt_host = normalized_host_weights_[k].first;
    if (alt_host == host) {
      continue;
    }

    const double alt_host_weight = normalized_host_weights_[k].second;
    overload_factor = hostOverloadFactor(*alt_host, alt_host_weight);

    if (overload_factor <= 1.0) {
      ENVOY_LOG_MISC(debug,
                     "ThreadAwareLoadBalancerBase::BoundedLoadHashingLoadBalancer::chooseHost: "
                     "selected host #{}:{} (attempt:{})",
                     k, alt_host->address()->asString(), i + 2);
      return alt_host;
    }

    if (least_overload_factor > overload_factor) {
      least_overloaded_host = alt_host;
      least_overload_factor = overload_factor;
    }
  }

  return least_overloaded_host;
}

} // namespace Upstream
} // namespace Envoy