Line data Source code
1 : #pragma once 2 : 3 : #include "envoy/common/callback.h" 4 : #include "envoy/config/cluster/v3/cluster.pb.h" 5 : #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h" 6 : #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h" 7 : #include "envoy/stream_info/stream_info.h" 8 : #include "envoy/thread_local/thread_local_object.h" 9 : #include "envoy/upstream/thread_local_cluster.h" 10 : 11 : #include "source/common/common/logger.h" 12 : #include "source/common/upstream/cluster_factory_impl.h" 13 : #include "source/common/upstream/upstream_impl.h" 14 : #include "source/extensions/clusters/aggregate/lb_context.h" 15 : 16 : namespace Envoy { 17 : namespace Extensions { 18 : namespace Clusters { 19 : namespace Aggregate { 20 : 21 : using PriorityToClusterVector = std::vector<std::pair<uint32_t, Upstream::ThreadLocalCluster*>>; 22 : 23 : // Maps pair(host_cluster_name, host_priority) to the linearized priority of the Aggregate cluster. 24 : using ClusterAndPriorityToLinearizedPriorityMap = 25 : absl::flat_hash_map<std::pair<std::string, uint32_t>, uint32_t>; 26 : 27 : struct PriorityContext { 28 : Upstream::PrioritySetImpl priority_set_; 29 : PriorityToClusterVector priority_to_cluster_; 30 : ClusterAndPriorityToLinearizedPriorityMap cluster_and_priority_to_linearized_priority_; 31 : }; 32 : 33 : using PriorityContextPtr = std::unique_ptr<PriorityContext>; 34 : 35 : // Order matters so a vector must be used for rebuilds. If the vector size becomes larger we can 36 : // maintain a parallel set for lookups during cluster update callbacks. 37 : using ClusterSet = std::vector<std::string>; 38 : using ClusterSetConstSharedPtr = std::shared_ptr<const ClusterSet>; 39 : 40 : class Cluster : public Upstream::ClusterImplBase { 41 : public: 42 : Cluster(const envoy::config::cluster::v3::Cluster& cluster, 43 : const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config, 44 : Upstream::ClusterFactoryContext& context); 45 : 46 : // Upstream::Cluster 47 0 : Upstream::Cluster::InitializePhase initializePhase() const override { 48 0 : return Upstream::Cluster::InitializePhase::Secondary; 49 0 : } 50 : 51 : Upstream::ClusterManager& cluster_manager_; 52 : Runtime::Loader& runtime_; 53 : Random::RandomGenerator& random_; 54 : const ClusterSetConstSharedPtr clusters_; 55 : 56 : private: 57 : // Upstream::ClusterImplBase 58 0 : void startPreInit() override { onPreInitComplete(); } 59 : }; 60 : 61 : // Load balancer used by each worker thread. It will be refreshed when clusters, hosts or priorities 62 : // are updated. 63 : class AggregateClusterLoadBalancer : public Upstream::LoadBalancer, 64 : Upstream::ClusterUpdateCallbacks, 65 : Logger::Loggable<Logger::Id::upstream> { 66 : public: 67 : AggregateClusterLoadBalancer(const Upstream::ClusterInfoConstSharedPtr& parent_info, 68 : Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime, 69 : Random::RandomGenerator& random, 70 : const ClusterSetConstSharedPtr& clusters); 71 : 72 : // Upstream::ClusterUpdateCallbacks 73 : void onClusterAddOrUpdate(absl::string_view cluster_name, 74 : Upstream::ThreadLocalClusterCommand& get_cluster) override; 75 : void onClusterRemoval(const std::string& cluster_name) override; 76 : 77 : // Upstream::LoadBalancer 78 : Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; 79 : Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override; 80 : absl::optional<Upstream::SelectedPoolAndConnection> 81 : selectExistingConnection(Upstream::LoadBalancerContext* /*context*/, 82 : const Upstream::Host& /*host*/, 83 : std::vector<uint8_t>& /*hash_key*/) override; 84 : OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override; 85 : 86 : private: 87 : // Use inner class to extend LoadBalancerBase. When initializing AggregateClusterLoadBalancer, the 88 : // priority set could be empty, we cannot initialize LoadBalancerBase when priority set is empty. 89 : class LoadBalancerImpl : public Upstream::LoadBalancerBase { 90 : public: 91 : LoadBalancerImpl(const PriorityContext& priority_context, Upstream::ClusterLbStats& lb_stats, 92 : Runtime::Loader& runtime, Random::RandomGenerator& random, 93 : const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config) 94 : : Upstream::LoadBalancerBase(priority_context.priority_set_, lb_stats, runtime, random, 95 : PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT( 96 : common_config, healthy_panic_threshold, 100, 50)), 97 0 : priority_context_(priority_context) {} 98 : 99 : // Upstream::LoadBalancer 100 : Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; 101 : // Preconnecting not yet implemented for extensions. 102 0 : Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { 103 0 : return nullptr; 104 0 : } 105 : absl::optional<Upstream::SelectedPoolAndConnection> 106 : selectExistingConnection(Upstream::LoadBalancerContext* /*context*/, 107 : const Upstream::Host& /*host*/, 108 0 : std::vector<uint8_t>& /*hash_key*/) override { 109 0 : return {}; 110 0 : } 111 0 : OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override { 112 0 : return {}; 113 0 : } 114 : 115 : absl::optional<uint32_t> hostToLinearizedPriority(const Upstream::HostDescription& host) const; 116 : 117 : private: 118 : const PriorityContext& priority_context_; 119 : }; 120 : 121 : using LoadBalancerImplPtr = std::unique_ptr<LoadBalancerImpl>; 122 : 123 : void addMemberUpdateCallbackForCluster(Upstream::ThreadLocalCluster& thread_local_cluster); 124 : PriorityContextPtr linearizePrioritySet(OptRef<const std::string> excluded_cluster); 125 : void refresh(OptRef<const std::string> excluded_cluster = OptRef<const std::string>()); 126 : 127 : LoadBalancerImplPtr load_balancer_; 128 : Upstream::ClusterInfoConstSharedPtr parent_info_; 129 : Upstream::ClusterManager& cluster_manager_; 130 : Runtime::Loader& runtime_; 131 : Random::RandomGenerator& random_; 132 : PriorityContextPtr priority_context_; 133 : const ClusterSetConstSharedPtr clusters_; 134 : Upstream::ClusterUpdateCallbacksHandlePtr handle_; 135 : absl::flat_hash_map<std::string, Envoy::Common::CallbackHandlePtr> member_update_cbs_; 136 : }; 137 : 138 : // Load balancer factory created by the main thread and will be called in each worker thread to 139 : // create the thread local load balancer. 140 : struct AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory { 141 0 : AggregateLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {} 142 : // Upstream::LoadBalancerFactory 143 0 : Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override { 144 0 : return std::make_unique<AggregateClusterLoadBalancer>( 145 0 : cluster_.info(), cluster_.cluster_manager_, cluster_.runtime_, cluster_.random_, 146 0 : cluster_.clusters_); 147 0 : } 148 : 149 : const Cluster& cluster_; 150 : }; 151 : 152 : // Thread aware load balancer created by the main thread. 153 : struct AggregateThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer { 154 : AggregateThreadAwareLoadBalancer(const Cluster& cluster) 155 0 : : factory_(std::make_shared<AggregateLoadBalancerFactory>(cluster)) {} 156 : 157 : // Upstream::ThreadAwareLoadBalancer 158 0 : Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; } 159 0 : void initialize() override {} 160 : 161 : std::shared_ptr<AggregateLoadBalancerFactory> factory_; 162 : }; 163 : 164 : class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase< 165 : envoy::extensions::clusters::aggregate::v3::ClusterConfig> { 166 : public: 167 2 : ClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.aggregate") {} 168 : 169 : private: 170 : absl::StatusOr< 171 : std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>> 172 : createClusterWithConfig( 173 : const envoy::config::cluster::v3::Cluster& cluster, 174 : const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config, 175 : Upstream::ClusterFactoryContext& context) override; 176 : }; 177 : 178 : DECLARE_FACTORY(ClusterFactory); 179 : 180 : } // namespace Aggregate 181 : } // namespace Clusters 182 : } // namespace Extensions 183 : } // namespace Envoy