1
#pragma once
2

            
3
#include <bitset>
4

            
5
#include "envoy/common/callback.h"
6
#include "envoy/config/cluster/v3/cluster.pb.h"
7

            
8
#include "source/common/common/logger.h"
9
#include "source/common/config/metadata.h"
10
#include "source/common/config/well_known_names.h"
11
#include "source/common/http/hash_policy.h"
12
#include "source/extensions/load_balancing_policies/common/load_balancer_impl.h"
13

            
14
#include "absl/strings/string_view.h"
15
#include "absl/synchronization/mutex.h"
16

            
17
namespace Envoy {
18
namespace Upstream {
19

            
20
using NormalizedHostWeightVector = std::vector<std::pair<HostConstSharedPtr, double>>;
21
using NormalizedHostWeightMap = std::map<HostConstSharedPtr, double>;
22

            
23
using HashPolicyProto = envoy::config::route::v3::RouteAction::HashPolicy;
24
using HashPolicySharedPtr = std::shared_ptr<Http::HashPolicy>;
25

            
26
class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareLoadBalancer {
27
public:
28
  /**
29
   * Base class for a hashing load balancer implemented for use in a thread aware load balancer.
30
   * TODO(mattklein123): Currently only RingHash and Maglev use the thread aware load balancer.
31
   *                     The hash is pre-computed prior to getting to the real load balancer for
32
   *                     use in priority selection. In the future we likely we will want to pass
33
   *                     through the full load balancer context in case a future implementation
34
   *                     wants to use it.
35
   */
36
  class HashingLoadBalancer {
37
  public:
38
691
    virtual ~HashingLoadBalancer() = default;
39
    virtual HostSelectionResponse chooseHost(uint64_t hash, uint32_t attempt) const PURE;
40
5006
    const absl::string_view hashKey(HostConstSharedPtr host, bool use_hostname) const {
41
5006
      const Protobuf::Value& val = Config::Metadata::metadataValue(
42
5006
          host->metadata().get(), Config::MetadataFilters::get().ENVOY_LB,
43
5006
          Config::MetadataEnvoyLbKeys::get().HASH_KEY);
44
5006
      if (val.kind_case() != val.kStringValue && val.kind_case() != val.KIND_NOT_SET) {
45
1
        FINE_GRAIN_LOG(debug, "hash_key must be string type, got: {}",
46
1
                       static_cast<int>(val.kind_case()));
47
1
      }
48
5006
      absl::string_view hash_key = val.string_value();
49
5006
      if (hash_key.empty()) {
50
4945
        hash_key = use_hostname ? host->hostname() : host->address()->asString();
51
4945
      }
52
5006
      return hash_key;
53
5006
    }
54
  };
55
  using HashingLoadBalancerSharedPtr = std::shared_ptr<HashingLoadBalancer>;
56

            
57
  /**
58
   * Class for consistent hashing load balancer (CH-LB) with bounded loads.
59
   * It is common to both RingHash and Maglev load balancers, because the logic of selecting the
60
   * next host when one is overloaded is independent of the CH-LB type.
61
   */
62
  class BoundedLoadHashingLoadBalancer : public HashingLoadBalancer {
63
  public:
64
    BoundedLoadHashingLoadBalancer(HashingLoadBalancerSharedPtr hashing_lb_ptr,
65
                                   NormalizedHostWeightVector normalized_host_weights,
66
                                   uint32_t hash_balance_factor)
67
17
        : normalized_host_weights_map_(initNormalizedHostWeightMap(normalized_host_weights)),
68
17
          hashing_lb_ptr_(std::move(hashing_lb_ptr)),
69
17
          normalized_host_weights_(std::move(normalized_host_weights)),
70
17
          hash_balance_factor_(hash_balance_factor) {
71
17
      ASSERT(hashing_lb_ptr_ != nullptr);
72
17
      ASSERT(hash_balance_factor > 0);
73
17
    }
74
    HostSelectionResponse chooseHost(uint64_t hash, uint32_t attempt) const override;
75

            
76
  protected:
77
    virtual double hostOverloadFactor(const Host& host, double weight) const;
78
    const NormalizedHostWeightMap normalized_host_weights_map_;
79

            
80
  private:
81
    const NormalizedHostWeightMap
82
17
    initNormalizedHostWeightMap(const NormalizedHostWeightVector& normalized_host_weights) {
83
17
      NormalizedHostWeightMap normalized_host_weights_map;
84
64
      for (auto const& item : normalized_host_weights) {
85
64
        normalized_host_weights_map[item.first] = item.second;
86
64
      }
87
17
      return normalized_host_weights_map;
88
17
    }
89
    const HashingLoadBalancerSharedPtr hashing_lb_ptr_;
90
    const NormalizedHostWeightVector normalized_host_weights_;
91
    const uint32_t hash_balance_factor_;
92
  };
93
  // Upstream::ThreadAwareLoadBalancer
94
610
  LoadBalancerFactorySharedPtr factory() override { return factory_; }
95
  absl::Status initialize() override;
96

            
97
  // Upstream::LoadBalancer
98
  HostSelectionResponse chooseHost(LoadBalancerContext*) override { return {nullptr}; }
99
  // Preconnect not implemented for hash based load balancing
100
2
  HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
101
  // Pool selection not implemented.
102
  absl::optional<Upstream::SelectedPoolAndConnection>
103
  selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
104
                           const Upstream::Host& /*host*/,
105
2
                           std::vector<uint8_t>& /*hash_key*/) override {
106
2
    return absl::nullopt;
107
2
  }
108
  // Lifetime tracking not implemented.
109
2
  OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
110
2
    return {};
111
2
  }
112

            
113
protected:
114
  ThreadAwareLoadBalancerBase(const PrioritySet& priority_set, ClusterLbStats& stats,
115
                              Runtime::Loader& runtime, Random::RandomGenerator& random,
116
                              uint32_t healthy_panic_threshold, bool locality_weighted_balancing,
117
                              HashPolicySharedPtr hash_policy)
118
557
      : LoadBalancerBase(priority_set, stats, runtime, random, healthy_panic_threshold),
119
557
        factory_(new LoadBalancerFactoryImpl(stats, random, std::move(hash_policy))),
120
557
        locality_weighted_balancing_(locality_weighted_balancing) {}
121

            
122
private:
123
  struct PerPriorityState {
124
    std::shared_ptr<HashingLoadBalancer> current_lb_;
125
    bool global_panic_{};
126
  };
127
  using PerPriorityStatePtr = std::unique_ptr<PerPriorityState>;
128

            
129
  struct LoadBalancerImpl : public LoadBalancer {
130
    LoadBalancerImpl(ClusterLbStats& stats, Random::RandomGenerator& random,
131
                     HashPolicySharedPtr hash_policy)
132
1507
        : stats_(stats), random_(random), hash_policy_(std::move(hash_policy)) {}
133

            
134
    // Upstream::LoadBalancer
135
    HostSelectionResponse chooseHost(LoadBalancerContext* context) override;
136
    // Preconnect not implemented for hash based load balancing
137
2
    HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
138
    absl::optional<Upstream::SelectedPoolAndConnection>
139
    selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
140
                             const Upstream::Host& /*host*/,
141
2
                             std::vector<uint8_t>& /*hash_key*/) override {
142
2
      return absl::nullopt;
143
2
    }
144
2
    OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
145
2
      return {};
146
2
    }
