/proc/self/cwd/source/extensions/clusters/aggregate/cluster.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include "envoy/common/callback.h" |
4 | | #include "envoy/config/cluster/v3/cluster.pb.h" |
5 | | #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h" |
6 | | #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h" |
7 | | #include "envoy/stream_info/stream_info.h" |
8 | | #include "envoy/thread_local/thread_local_object.h" |
9 | | #include "envoy/upstream/thread_local_cluster.h" |
10 | | |
11 | | #include "source/common/common/logger.h" |
12 | | #include "source/common/upstream/cluster_factory_impl.h" |
13 | | #include "source/common/upstream/upstream_impl.h" |
14 | | #include "source/extensions/clusters/aggregate/lb_context.h" |
15 | | |
16 | | namespace Envoy { |
17 | | namespace Extensions { |
18 | | namespace Clusters { |
19 | | namespace Aggregate { |
20 | | |
21 | | using PriorityToClusterVector = std::vector<std::pair<uint32_t, Upstream::ThreadLocalCluster*>>; |
22 | | |
23 | | // Maps pair(host_cluster_name, host_priority) to the linearized priority of the Aggregate cluster. |
24 | | using ClusterAndPriorityToLinearizedPriorityMap = |
25 | | absl::flat_hash_map<std::pair<std::string, uint32_t>, uint32_t>; |
26 | | |
27 | | struct PriorityContext { |
28 | | Upstream::PrioritySetImpl priority_set_; |
29 | | PriorityToClusterVector priority_to_cluster_; |
30 | | ClusterAndPriorityToLinearizedPriorityMap cluster_and_priority_to_linearized_priority_; |
31 | | }; |
32 | | |
33 | | using PriorityContextPtr = std::unique_ptr<PriorityContext>; |
34 | | |
35 | | // Order matters so a vector must be used for rebuilds. If the vector size becomes larger we can |
36 | | // maintain a parallel set for lookups during cluster update callbacks. |
37 | | using ClusterSet = std::vector<std::string>; |
38 | | using ClusterSetConstSharedPtr = std::shared_ptr<const ClusterSet>; |
39 | | |
40 | | class Cluster : public Upstream::ClusterImplBase { |
41 | | public: |
42 | | Cluster(const envoy::config::cluster::v3::Cluster& cluster, |
43 | | const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config, |
44 | | Upstream::ClusterFactoryContext& context); |
45 | | |
46 | | // Upstream::Cluster |
47 | 0 | Upstream::Cluster::InitializePhase initializePhase() const override { |
48 | 0 | return Upstream::Cluster::InitializePhase::Secondary; |
49 | 0 | } |
50 | | |
51 | | Upstream::ClusterManager& cluster_manager_; |
52 | | Runtime::Loader& runtime_; |
53 | | Random::RandomGenerator& random_; |
54 | | const ClusterSetConstSharedPtr clusters_; |
55 | | |
56 | | private: |
57 | | // Upstream::ClusterImplBase |
58 | 0 | void startPreInit() override { onPreInitComplete(); } |
59 | | }; |
60 | | |
61 | | // Load balancer used by each worker thread. It will be refreshed when clusters, hosts or priorities |
62 | | // are updated. |
63 | | class AggregateClusterLoadBalancer : public Upstream::LoadBalancer, |
64 | | Upstream::ClusterUpdateCallbacks, |
65 | | Logger::Loggable<Logger::Id::upstream> { |
66 | | public: |
67 | | AggregateClusterLoadBalancer(const Upstream::ClusterInfoConstSharedPtr& parent_info, |
68 | | Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime, |
69 | | Random::RandomGenerator& random, |
70 | | const ClusterSetConstSharedPtr& clusters); |
71 | | |
72 | | // Upstream::ClusterUpdateCallbacks |
73 | | void onClusterAddOrUpdate(absl::string_view cluster_name, |
74 | | Upstream::ThreadLocalClusterCommand& get_cluster) override; |
75 | | void onClusterRemoval(const std::string& cluster_name) override; |
76 | | |
77 | | // Upstream::LoadBalancer |
78 | | Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; |
79 | | Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override; |
80 | | absl::optional<Upstream::SelectedPoolAndConnection> |
81 | | selectExistingConnection(Upstream::LoadBalancerContext* /*context*/, |
82 | | const Upstream::Host& /*host*/, |
83 | | std::vector<uint8_t>& /*hash_key*/) override; |
84 | | OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override; |
85 | | |
86 | | private: |
87 | | // Use inner class to extend LoadBalancerBase. When initializing AggregateClusterLoadBalancer, the |
88 | | // priority set could be empty, we cannot initialize LoadBalancerBase when priority set is empty. |
89 | | class LoadBalancerImpl : public Upstream::LoadBalancerBase { |
90 | | public: |
91 | | LoadBalancerImpl(const PriorityContext& priority_context, Upstream::ClusterLbStats& lb_stats, |
92 | | Runtime::Loader& runtime, Random::RandomGenerator& random, |
93 | | const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config) |
94 | | : Upstream::LoadBalancerBase(priority_context.priority_set_, lb_stats, runtime, random, |
95 | | PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT( |
96 | | common_config, healthy_panic_threshold, 100, 50)), |
97 | 0 | priority_context_(priority_context) {} |
98 | | |
99 | | // Upstream::LoadBalancer |
100 | | Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; |
101 | | // Preconnecting not yet implemented for extensions. |
102 | 0 | Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { |
103 | 0 | return nullptr; |
104 | 0 | } |
105 | | absl::optional<Upstream::SelectedPoolAndConnection> |
106 | | selectExistingConnection(Upstream::LoadBalancerContext* /*context*/, |
107 | | const Upstream::Host& /*host*/, |
108 | 0 | std::vector<uint8_t>& /*hash_key*/) override { |
109 | 0 | return {}; |
110 | 0 | } |
111 | 0 | OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override { |
112 | 0 | return {}; |
113 | 0 | } |
114 | | |
115 | | absl::optional<uint32_t> hostToLinearizedPriority(const Upstream::HostDescription& host) const; |
116 | | |
117 | | private: |
118 | | const PriorityContext& priority_context_; |
119 | | }; |
120 | | |
121 | | using LoadBalancerImplPtr = std::unique_ptr<LoadBalancerImpl>; |
122 | | |
123 | | void addMemberUpdateCallbackForCluster(Upstream::ThreadLocalCluster& thread_local_cluster); |
124 | | PriorityContextPtr linearizePrioritySet(OptRef<const std::string> excluded_cluster); |
125 | | void refresh(OptRef<const std::string> excluded_cluster = OptRef<const std::string>()); |
126 | | |
127 | | LoadBalancerImplPtr load_balancer_; |
128 | | Upstream::ClusterInfoConstSharedPtr parent_info_; |
129 | | Upstream::ClusterManager& cluster_manager_; |
130 | | Runtime::Loader& runtime_; |
131 | | Random::RandomGenerator& random_; |
132 | | PriorityContextPtr priority_context_; |
133 | | const ClusterSetConstSharedPtr clusters_; |
134 | | Upstream::ClusterUpdateCallbacksHandlePtr handle_; |
135 | | absl::flat_hash_map<std::string, Envoy::Common::CallbackHandlePtr> member_update_cbs_; |
136 | | }; |
137 | | |
138 | | // Load balancer factory created by the main thread and will be called in each worker thread to |
139 | | // create the thread local load balancer. |
140 | | struct AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory { |
141 | 0 | AggregateLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {} |
142 | | // Upstream::LoadBalancerFactory |
143 | 0 | Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override { |
144 | 0 | return std::make_unique<AggregateClusterLoadBalancer>( |
145 | 0 | cluster_.info(), cluster_.cluster_manager_, cluster_.runtime_, cluster_.random_, |
146 | 0 | cluster_.clusters_); |
147 | 0 | } |
148 | | |
149 | | const Cluster& cluster_; |
150 | | }; |
151 | | |
152 | | // Thread aware load balancer created by the main thread. |
153 | | struct AggregateThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer { |
154 | | AggregateThreadAwareLoadBalancer(const Cluster& cluster) |
155 | 0 | : factory_(std::make_shared<AggregateLoadBalancerFactory>(cluster)) {} |
156 | | |
157 | | // Upstream::ThreadAwareLoadBalancer |
158 | 0 | Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; } |
159 | 0 | void initialize() override {} |
160 | | |
161 | | std::shared_ptr<AggregateLoadBalancerFactory> factory_; |
162 | | }; |
163 | | |
164 | | class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase< |
165 | | envoy::extensions::clusters::aggregate::v3::ClusterConfig> { |
166 | | public: |
167 | 4 | ClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.aggregate") {} |
168 | | |
169 | | private: |
170 | | absl::StatusOr< |
171 | | std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>> |
172 | | createClusterWithConfig( |
173 | | const envoy::config::cluster::v3::Cluster& cluster, |
174 | | const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config, |
175 | | Upstream::ClusterFactoryContext& context) override; |
176 | | }; |
177 | | |
178 | | DECLARE_FACTORY(ClusterFactory); |
179 | | |
180 | | } // namespace Aggregate |
181 | | } // namespace Clusters |
182 | | } // namespace Extensions |
183 | | } // namespace Envoy |