Line data Source code
1 : #pragma once 2 : 3 : #include <bitset> 4 : 5 : #include "envoy/common/callback.h" 6 : #include "envoy/config/cluster/v3/cluster.pb.h" 7 : 8 : #include "source/common/common/logger.h" 9 : #include "source/common/config/metadata.h" 10 : #include "source/common/config/well_known_names.h" 11 : #include "source/common/upstream/load_balancer_impl.h" 12 : 13 : #include "absl/strings/string_view.h" 14 : #include "absl/synchronization/mutex.h" 15 : 16 : namespace Envoy { 17 : namespace Upstream { 18 : 19 : using NormalizedHostWeightVector = std::vector<std::pair<HostConstSharedPtr, double>>; 20 : using NormalizedHostWeightMap = std::map<HostConstSharedPtr, double>; 21 : 22 : class ThreadAwareLoadBalancerBase : public LoadBalancerBase, public ThreadAwareLoadBalancer { 23 : public: 24 : /** 25 : * Base class for a hashing load balancer implemented for use in a thread aware load balancer. 26 : * TODO(mattklein123): Currently only RingHash and Maglev use the thread aware load balancer. 27 : * The hash is pre-computed prior to getting to the real load balancer for 28 : * use in priority selection. In the future we likely we will want to pass 29 : * through the full load balancer context in case a future implementation 30 : * wants to use it. 31 : */ 32 : class HashingLoadBalancer { 33 : public: 34 0 : virtual ~HashingLoadBalancer() = default; 35 : virtual HostConstSharedPtr chooseHost(uint64_t hash, uint32_t attempt) const PURE; 36 0 : const absl::string_view hashKey(HostConstSharedPtr host, bool use_hostname) const { 37 0 : const ProtobufWkt::Value& val = Config::Metadata::metadataValue( 38 0 : host->metadata().get(), Config::MetadataFilters::get().ENVOY_LB, 39 0 : Config::MetadataEnvoyLbKeys::get().HASH_KEY); 40 0 : if (val.kind_case() != val.kStringValue && val.kind_case() != val.KIND_NOT_SET) { 41 0 : FINE_GRAIN_LOG(debug, "hash_key must be string type, got: {}", val.kind_case()); 42 0 : } 43 0 : absl::string_view hash_key = val.string_value(); 44 0 : if (hash_key.empty()) { 45 0 : hash_key = use_hostname ? host->hostname() : host->address()->asString(); 46 0 : } 47 0 : return hash_key; 48 0 : } 49 : }; 50 : using HashingLoadBalancerSharedPtr = std::shared_ptr<HashingLoadBalancer>; 51 : 52 : /** 53 : * Class for consistent hashing load balancer (CH-LB) with bounded loads. 54 : * It is common to both RingHash and Maglev load balancers, because the logic of selecting the 55 : * next host when one is overloaded is independent of the CH-LB type. 56 : */ 57 : class BoundedLoadHashingLoadBalancer : public HashingLoadBalancer { 58 : public: 59 : BoundedLoadHashingLoadBalancer(HashingLoadBalancerSharedPtr hashing_lb_ptr, 60 : NormalizedHostWeightVector normalized_host_weights, 61 : uint32_t hash_balance_factor) 62 : : normalized_host_weights_map_(initNormalizedHostWeightMap(normalized_host_weights)), 63 : hashing_lb_ptr_(std::move(hashing_lb_ptr)), 64 : normalized_host_weights_(std::move(normalized_host_weights)), 65 0 : hash_balance_factor_(hash_balance_factor) { 66 0 : ASSERT(hashing_lb_ptr_ != nullptr); 67 0 : ASSERT(hash_balance_factor > 0); 68 0 : } 69 : HostConstSharedPtr chooseHost(uint64_t hash, uint32_t attempt) const override; 70 : 71 : protected: 72 : virtual double hostOverloadFactor(const Host& host, double weight) const; 73 : const NormalizedHostWeightMap normalized_host_weights_map_; 74 : 75 : private: 76 : const NormalizedHostWeightMap 77 0 : initNormalizedHostWeightMap(const NormalizedHostWeightVector& normalized_host_weights) { 78 0 : NormalizedHostWeightMap normalized_host_weights_map; 79 0 : for (auto const& item : normalized_host_weights) { 80 0 : normalized_host_weights_map[item.first] = item.second; 81 0 : } 82 0 : return normalized_host_weights_map; 83 0 : } 84 : const HashingLoadBalancerSharedPtr hashing_lb_ptr_; 85 : const NormalizedHostWeightVector normalized_host_weights_; 86 : const uint32_t hash_balance_factor_; 87 : }; 88 : // Upstream::ThreadAwareLoadBalancer 89 0 : LoadBalancerFactorySharedPtr factory() override { return factory_; } 90 : void initialize() override; 91 : 92 : // Upstream::LoadBalancer 93 0 : HostConstSharedPtr chooseHost(LoadBalancerContext*) override { return nullptr; } 94 : // Preconnect not implemented for hash based load balancing 95 0 : HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; } 96 : // Pool selection not implemented. 97 : absl::optional<Upstream::SelectedPoolAndConnection> 98 : selectExistingConnection(Upstream::LoadBalancerContext* /*context*/, 99 : const Upstream::Host& /*host*/, 100 0 : std::vector<uint8_t>& /*hash_key*/) override { 101 0 : return absl::nullopt; 102 0 : } 103 : // Lifetime tracking not implemented. 104 0 : OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override { 105 0 : return {}; 106 0 : } 107 : 108 : protected: 109 : ThreadAwareLoadBalancerBase(const PrioritySet& priority_set, ClusterLbStats& stats, 110 : Runtime::Loader& runtime, Random::RandomGenerator& random, 111 : uint32_t healthy_panic_threshold, bool locality_weighted_balancing) 112 : : LoadBalancerBase(priority_set, stats, runtime, random, healthy_panic_threshold), 113 : factory_(new LoadBalancerFactoryImpl(stats, random)), 114 0 : locality_weighted_balancing_(locality_weighted_balancing) {} 115 : 116 : private: 117 : struct PerPriorityState { 118 : std::shared_ptr<HashingLoadBalancer> current_lb_; 119 : bool global_panic_{}; 120 : }; 121 : using PerPriorityStatePtr = std::unique_ptr<PerPriorityState>; 122 : 123 : struct LoadBalancerImpl : public LoadBalancer { 124 : LoadBalancerImpl(ClusterLbStats& stats, Random::RandomGenerator& random) 125 0 : : stats_(stats), random_(random) {} 126 : 127 : // Upstream::LoadBalancer 128 : HostConstSharedPtr chooseHost(LoadBalancerContext* context) override; 129 : // Preconnect not implemented for hash based load balancing 130 0 : HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; } 131 : absl::optional<Upstream::SelectedPoolAndConnection> 132 : selectExistingConnection(Upstream::LoadBalancerContext* /*context*/, 133 : const Upstream::Host& /*host*/, 134 0 : std::vector<uint8_t>& /*hash_key*/) override { 135 0 : return absl::nullopt; 136 0 : } 137 0 : OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override { 138 0 : return {}; 139 0 : } 140 : 141 : ClusterLbStats& stats_; 142 : Random::RandomGenerator& random_; 143 : std::shared_ptr<std::vector<PerPriorityStatePtr>> per_priority_state_; 144 : std::shared_ptr<HealthyLoad> healthy_per_priority_load_; 145 : std::shared_ptr<DegradedLoad> degraded_per_priority_load_; 146 : }; 147 : 148 : struct LoadBalancerFactoryImpl : public LoadBalancerFactory { 149 : LoadBalancerFactoryImpl(ClusterLbStats& stats, Random::RandomGenerator& random) 150 0 : : stats_(stats), random_(random) {} 151 : 152 : // Upstream::LoadBalancerFactory 153 : // Ignore the params for the thread-aware LB. 154 : LoadBalancerPtr create(LoadBalancerParams) override; 155 : 156 : ClusterLbStats& stats_; 157 : Random::RandomGenerator& random_; 158 : absl::Mutex mutex_; 159 : std::shared_ptr<std::vector<PerPriorityStatePtr>> per_priority_state_ ABSL_GUARDED_BY(mutex_); 160 : // This is split out of PerPriorityState so LoadBalancerBase::ChoosePriority can be reused. 161 : std::shared_ptr<HealthyLoad> healthy_per_priority_load_ ABSL_GUARDED_BY(mutex_); 162 : std::shared_ptr<DegradedLoad> degraded_per_priority_load_ ABSL_GUARDED_BY(mutex_); 163 : }; 164 : 165 : virtual HashingLoadBalancerSharedPtr 166 : createLoadBalancer(const NormalizedHostWeightVector& normalized_host_weights, 167 : double min_normalized_weight, double max_normalized_weight) PURE; 168 : void refresh(); 169 : 170 : std::shared_ptr<LoadBalancerFactoryImpl> factory_; 171 : const bool locality_weighted_balancing_{}; 172 : Common::CallbackHandlePtr priority_update_cb_; 173 : }; 174 : 175 : } // namespace Upstream 176 : } // namespace Envoy