1
#include "source/extensions/load_balancing_policies/ring_hash/ring_hash_lb.h"
2

            
3
#include <cstdint>
4
#include <iostream>
5
#include <string>
6
#include <vector>
7

            
8
#include "envoy/config/cluster/v3/cluster.pb.h"
9

            
10
#include "source/common/common/assert.h"
11

            
12
#include "absl/container/inlined_vector.h"
13
#include "absl/strings/string_view.h"
14

            
15
namespace Envoy {
16
namespace Upstream {
17

            
18
// Translates the deprecated cluster-level ring hash configuration into the
// extension's typed proto representation (`lb_config_`).
TypedRingHashLbConfig::TypedRingHashLbConfig(const CommonLbConfigProto& common_lb_config,
                                             const LegacyRingHashLbProto& lb_config) {
  // Carry over the common hash LB settings first.
  LoadBalancerConfigHelper::convertHashLbConfigTo(common_lb_config, lb_config_);

  // Mirror the presence of the locality weighted LB sub-message; touching the
  // mutable accessor is enough to mark it as set.
  if (common_lb_config.has_locality_weighted_lb_config()) {
    lb_config_.mutable_locality_weighted_lb_config();
  }

  // Copy ring bounds only when explicitly configured so that unset fields keep
  // the new proto's defaults.
  if (lb_config.has_minimum_ring_size()) {
    lb_config_.mutable_minimum_ring_size()->CopyFrom(lb_config.minimum_ring_size());
  }
  if (lb_config.has_maximum_ring_size()) {
    lb_config_.mutable_maximum_ring_size()->CopyFrom(lb_config.maximum_ring_size());
  }

  // Only MURMUR_HASH_2 needs an explicit translation; any other legacy value
  // leaves the target proto's hash function at its default.
  const bool wants_murmur =
      lb_config.hash_function() ==
      envoy::config::cluster::v3::Cluster::RingHashLbConfig::MURMUR_HASH_2;
  if (wants_murmur) {
    lb_config_.set_hash_function(RingHashLbProto::MURMUR_HASH_2);
  }
}
36

            
37
// Builds the typed config from the extension's own ring hash proto. The hash
// policies from consistent_hashing_lb_config are handed to the base class for
// conversion; presumably any conversion failure is recorded in
// `creation_status` by the base class rather than thrown — confirm against
// TypedHashLbConfigBase.
TypedRingHashLbConfig::TypedRingHashLbConfig(const RingHashLbProto& lb_config,
                                             Regex::Engine& regex_engine,
                                             absl::Status& creation_status)
    : TypedHashLbConfigBase(lb_config.consistent_hashing_lb_config().hash_policy(), regex_engine,
                            creation_status),
      lb_config_(lb_config) {}
43

            
44
// Constructs the thread-aware ring hash load balancer. Note the precedence
// rules visible below: when consistent_hashing_lb_config is present it wins
// over the deprecated top-level use_hostname_for_hashing / hash_balance_factor
// fields; ring size bounds fall back to DefaultMinRingSize/DefaultMaxRingSize
// when unset.
RingHashLoadBalancer::RingHashLoadBalancer(const PrioritySet& priority_set, ClusterLbStats& stats,
                                           Stats::Scope& scope, Runtime::Loader& runtime,
                                           Random::RandomGenerator& random,
                                           uint32_t healthy_panic_threshold,
                                           const RingHashLbProto& config,
                                           HashPolicySharedPtr hash_policy)
    : ThreadAwareLoadBalancerBase(priority_set, stats, runtime, random, healthy_panic_threshold,
                                  config.has_locality_weighted_lb_config(), std::move(hash_policy)),
      // All ring hash gauges live under the "ring_hash_lb." scope.
      scope_(scope.createScope("ring_hash_lb.")), stats_(generateStats(*scope_)),
      min_ring_size_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, minimum_ring_size, DefaultMinRingSize)),
      max_ring_size_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, maximum_ring_size, DefaultMaxRingSize)),
      hash_function_(config.hash_function()),
      // consistent_hashing_lb_config takes precedence over the deprecated
      // top-level field of the same name.
      use_hostname_for_hashing_(
          config.has_consistent_hashing_lb_config()
              ? config.consistent_hashing_lb_config().use_hostname_for_hashing()
              : config.use_hostname_for_hashing()),
      // Same precedence rule; 0 means hash balance factor disabled.
      hash_balance_factor_(config.has_consistent_hashing_lb_config()
                               ? PROTOBUF_GET_WRAPPED_OR_DEFAULT(
                                     config.consistent_hashing_lb_config(), hash_balance_factor, 0)
                               : PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, hash_balance_factor, 0)) {
  // It's important to do any config validation here, rather than deferring to Ring's ctor,
  // because any exceptions thrown here will be caught and handled properly.
  if (min_ring_size_ > max_ring_size_) {
    throw EnvoyException(fmt::format("ring hash: minimum_ring_size ({}) > maximum_ring_size ({})",
                                     min_ring_size_, max_ring_size_));
  }
}
73

            
74
91
// Instantiates the ring-hash-specific gauges in the provided stats scope.
RingHashLoadBalancerStats RingHashLoadBalancer::generateStats(Stats::Scope& scope) {
  RingHashLoadBalancerStats generated{ALL_RING_HASH_LOAD_BALANCER_STATS(POOL_GAUGE(scope))};
  return generated;
}
77

            
78
47420
// Maps a request hash `h` to a host by binary-searching the sorted ring for the
// first entry whose hash is >= h (wrapping to index 0 when h is larger than
// every entry). `attempt` > 0 offsets the chosen position for retry host
// predicates. Returns a null host only when the ring is empty.
HostSelectionResponse RingHashLoadBalancer::Ring::chooseHost(uint64_t h, uint32_t attempt) const {
  if (ring_.empty()) {
    return {nullptr};
  }

  // Ported from https://github.com/RJ/ketama/blob/master/libketama/ketama.c (ketama_get_server)
  // I've generally kept the variable names to make the code easier to compare.
  // NOTE: The algorithm depends on using signed integers for lowp, midp, and highp. Do not
  //       change them!
  int64_t lowp = 0;
  int64_t highp = ring_.size();
  int64_t midp = 0;
  while (true) {
    midp = (lowp + highp) / 2;

    // Ran off the high end of the ring: wrap around to the first entry.
    if (midp == static_cast<int64_t>(ring_.size())) {
      midp = 0;
      break;
    }

    uint64_t midval = ring_[midp].hash_;
    // Hash of the predecessor entry (0 for the first entry), used to detect
    // that h falls exactly into this entry's interval (midval1, midval].
    uint64_t midval1 = midp == 0 ? 0 : ring_[midp - 1].hash_;

    if (h <= midval && h > midval1) {
      break;
    }

    if (midval < h) {
      lowp = midp + 1;
    } else {
      highp = midp - 1;
    }

    // Search space exhausted without a match: h is smaller than every entry,
    // so wrap around to the first entry.
    if (lowp > highp) {
      midp = 0;
      break;
    }
  }

  // If a retry host predicate is being applied, behave as if this host was not in the ring.
  // Note that this does not guarantee a different host: e.g., attempt == ring_.size() or
  // when the offset causes us to select the same host at another location in the ring.
  if (attempt > 0) {
    midp = (midp + attempt) % ring_.size();
  }

  return ring_[midp].host_;
}
126

            
127
using HashFunction = envoy::config::cluster::v3::Cluster::RingHashLbConfig::HashFunction;
// Builds the sorted hash ring from the normalized (host, weight) pairs. Each
// host receives roughly (scale * weight) positions on the ring, where scale is
// derived from min/max ring size and the smallest host weight. Updates the
// ring size and min/max hashes-per-host gauges on completion.
RingHashLoadBalancer::Ring::Ring(const NormalizedHostWeightVector& normalized_host_weights,
                                 double min_normalized_weight, uint64_t min_ring_size,
                                 uint64_t max_ring_size, HashFunction hash_function,
                                 bool use_hostname_for_hashing, RingHashLoadBalancerStats& stats)
    : stats_(stats) {
  ENVOY_LOG(trace, "ring hash: building ring");

  // We can't do anything sensible with no hosts.
  if (normalized_host_weights.empty()) {
    return;
  }

  // Scale up the number of hashes per host such that the least-weighted host gets a whole number
  // of hashes on the ring. Other hosts might not end up with whole numbers, and that's fine (the
  // ring-building algorithm below can handle this). This preserves the original implementation's
  // behavior: when weights aren't provided, all hosts should get an equal number of hashes. In
  // the case where this number exceeds the max_ring_size, it's scaled back down to fit.
  const double scale =
      std::min(std::ceil(min_normalized_weight * min_ring_size) / min_normalized_weight,
               static_cast<double>(max_ring_size));

  // Reserve memory for the entire ring up front.
  const uint64_t ring_size = std::ceil(scale);
  ring_.reserve(ring_size);

  // Populate the hash ring by walking through the (host, weight) pairs in
  // normalized_host_weights, and generating (scale * weight) hashes for each host. Since these
  // aren't necessarily whole numbers, we maintain running sums -- current_hashes and
  // target_hashes -- which allows us to populate the ring in a mostly stable way.
  //
  // For example, suppose we have 4 hosts, each with a normalized weight of 0.25, and a scale of
  // 6.0 (because the max_ring_size is 6). That means we want to generate 1.5 hashes per host.
  // We start the outer loop with current_hashes = 0 and target_hashes = 0.
  //   - For the first host, we set target_hashes = 1.5. After one run of the inner loop,
  //     current_hashes = 1. After another run, current_hashes = 2, so the inner loop ends.
  //   - For the second host, target_hashes becomes 3.0, and current_hashes is 2 from before.
  //     After only one run of the inner loop, current_hashes = 3, so the inner loop ends.
  //   - Likewise, the third host gets two hashes, and the fourth host gets one hash.
  //
  // For stats reporting, keep track of the minimum and maximum actual number of hashes per host.
  // Users should hopefully pay attention to these numbers and alert if min_hashes_per_host is too
  // low, since that implies an inaccurate request distribution.

  absl::InlinedVector<char, 196> hash_key_buffer;
  double current_hashes = 0.0;
  double target_hashes = 0.0;
  uint64_t min_hashes_per_host = ring_size;
  uint64_t max_hashes_per_host = 0;
  for (const auto& entry : normalized_host_weights) {
    const auto& host = entry.first;
    // Key is either the hostname or the address, depending on config.
    const absl::string_view key_to_hash = hashKey(host, use_hostname_for_hashing);
    ASSERT(!key_to_hash.empty());

    // Buffer holds "<key>_<index>"; the trailing index is rewritten each inner
    // iteration via offset_start.
    hash_key_buffer.assign(key_to_hash.begin(), key_to_hash.end());
    hash_key_buffer.emplace_back('_');
    // NOTE(review): offset_start is reused across the insert/erase calls below,
    // which looks safe only if those operations never reallocate the inlined
    // buffer (i.e. key + index stays within capacity) -- TODO confirm.
    auto offset_start = hash_key_buffer.end();

    // As noted above: maintain current_hashes and target_hashes as running sums across the entire
    // host set. `i` is needed only to construct the hash key, and tally min/max hashes per host.
    target_hashes += scale * entry.second;
    uint64_t i = 0;
    while (current_hashes < target_hashes) {
      const std::string i_str = absl::StrCat("", i);
      hash_key_buffer.insert(offset_start, i_str.begin(), i_str.end());

      absl::string_view hash_key(static_cast<char*>(hash_key_buffer.data()),
                                 hash_key_buffer.size());

      // Hash function selection mirrors the config enum: murmur2 for legacy
      // compatibility, xxHash otherwise.
      const uint64_t hash = (hash_function == HashFunction::RingHash_HashFunction_MURMUR_HASH_2)
                                ? MurmurHash::murmurHash2(hash_key, MurmurHash::STD_HASH_SEED)
                                : HashUtil::xxHash64(hash_key);

      ENVOY_LOG(trace, "ring hash: hash_key={} hash={}", hash_key, hash);
      ring_.push_back({hash, host});
      ++i;
      ++current_hashes;
      // Drop the index suffix, keeping "<key>_" for the next iteration.
      hash_key_buffer.erase(offset_start, hash_key_buffer.end());
    }
    min_hashes_per_host = std::min(i, min_hashes_per_host);
    max_hashes_per_host = std::max(i, max_hashes_per_host);
  }

  // The ring must be sorted by hash for chooseHost's binary search.
  std::sort(ring_.begin(), ring_.end(), [](const RingEntry& lhs, const RingEntry& rhs) -> bool {
    return lhs.hash_ < rhs.hash_;
  });
  if (ENVOY_LOG_CHECK_LEVEL(trace)) {
    for (const auto& entry : ring_) {
      const absl::string_view key_to_hash = hashKey(entry.host_, use_hostname_for_hashing);
      ENVOY_LOG(trace, "ring hash: host={} hash={}", key_to_hash, entry.hash_);
    }
  }

  stats_.size_.set(ring_size);
  stats_.min_hashes_per_host_.set(min_hashes_per_host);
  stats_.max_hashes_per_host_.set(max_hashes_per_host);
}
224

            
225
} // namespace Upstream
226
} // namespace Envoy