147

            
148
    ClusterLbStats& stats_;
149
    Random::RandomGenerator& random_;
150
    HashPolicySharedPtr hash_policy_;
151

            
152
    std::shared_ptr<std::vector<PerPriorityStatePtr>> per_priority_state_;
153
    std::shared_ptr<HealthyLoad> healthy_per_priority_load_;
154
    std::shared_ptr<DegradedLoad> degraded_per_priority_load_;
155
  };
156

            
157
  struct LoadBalancerFactoryImpl : public LoadBalancerFactory {
158
    LoadBalancerFactoryImpl(ClusterLbStats& stats, Random::RandomGenerator& random,
159
                            std::shared_ptr<Http::HashPolicy> hash_policy)
160
557
        : stats_(stats), random_(random), hash_policy_(std::move(hash_policy)) {}
161

            
162
    // Upstream::LoadBalancerFactory
163
    // Ignore the params for the thread-aware LB.
164
    LoadBalancerPtr create(LoadBalancerParams) override;
165

            
166
    ClusterLbStats& stats_;
167
    Random::RandomGenerator& random_;
168
    std::shared_ptr<Http::HashPolicy> hash_policy_;
169
    absl::Mutex mutex_;
170
    std::shared_ptr<std::vector<PerPriorityStatePtr>> per_priority_state_ ABSL_GUARDED_BY(mutex_);
171
    // This is split out of PerPriorityState so LoadBalancerBase::ChoosePriority can be reused.
172
    std::shared_ptr<HealthyLoad> healthy_per_priority_load_ ABSL_GUARDED_BY(mutex_);
173
    std::shared_ptr<DegradedLoad> degraded_per_priority_load_ ABSL_GUARDED_BY(mutex_);
174
  };
175

            
176
  virtual HashingLoadBalancerSharedPtr
177
  createLoadBalancer(const NormalizedHostWeightVector& normalized_host_weights,
178
                     double min_normalized_weight, double max_normalized_weight) PURE;
179
  void refresh();
180

            
181
  std::shared_ptr<LoadBalancerFactoryImpl> factory_;
182
  const bool locality_weighted_balancing_{};
183
  Common::CallbackHandlePtr priority_update_cb_;
184
  Common::CallbackHandlePtr member_update_cb_;
185
};
186

            
187
class TypedHashLbConfigBase : public LoadBalancerConfig {
188
public:
189
501
  TypedHashLbConfigBase() = default;
190
  TypedHashLbConfigBase(absl::Span<const HashPolicyProto* const> hash_policy,
191
                        Regex::Engine& regex_engine, absl::Status& creation_status);
192

            
193
  absl::Status validateEndpoints(const PriorityState& priorities) const override;
194

            
195
  HashPolicySharedPtr hash_policy_;
196
};
197

            
198
} // namespace Upstream
199
} // namespace Envoy