/proc/self/cwd/source/extensions/load_balancing_policies/ring_hash/ring_hash_lb.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/load_balancing_policies/ring_hash/ring_hash_lb.h" |
2 | | |
3 | | #include <cstdint> |
4 | | #include <iostream> |
5 | | #include <string> |
6 | | #include <vector> |
7 | | |
8 | | #include "envoy/config/cluster/v3/cluster.pb.h" |
9 | | |
10 | | #include "source/common/common/assert.h" |
11 | | #include "source/common/upstream/load_balancer_impl.h" |
12 | | |
13 | | #include "absl/container/inlined_vector.h" |
14 | | #include "absl/strings/string_view.h" |
15 | | |
16 | | namespace Envoy { |
17 | | namespace Upstream { |
18 | | |
19 | 0 | LegacyRingHashLbConfig::LegacyRingHashLbConfig(const ClusterProto& cluster) { |
20 | 0 | if (cluster.has_ring_hash_lb_config()) { |
21 | 0 | lb_config_ = cluster.ring_hash_lb_config(); |
22 | 0 | } |
23 | 0 | } |
24 | | |
// Wraps the extension-point RingHash policy proto; lb_config_ simply stores it
// (copy vs. reference depends on the member's declared type — see the header).
TypedRingHashLbConfig::TypedRingHashLbConfig(const RingHashLbProto& lb_config)
    : lb_config_(lb_config) {}
27 | | |
// Legacy-config constructor: builds the load balancer from the deprecated
// Cluster::RingHashLbConfig proto plus fields on CommonLbConfig. Every unset
// field falls back to its documented default.
// @throws EnvoyException if minimum_ring_size exceeds maximum_ring_size.
RingHashLoadBalancer::RingHashLoadBalancer(
    const PrioritySet& priority_set, ClusterLbStats& stats, Stats::Scope& scope,
    Runtime::Loader& runtime, Random::RandomGenerator& random,
    OptRef<const envoy::config::cluster::v3::Cluster::RingHashLbConfig> config,
    const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
    // Panic threshold is a percent from common_config, defaulting to 50%.
    : ThreadAwareLoadBalancerBase(priority_set, stats, runtime, random,
                                  PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
                                      common_config, healthy_panic_threshold, 100, 50),
                                  common_config.has_locality_weighted_lb_config()),
      scope_(scope.createScope("ring_hash_lb.")), stats_(generateStats(*scope_)),
      // Ring size bounds: taken from the legacy config when present, else defaults.
      min_ring_size_(config.has_value() ? PROTOBUF_GET_WRAPPED_OR_DEFAULT(
                                              config.ref(), minimum_ring_size, DefaultMinRingSize)
                                        : DefaultMinRingSize),
      max_ring_size_(config.has_value() ? PROTOBUF_GET_WRAPPED_OR_DEFAULT(
                                              config.ref(), maximum_ring_size, DefaultMaxRingSize)
                                        : DefaultMaxRingSize),
      // xxHash is the default hash function when no legacy config is supplied.
      hash_function_(config.has_value()
                         ? config->hash_function()
                         : HashFunction::Cluster_RingHashLbConfig_HashFunction_XX_HASH),
      use_hostname_for_hashing_(
          common_config.has_consistent_hashing_lb_config()
              ? common_config.consistent_hashing_lb_config().use_hostname_for_hashing()
              : false),
      // NOTE(review): consistent_hashing_lb_config() is read without a has_...()
      // guard here — presumably relying on proto3 getters returning the default
      // sub-message when unset, which makes hash_balance_factor default to 0
      // (disabled). Confirm against the generated proto API.
      hash_balance_factor_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          common_config.consistent_hashing_lb_config(), hash_balance_factor, 0)) {
  // It's important to do any config validation here, rather than deferring to Ring's ctor,
  // because any exceptions thrown here will be caught and handled properly.
  if (min_ring_size_ > max_ring_size_) {
    throw EnvoyException(fmt::format("ring hash: minimum_ring_size ({}) > maximum_ring_size ({})",
                                     min_ring_size_, max_ring_size_));
  }
}
60 | | |
// Typed-config constructor: builds the load balancer from the dedicated
// envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash proto.
// @param healthy_panic_threshold panic threshold already resolved by the caller.
// @throws EnvoyException if minimum_ring_size exceeds maximum_ring_size.
RingHashLoadBalancer::RingHashLoadBalancer(
    const PrioritySet& priority_set, ClusterLbStats& stats, Stats::Scope& scope,
    Runtime::Loader& runtime, Random::RandomGenerator& random, uint32_t healthy_panic_threshold,
    const envoy::extensions::load_balancing_policies::ring_hash::v3::RingHash& config)
    : ThreadAwareLoadBalancerBase(priority_set, stats, runtime, random, healthy_panic_threshold,
                                  config.has_locality_weighted_lb_config()),
      scope_(scope.createScope("ring_hash_lb.")), stats_(generateStats(*scope_)),
      min_ring_size_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, minimum_ring_size, DefaultMinRingSize)),
      max_ring_size_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, maximum_ring_size, DefaultMaxRingSize)),
      // The typed proto's hash-function enum is cast to the legacy enum type;
      // this assumes the two enums share numeric values — TODO confirm in protos.
      hash_function_(static_cast<HashFunction>(config.hash_function())),
      // Prefer the nested consistent_hashing_lb_config when present; otherwise
      // fall back to the same-named top-level fields on the RingHash proto.
      use_hostname_for_hashing_(
          config.has_consistent_hashing_lb_config()
              ? config.consistent_hashing_lb_config().use_hostname_for_hashing()
              : config.use_hostname_for_hashing()),
      hash_balance_factor_(config.has_consistent_hashing_lb_config()
                               ? PROTOBUF_GET_WRAPPED_OR_DEFAULT(
                                     config.consistent_hashing_lb_config(), hash_balance_factor, 0)
                               : PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, hash_balance_factor, 0)) {
  // It's important to do any config validation here, rather than deferring to Ring's ctor,
  // because any exceptions thrown here will be caught and handled properly.
  if (min_ring_size_ > max_ring_size_) {
    throw EnvoyException(fmt::format("ring hash: minimum_ring_size ({}) > maximum_ring_size ({})",
                                     min_ring_size_, max_ring_size_));
  }
}
88 | | |
// Instantiates the ring-hash stat set (gauges, via POOL_GAUGE) in the given
// scope — the "ring_hash_lb." scope created by the constructors.
RingHashLoadBalancerStats RingHashLoadBalancer::generateStats(Stats::Scope& scope) {
  return {ALL_RING_HASH_LOAD_BALANCER_STATS(POOL_GAUGE(scope))};
}
92 | | |
// Maps hash `h` to a host: ketama-style binary search for the first ring entry
// whose hash is >= h, wrapping to entry 0 when h exceeds every entry's hash.
// Returns nullptr if the ring is empty.
// @param attempt retry attempt number; when non-zero, the selected index is
//        offset by `attempt` (mod ring size) so a retry host predicate can act
//        as if the previously chosen host were not on the ring.
HostConstSharedPtr RingHashLoadBalancer::Ring::chooseHost(uint64_t h, uint32_t attempt) const {
  if (ring_.empty()) {
    return nullptr;
  }

  // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c (ketama_get_server)
  // I've generally kept the variable names to make the code easier to compare.
  // NOTE: The algorithm depends on using signed integers for lowp, midp, and highp. Do not
  // change them!
  int64_t lowp = 0;
  int64_t highp = ring_.size();
  int64_t midp = 0;
  while (true) {
    midp = (lowp + highp) / 2;

    // Ran off the high end: h is larger than every entry's hash, so wrap to entry 0.
    if (midp == static_cast<int64_t>(ring_.size())) {
      midp = 0;
      break;
    }

    uint64_t midval = ring_[midp].hash_;
    uint64_t midval1 = midp == 0 ? 0 : ring_[midp - 1].hash_;

    // midp is the first entry with hash >= h (its predecessor's hash is < h): done.
    if (h <= midval && h > midval1) {
      break;
    }

    if (midval < h) {
      lowp = midp + 1;
    } else {
      highp = midp - 1;
    }

    // Bounds crossed without a match: wrap to entry 0.
    if (lowp > highp) {
      midp = 0;
      break;
    }
  }

  // If a retry host predicate is being applied, behave as if this host was not in the ring.
  // Note that this does not guarantee a different host: e.g., attempt == ring_.size() or
  // when the offset causes us to select the same host at another location in the ring.
  if (attempt > 0) {
    midp = (midp + attempt) % ring_.size();
  }

  return ring_[midp].host_;
}
141 | | |
142 | | using HashFunction = envoy::config::cluster::v3::Cluster::RingHashLbConfig::HashFunction; |
143 | | RingHashLoadBalancer::Ring::Ring(const NormalizedHostWeightVector& normalized_host_weights, |
144 | | double min_normalized_weight, uint64_t min_ring_size, |
145 | | uint64_t max_ring_size, HashFunction hash_function, |
146 | | bool use_hostname_for_hashing, RingHashLoadBalancerStats& stats) |
147 | 0 | : stats_(stats) { |
148 | 0 | ENVOY_LOG(trace, "ring hash: building ring"); |
149 | | |
150 | | // We can't do anything sensible with no hosts. |
151 | 0 | if (normalized_host_weights.empty()) { |
152 | 0 | return; |
153 | 0 | } |
154 | | |
155 | | // Scale up the number of hashes per host such that the least-weighted host gets a whole number |
156 | | // of hashes on the ring. Other hosts might not end up with whole numbers, and that's fine (the |
157 | | // ring-building algorithm below can handle this). This preserves the original implementation's |
158 | | // behavior: when weights aren't provided, all hosts should get an equal number of hashes. In |
159 | | // the case where this number exceeds the max_ring_size, it's scaled back down to fit. |
160 | 0 | const double scale = |
161 | 0 | std::min(std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight, |
162 | 0 | static_cast<double>(max_ring_size)); |
163 | | |
164 | | // Reserve memory for the entire ring up front. |
165 | 0 | const uint64_t ring_size = std::ceil(scale); |
166 | 0 | ring_.reserve(ring_size); |
167 | | |
168 | | // Populate the hash ring by walking through the (host, weight) pairs in |
169 | | // normalized_host_weights, and generating (scale * weight) hashes for each host. Since these |
170 | | // aren't necessarily whole numbers, we maintain running sums -- current_hashes and |
171 | | // target_hashes -- which allows us to populate the ring in a mostly stable way. |
172 | | // |
173 | | // For example, suppose we have 4 hosts, each with a normalized weight of 0.25, and a scale of |
174 | | // 6.0 (because the max_ring_size is 6). That means we want to generate 1.5 hashes per host. |
175 | | // We start the outer loop with current_hashes = 0 and target_hashes = 0. |
176 | | // - For the first host, we set target_hashes = 1.5. After one run of the inner loop, |
177 | | // current_hashes = 1. After another run, current_hashes = 2, so the inner loop ends. |
178 | | // - For the second host, target_hashes becomes 3.0, and current_hashes is 2 from before. |
179 | | // After only one run of the inner loop, current_hashes = 3, so the inner loop ends. |
180 | | // - Likewise, the third host gets two hashes, and the fourth host gets one hash. |
181 | | // |
182 | | // For stats reporting, keep track of the minimum and maximum actual number of hashes per host. |
183 | | // Users should hopefully pay attention to these numbers and alert if min_hashes_per_host is too |
184 | | // low, since that implies an inaccurate request distribution. |
185 | |
|
186 | 0 | absl::InlinedVector<char, 196> hash_key_buffer; |
187 | 0 | double current_hashes = 0.0; |
188 | 0 | double target_hashes = 0.0; |
189 | 0 | uint64_t min_hashes_per_host = ring_size; |
190 | 0 | uint64_t max_hashes_per_host = 0; |
191 | 0 | for (const auto& entry : normalized_host_weights) { |
192 | 0 | const auto& host = entry.first; |
193 | 0 | const absl::string_view key_to_hash = hashKey(host, use_hostname_for_hashing); |
194 | 0 | ASSERT(!key_to_hash.empty()); |
195 | | |
196 | 0 | hash_key_buffer.assign(key_to_hash.begin(), key_to_hash.end()); |
197 | 0 | hash_key_buffer.emplace_back('_'); |
198 | 0 | auto offset_start = hash_key_buffer.end(); |
199 | | |
200 | | // As noted above: maintain current_hashes and target_hashes as running sums across the entire |
201 | | // host set. `i` is needed only to construct the hash key, and tally min/max hashes per host. |
202 | 0 | target_hashes += scale * entry.second; |
203 | 0 | uint64_t i = 0; |
204 | 0 | while (current_hashes < target_hashes) { |
205 | 0 | const std::string i_str = absl::StrCat("", i); |
206 | 0 | hash_key_buffer.insert(offset_start, i_str.begin(), i_str.end()); |
207 | |
|
208 | 0 | absl::string_view hash_key(static_cast<char*>(hash_key_buffer.data()), |
209 | 0 | hash_key_buffer.size()); |
210 | |
|
211 | 0 | const uint64_t hash = |
212 | 0 | (hash_function == HashFunction::Cluster_RingHashLbConfig_HashFunction_MURMUR_HASH_2) |
213 | 0 | ? MurmurHash::murmurHash2(hash_key, MurmurHash::STD_HASH_SEED) |
214 | 0 | : HashUtil::xxHash64(hash_key); |
215 | |
|
216 | 0 | ENVOY_LOG(trace, "ring hash: hash_key={} hash={}", hash_key, hash); |
217 | 0 | ring_.push_back({hash, host}); |
218 | 0 | ++i; |
219 | 0 | ++current_hashes; |
220 | 0 | hash_key_buffer.erase(offset_start, hash_key_buffer.end()); |
221 | 0 | } |
222 | 0 | min_hashes_per_host = std::min(i, min_hashes_per_host); |
223 | 0 | max_hashes_per_host = std::max(i, max_hashes_per_host); |
224 | 0 | } |
225 | | |
226 | 0 | std::sort(ring_.begin(), ring_.end(), [](const RingEntry& lhs, const RingEntry& rhs) -> bool { |
227 | 0 | return lhs.hash_ < rhs.hash_; |
228 | 0 | }); |
229 | 0 | if (ENVOY_LOG_CHECK_LEVEL(trace)) { |
230 | 0 | for (const auto& entry : ring_) { |
231 | 0 | const absl::string_view key_to_hash = hashKey(entry.host_, use_hostname_for_hashing); |
232 | 0 | ENVOY_LOG(trace, "ring hash: host={} hash={}", key_to_hash, entry.hash_); |
233 | 0 | } |
234 | 0 | } |
235 | |
|
236 | 0 | stats_.size_.set(ring_size); |
237 | 0 | stats_.min_hashes_per_host_.set(min_hashes_per_host); |
238 | 0 | stats_.max_hashes_per_host_.set(max_hashes_per_host); |
239 | 0 | } |
240 | | |
241 | | } // namespace Upstream |
242 | | } // namespace Envoy |