Line data Source code
1 : #include "source/extensions/load_balancing_policies/ring_hash/ring_hash_lb.h"
2 :
3 : #include <cstdint>
4 : #include <iostream>
5 : #include <string>
6 : #include <vector>
7 :
8 : #include "envoy/config/cluster/v3/cluster.pb.h"
9 :
10 : #include "source/common/common/assert.h"
11 : #include "source/common/upstream/load_balancer_impl.h"
12 :
13 : #include "absl/container/inlined_vector.h"
14 : #include "absl/strings/string_view.h"
15 :
16 : namespace Envoy {
17 : namespace Upstream {
18 :
19 0 : LegacyRingHashLbConfig::LegacyRingHashLbConfig(const ClusterProto& cluster) {
20 0 : if (cluster.has_ring_hash_lb_config()) {
21 0 : lb_config_ = cluster.ring_hash_lb_config();
22 0 : }
23 0 : }
24 :
25 : TypedRingHashLbConfig::TypedRingHashLbConfig(const RingHashLbProto& lb_config)
26 0 : : lb_config_(lb_config) {}
27 :
28 : RingHashLoadBalancer::RingHashLoadBalancer(
29 : const PrioritySet& priority_set, ClusterLbStats& stats, Stats::Scope& scope,
30 : Runtime::Loader& runtime, Random::RandomGenerator& random,
31 : OptRef<const envoy::config::cluster::v3::Cluster::RingHashLbConfig> config,
32 : const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
33 : : ThreadAwareLoadBalancerBase(priority_set, stats, runtime, random,
34 : PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
35 : common_config, healthy_panic_threshold, 100, 50),
36 : common_config.has_locality_weighted_lb_config()),
37 : scope_(scope.createScope("ring_hash_lb.")), stats_(generateStats(*scope_)),
38 : min_ring_size_(config.has_value() ? PROTOBUF_GET_WRAPPED_OR_DEFAULT(
39 : config.ref(), minimum_ring_size, DefaultMinRingSize)
40 : : DefaultMinRingSize),
41 : max_ring_size_(config.has_value() ? PROTOBUF_GET_WRAPPED_OR_DEFAULT(
42 : config.ref(), maximum_ring_size, DefaultMaxRingSize)
43 : : DefaultMaxRingSize),
44 : hash_function_(config.has_value()
45 : ? config->hash_function()
46 : : HashFunction::Cluster_RingHashLbConfig_HashFunction_XX_HASH),
47 : use_hostname_for_hashing_(
48 : common_config.has_consistent_hashing_lb_config()
49 : ? common_config.consistent_hashing_lb_config().use_hostname_for_hashing()
50 : : false),
51 : hash_balance_factor_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
52 0 : common_config.consistent_hashing_lb_config(), hash_balance_factor, 0)) {
53 : // It's important to do any config validation here, rather than deferring to Ring's ctor,
54 : // because any exceptions thrown here will be caught and handled properly.
55 0 : if (min_ring_size_ > max_ring_size_) {
56 0 : throw EnvoyException(fmt::format("ring hash: minimum_ring_size ({}) > maximum_ring_size ({})",
57 0 : min_ring_size_, max_ring_size_));
58 0 : }
59 0 : }
60 :
61 : RingHashLoadBalancer::RingHashLoadBalancer(
62 : const PrioritySet& priority_set, ClusterLbStats& stats, Stats::Scope& scope,
63 : Runtime::Loader& runtime, Random::RandomGenerator& random, uint32_t healthy_panic_threshold,
64 : const envoy::extensions::load_balancing_policies::ring_hash::v3::RingHash& config)
65 : : ThreadAwareLoadBalancerBase(priority_set, stats, runtime, random, healthy_panic_threshold,
66 : config.has_locality_weighted_lb_config()),
67 : scope_(scope.createScope("ring_hash_lb.")), stats_(generateStats(*scope_)),
68 : min_ring_size_(
69 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, minimum_ring_size, DefaultMinRingSize)),
70 : max_ring_size_(
71 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, maximum_ring_size, DefaultMaxRingSize)),
72 : hash_function_(static_cast<HashFunction>(config.hash_function())),
73 : use_hostname_for_hashing_(
74 : config.has_consistent_hashing_lb_config()
75 : ? config.consistent_hashing_lb_config().use_hostname_for_hashing()
76 : : config.use_hostname_for_hashing()),
77 : hash_balance_factor_(config.has_consistent_hashing_lb_config()
78 : ? PROTOBUF_GET_WRAPPED_OR_DEFAULT(
79 : config.consistent_hashing_lb_config(), hash_balance_factor, 0)
80 0 : : PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, hash_balance_factor, 0)) {
81 : // It's important to do any config validation here, rather than deferring to Ring's ctor,
82 : // because any exceptions thrown here will be caught and handled properly.
83 0 : if (min_ring_size_ > max_ring_size_) {
84 0 : throw EnvoyException(fmt::format("ring hash: minimum_ring_size ({}) > maximum_ring_size ({})",
85 0 : min_ring_size_, max_ring_size_));
86 0 : }
87 0 : }
88 :
89 0 : RingHashLoadBalancerStats RingHashLoadBalancer::generateStats(Stats::Scope& scope) {
90 0 : return {ALL_RING_HASH_LOAD_BALANCER_STATS(POOL_GAUGE(scope))};
91 0 : }
92 :
93 0 : HostConstSharedPtr RingHashLoadBalancer::Ring::chooseHost(uint64_t h, uint32_t attempt) const {
94 0 : if (ring_.empty()) {
95 0 : return nullptr;
96 0 : }
97 :
98 : // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c (ketama_get_server)
99 : // I've generally kept the variable names to make the code easier to compare.
100 : // NOTE: The algorithm depends on using signed integers for lowp, midp, and highp. Do not
101 : // change them!
102 0 : int64_t lowp = 0;
103 0 : int64_t highp = ring_.size();
104 0 : int64_t midp = 0;
105 0 : while (true) {
106 0 : midp = (lowp + highp) / 2;
107 :
108 0 : if (midp == static_cast<int64_t>(ring_.size())) {
109 0 : midp = 0;
110 0 : break;
111 0 : }
112 :
113 0 : uint64_t midval = ring_[midp].hash_;
114 0 : uint64_t midval1 = midp == 0 ? 0 : ring_[midp - 1].hash_;
115 :
116 0 : if (h <= midval && h > midval1) {
117 0 : break;
118 0 : }
119 :
120 0 : if (midval < h) {
121 0 : lowp = midp + 1;
122 0 : } else {
123 0 : highp = midp - 1;
124 0 : }
125 :
126 0 : if (lowp > highp) {
127 0 : midp = 0;
128 0 : break;
129 0 : }
130 0 : }
131 :
132 : // If a retry host predicate is being applied, behave as if this host was not in the ring.
133 : // Note that this does not guarantee a different host: e.g., attempt == ring_.size() or
134 : // when the offset causes us to select the same host at another location in the ring.
135 0 : if (attempt > 0) {
136 0 : midp = (midp + attempt) % ring_.size();
137 0 : }
138 :
139 0 : return ring_[midp].host_;
140 0 : }
141 :
142 : using HashFunction = envoy::config::cluster::v3::Cluster::RingHashLbConfig::HashFunction;
143 : RingHashLoadBalancer::Ring::Ring(const NormalizedHostWeightVector& normalized_host_weights,
144 : double min_normalized_weight, uint64_t min_ring_size,
145 : uint64_t max_ring_size, HashFunction hash_function,
146 : bool use_hostname_for_hashing, RingHashLoadBalancerStats& stats)
147 0 : : stats_(stats) {
148 0 : ENVOY_LOG(trace, "ring hash: building ring");
149 :
150 : // We can't do anything sensible with no hosts.
151 0 : if (normalized_host_weights.empty()) {
152 0 : return;
153 0 : }
154 :
155 : // Scale up the number of hashes per host such that the least-weighted host gets a whole number
156 : // of hashes on the ring. Other hosts might not end up with whole numbers, and that's fine (the
157 : // ring-building algorithm below can handle this). This preserves the original implementation's
158 : // behavior: when weights aren't provided, all hosts should get an equal number of hashes. In
159 : // the case where this number exceeds the max_ring_size, it's scaled back down to fit.
160 0 : const double scale =
161 0 : std::min(std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
162 0 : static_cast<double>(max_ring_size));
163 :
164 : // Reserve memory for the entire ring up front.
165 0 : const uint64_t ring_size = std::ceil(scale);
166 0 : ring_.reserve(ring_size);
167 :
168 : // Populate the hash ring by walking through the (host, weight) pairs in
169 : // normalized_host_weights, and generating (scale * weight) hashes for each host. Since these
170 : // aren't necessarily whole numbers, we maintain running sums -- current_hashes and
171 : // target_hashes -- which allows us to populate the ring in a mostly stable way.
172 : //
173 : // For example, suppose we have 4 hosts, each with a normalized weight of 0.25, and a scale of
174 : // 6.0 (because the max_ring_size is 6). That means we want to generate 1.5 hashes per host.
175 : // We start the outer loop with current_hashes = 0 and target_hashes = 0.
176 : // - For the first host, we set target_hashes = 1.5. After one run of the inner loop,
177 : // current_hashes = 1. After another run, current_hashes = 2, so the inner loop ends.
178 : // - For the second host, target_hashes becomes 3.0, and current_hashes is 2 from before.
179 : // After only one run of the inner loop, current_hashes = 3, so the inner loop ends.
180 : // - Likewise, the third host gets two hashes, and the fourth host gets one hash.
181 : //
182 : // For stats reporting, keep track of the minimum and maximum actual number of hashes per host.
183 : // Users should hopefully pay attention to these numbers and alert if min_hashes_per_host is too
184 : // low, since that implies an inaccurate request distribution.
185 :
186 0 : absl::InlinedVector<char, 196> hash_key_buffer;
187 0 : double current_hashes = 0.0;
188 0 : double target_hashes = 0.0;
189 0 : uint64_t min_hashes_per_host = ring_size;
190 0 : uint64_t max_hashes_per_host = 0;
191 0 : for (const auto& entry : normalized_host_weights) {
192 0 : const auto& host = entry.first;
193 0 : const absl::string_view key_to_hash = hashKey(host, use_hostname_for_hashing);
194 0 : ASSERT(!key_to_hash.empty());
195 :
196 0 : hash_key_buffer.assign(key_to_hash.begin(), key_to_hash.end());
197 0 : hash_key_buffer.emplace_back('_');
198 0 : auto offset_start = hash_key_buffer.end();
199 :
200 : // As noted above: maintain current_hashes and target_hashes as running sums across the entire
201 : // host set. `i` is needed only to construct the hash key, and tally min/max hashes per host.
202 0 : target_hashes += scale * entry.second;
203 0 : uint64_t i = 0;
204 0 : while (current_hashes < target_hashes) {
205 0 : const std::string i_str = absl::StrCat("", i);
206 0 : hash_key_buffer.insert(offset_start, i_str.begin(), i_str.end());
207 :
208 0 : absl::string_view hash_key(static_cast<char*>(hash_key_buffer.data()),
209 0 : hash_key_buffer.size());
210 :
211 0 : const uint64_t hash =
212 0 : (hash_function == HashFunction::Cluster_RingHashLbConfig_HashFunction_MURMUR_HASH_2)
213 0 : ? MurmurHash::murmurHash2(hash_key, MurmurHash::STD_HASH_SEED)
214 0 : : HashUtil::xxHash64(hash_key);
215 :
216 0 : ENVOY_LOG(trace, "ring hash: hash_key={} hash={}", hash_key, hash);
217 0 : ring_.push_back({hash, host});
218 0 : ++i;
219 0 : ++current_hashes;
220 0 : hash_key_buffer.erase(offset_start, hash_key_buffer.end());
221 0 : }
222 0 : min_hashes_per_host = std::min(i, min_hashes_per_host);
223 0 : max_hashes_per_host = std::max(i, max_hashes_per_host);
224 0 : }
225 :
226 0 : std::sort(ring_.begin(), ring_.end(), [](const RingEntry& lhs, const RingEntry& rhs) -> bool {
227 0 : return lhs.hash_ < rhs.hash_;
228 0 : });
229 0 : if (ENVOY_LOG_CHECK_LEVEL(trace)) {
230 0 : for (const auto& entry : ring_) {
231 0 : const absl::string_view key_to_hash = hashKey(entry.host_, use_hostname_for_hashing);
232 0 : ENVOY_LOG(trace, "ring hash: host={} hash={}", key_to_hash, entry.hash_);
233 0 : }
234 0 : }
235 :
236 0 : stats_.size_.set(ring_size);
237 0 : stats_.min_hashes_per_host_.set(min_hashes_per_host);
238 0 : stats_.max_hashes_per_host_.set(max_hashes_per_host);
239 0 : }
240 :
241 : } // namespace Upstream
242 : } // namespace Envoy
|