/proc/self/cwd/source/extensions/clusters/aggregate/cluster.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include "envoy/common/callback.h" |
4 | | #include "envoy/config/cluster/v3/cluster.pb.h" |
5 | | #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h" |
6 | | #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h" |
7 | | #include "envoy/stream_info/stream_info.h" |
8 | | #include "envoy/thread_local/thread_local_object.h" |
9 | | #include "envoy/upstream/thread_local_cluster.h" |
10 | | |
11 | | #include "source/common/common/logger.h" |
12 | | #include "source/common/upstream/cluster_factory_impl.h" |
13 | | #include "source/common/upstream/upstream_impl.h" |
14 | | #include "source/extensions/clusters/aggregate/lb_context.h" |
15 | | |
16 | | namespace Envoy { |
17 | | namespace Extensions { |
18 | | namespace Clusters { |
19 | | namespace Aggregate { |
20 | | |
// Ordered list of (uint32_t, Upstream::ThreadLocalCluster*) pairs. The cluster pointers are raw
// pointers, so this container does not manage the lifetime of the clusters it refers to.
// NOTE(review): judging by the alias name, each entry ties a priority to the member cluster that
// supplies it; the exact meaning of the uint32_t is not visible in this header -- confirm in the
// implementation file.
21 | | using PriorityToClusterVector = std::vector<std::pair<uint32_t, Upstream::ThreadLocalCluster*>>; |
22 | | |
23 | | // Maps pair(host_cluster_name, host_priority) to the linearized priority of the Aggregate cluster. |
// Key: (cluster name as std::string, uint32_t priority); value: a single uint32_t priority.
// NOTE(review): presumably consulted by LoadBalancerImpl::hostToLinearizedPriority(), which is
// only declared in this header -- confirm in the implementation file.
24 | | using ClusterAndPriorityToLinearizedPriorityMap = |
25 | | absl::flat_hash_map<std::pair<std::string, uint32_t>, uint32_t>; |
26 | | |
// Bundle of priority-related state used by the aggregate load balancer.
// AggregateClusterLoadBalancer::linearizePrioritySet() returns a pointer to one of these, and
// LoadBalancerImpl's constructor passes priority_set_ to Upstream::LoadBalancerBase while keeping a
// reference to the whole struct.
27 | | struct PriorityContext { |
// Priority set handed to the base load balancer.
28 | | Upstream::PrioritySetImpl priority_set_; |
// (uint32_t, cluster pointer) entries; see the PriorityToClusterVector alias.
29 | | PriorityToClusterVector priority_to_cluster_; |
// Lookup from (cluster name, priority) to a linearized priority.
30 | | ClusterAndPriorityToLinearizedPriorityMap cluster_and_priority_to_linearized_priority_; |
31 | | }; |
32 | | |
// Uniquely-owning pointer to a PriorityContext.
33 | | using PriorityContextPtr = std::unique_ptr<PriorityContext>; |
34 | | |
35 | | // Order matters so a vector must be used for rebuilds. If the vector size becomes larger we can |
36 | | // maintain a parallel set for lookups during cluster update callbacks. |
// Despite the "Set" in the name, this is an ordered std::vector of strings.
// NOTE(review): the strings are presumably member cluster names -- confirm against the code that
// fills the vector.
37 | | using ClusterSet = std::vector<std::string>; |
// Shared handle to an immutable ClusterSet. Cluster holds one, and
// AggregateLoadBalancerFactory::create() passes that same handle to every load balancer it builds.
38 | | using ClusterSetConstSharedPtr = std::shared_ptr<const ClusterSet>; |
39 | | |
// Cluster implementation for the aggregate cluster extension, built on Upstream::ClusterImplBase.
// It performs no asynchronous pre-initialization (see startPreInit()) and exposes the references
// and cluster list that AggregateLoadBalancerFactory::create() reads when building a load balancer.
40 | | class Cluster : public Upstream::ClusterImplBase { |
41 | | public: |
42 | | // Upstream::Cluster |
// Always reports the Secondary initialization phase.
43 | 0 | Upstream::Cluster::InitializePhase initializePhase() const override { |
44 | 0 | return Upstream::Cluster::InitializePhase::Secondary; |
45 | 0 | } |
46 | | |
// The members below are public; AggregateLoadBalancerFactory::create() reads them directly.
// The three references are not owned by this class.
47 | | Upstream::ClusterManager& cluster_manager_; |
48 | | Runtime::Loader& runtime_; |
49 | | Random::RandomGenerator& random_; |
// Shared, immutable list of strings.
// NOTE(review): presumably the cluster names taken from the ClusterConfig passed to the
// constructor -- the constructor body is not in this header; confirm there.
50 | | const ClusterSetConstSharedPtr clusters_; |
51 | | |
52 | | protected: |
// Protected constructor; ClusterFactory and AggregateClusterTest are declared friends below.
// `creation_status` is a non-const reference.
// NOTE(review): presumably an out-parameter reporting construction failure -- confirm in the
// implementation file.
53 | | Cluster(const envoy::config::cluster::v3::Cluster& cluster, |
54 | | const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config, |
55 | | Upstream::ClusterFactoryContext& context, absl::Status& creation_status); |
56 | | |
57 | | private: |
58 | | friend class ClusterFactory; |
59 | | friend class AggregateClusterTest; |
60 | | |
61 | | // Upstream::ClusterImplBase |
// Nothing to do before init: immediately calls onPreInitComplete().
62 | 0 | void startPreInit() override { onPreInitComplete(); } |
63 | | }; |
64 | | |
65 | | // Load balancer used by each worker thread. It will be refreshed when clusters, hosts or priorities |
66 | | // are updated. |
// Publicly implements Upstream::LoadBalancer. The other two bases (ClusterUpdateCallbacks and
// Loggable) carry no access specifier and are therefore private bases of this class.
67 | | class AggregateClusterLoadBalancer : public Upstream::LoadBalancer, |
68 | | Upstream::ClusterUpdateCallbacks, |
69 | | Logger::Loggable<Logger::Id::upstream> { |
70 | | public: |
71 | | friend class AggregateLoadBalancerFactory; |
// Takes the parent cluster's info, the cluster manager, runtime loader, random generator and the
// shared cluster list. The constructor body is defined outside this header.
72 | | AggregateClusterLoadBalancer(const Upstream::ClusterInfoConstSharedPtr& parent_info, |
73 | | Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime, |
74 | | Random::RandomGenerator& random, |
75 | | const ClusterSetConstSharedPtr& clusters); |
76 | | |
77 | | // Upstream::ClusterUpdateCallbacks |
// Cluster add/update and removal notifications; both are defined outside this header.
78 | | void onClusterAddOrUpdate(absl::string_view cluster_name, |
79 | | Upstream::ThreadLocalClusterCommand& get_cluster) override; |
80 | | void onClusterRemoval(const std::string& cluster_name) override; |
81 | | |
82 | | // Upstream::LoadBalancer |
// The four overrides below are only declared here; their definitions live elsewhere.
83 | | Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; |
84 | | Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override; |
85 | | absl::optional<Upstream::SelectedPoolAndConnection> |
86 | | selectExistingConnection(Upstream::LoadBalancerContext* /*context*/, |
87 | | const Upstream::Host& /*host*/, |
88 | | std::vector<uint8_t>& /*hash_key*/) override; |
89 | | OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override; |
90 | | |
91 | | private: |
92 | | // Use inner class to extend LoadBalancerBase. When initializing AggregateClusterLoadBalancer, the |
93 | | // priority set could be empty, we cannot initialize LoadBalancerBase when priority set is empty. |
94 | | class LoadBalancerImpl : public Upstream::LoadBalancerBase { |
95 | | public: |
// Forwards priority_context.priority_set_, the stats, runtime and random generator to
// LoadBalancerBase, together with a value computed from common_config's
// healthy_panic_threshold field by PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT.
// NOTE(review): the macro arguments 100 and 50 are presumably the maximum value and the default
// when the field is unset -- confirm against the macro definition.
// Only a reference to priority_context is stored, so the referenced PriorityContext must remain
// alive for as long as this object is used.
96 | | LoadBalancerImpl(const PriorityContext& priority_context, Upstream::ClusterLbStats& lb_stats, |
97 | | Runtime::Loader& runtime, Random::RandomGenerator& random, |
98 | | const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config) |
99 | | : Upstream::LoadBalancerBase(priority_context.priority_set_, lb_stats, runtime, random, |
100 | | PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT( |
101 | | common_config, healthy_panic_threshold, 100, 50)), |
102 | 0 | priority_context_(priority_context) {} |
103 | | |
104 | | // Upstream::LoadBalancer |
105 | | Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; |
106 | | // Preconnecting not yet implemented for extensions. |
// Always returns nullptr.
107 | 0 | Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { |
108 | 0 | return nullptr; |
109 | 0 | } |
// Always returns an empty optional; all three arguments are ignored.
110 | | absl::optional<Upstream::SelectedPoolAndConnection> |
111 | | selectExistingConnection(Upstream::LoadBalancerContext* /*context*/, |
112 | | const Upstream::Host& /*host*/, |
113 | 0 | std::vector<uint8_t>& /*hash_key*/) override { |
114 | 0 | return {}; |
115 | 0 | } |
// Always returns an empty OptRef.
116 | 0 | OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override { |
117 | 0 | return {}; |
118 | 0 | } |
119 | | |
// Returns an optional uint32_t for the given host; defined outside this header.
// NOTE(review): presumably an empty result means the host's (cluster, priority) pair is absent
// from cluster_and_priority_to_linearized_priority_ -- confirm in the implementation file.
120 | | absl::optional<uint32_t> hostToLinearizedPriority(const Upstream::HostDescription& host) const; |
121 | | |
122 | | private: |
// Non-owning reference bound in the constructor.
123 | | const PriorityContext& priority_context_; |
124 | | }; |
125 | | |
126 | | using LoadBalancerImplPtr = std::unique_ptr<LoadBalancerImpl>; |
127 | | |
// Private helpers, all defined outside this header. linearizePrioritySet() returns a freshly owned
// PriorityContext; refresh() defaults its argument to an empty OptRef.
// NOTE(review): `excluded_cluster` presumably names a cluster to leave out of the rebuild (e.g.
// one being removed) -- confirm in the implementation file.
128 | | void addMemberUpdateCallbackForCluster(Upstream::ThreadLocalCluster& thread_local_cluster); |
129 | | PriorityContextPtr linearizePrioritySet(OptRef<const std::string> excluded_cluster); |
130 | | void refresh(OptRef<const std::string> excluded_cluster = OptRef<const std::string>()); |
131 | | |
// Inner load balancer.
// NOTE(review): given the comment above LoadBalancerImpl, this is presumably null while the
// priority set is empty -- confirm in refresh().
132 | | LoadBalancerImplPtr load_balancer_; |
133 | | Upstream::ClusterInfoConstSharedPtr parent_info_; |
// Non-owning references supplied through the constructor parameters of the same names.
134 | | Upstream::ClusterManager& cluster_manager_; |
135 | | Runtime::Loader& runtime_; |
136 | | Random::RandomGenerator& random_; |
// Owned PriorityContext; LoadBalancerImpl stores only a reference to a PriorityContext.
137 | | PriorityContextPtr priority_context_; |
138 | | const ClusterSetConstSharedPtr clusters_; |
// NOTE(review): presumably the registration handle for this object's ClusterUpdateCallbacks --
// confirm in the constructor.
139 | | Upstream::ClusterUpdateCallbacksHandlePtr handle_; |
// Callback handles keyed by string.
// NOTE(review): keys are presumably member cluster names, populated by
// addMemberUpdateCallbackForCluster() -- confirm in the implementation file.
140 | | absl::flat_hash_map<std::string, Envoy::Common::CallbackHandlePtr> member_update_cbs_; |
141 | | }; |
142 | | |
143 | | // Load balancer factory created by the main thread and will be called in each worker thread to |
144 | | // create the thread local load balancer. |
145 | | class AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory { |
146 | | public: |
// Stores a reference to `cluster`; no copy is made, so the Cluster must stay alive for as long as
// create() may be called.
147 | 0 | AggregateLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {} |
148 | | // Upstream::LoadBalancerFactory |
// Builds a new AggregateClusterLoadBalancer from the referenced Cluster's info(), cluster manager,
// runtime loader, random generator and shared cluster list. The LoadBalancerParams argument is
// unnamed and unused.
149 | 0 | Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override { |
150 | 0 | return std::make_unique<AggregateClusterLoadBalancer>( |
151 | 0 | cluster_.info(), cluster_.cluster_manager_, cluster_.runtime_, cluster_.random_, |
152 | 0 | cluster_.clusters_); |
153 | 0 | } |
154 | | |
// Non-owning reference to the cluster this factory was created for.
155 | | const Cluster& cluster_; |
156 | | }; |
157 | | |
158 | | // Thread aware load balancer created by the main thread. |
// Thin wrapper that owns one AggregateLoadBalancerFactory and hands it out via factory().
159 | | struct AggregateThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer { |
// Creates the shared factory for `cluster`; the factory itself keeps only a reference to it.
160 | | AggregateThreadAwareLoadBalancer(const Cluster& cluster) |
161 | 0 | : factory_(std::make_shared<AggregateLoadBalancerFactory>(cluster)) {} |
162 | | |
163 | | // Upstream::ThreadAwareLoadBalancer |
// Returns the factory created in the constructor.
164 | 0 | Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; } |
// No initialization work is needed; always returns OkStatus.
165 | 0 | absl::Status initialize() override { return absl::OkStatus(); } |
166 | | |
167 | | std::shared_ptr<AggregateLoadBalancerFactory> factory_; |
168 | | }; |
169 | | |
// Cluster factory for the aggregate cluster extension, parameterized on the aggregate
// ClusterConfig proto type.
170 | | class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase< |
171 | | envoy::extensions::clusters::aggregate::v3::ClusterConfig> { |
172 | | public: |
// Passes the name "envoy.clusters.aggregate" to the base class constructor.
173 | 4 | ClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.aggregate") {} |
174 | | |
175 | | private: |
// Returns either an error status or a (cluster, thread-aware load balancer) pair. Declared here
// and defined outside this header.
176 | | absl::StatusOr< |
177 | | std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>> |
178 | | createClusterWithConfig( |
179 | | const envoy::config::cluster::v3::Cluster& cluster, |
180 | | const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config, |
181 | | Upstream::ClusterFactoryContext& context) override; |
182 | | }; |
183 | | |
// Invokes the DECLARE_FACTORY macro for ClusterFactory; the macro is defined outside this header.
// NOTE(review): presumably this forward-declares the factory's registration so other translation
// units can reference it, with the matching registration in the implementation file -- confirm.
184 | | DECLARE_FACTORY(ClusterFactory); |
185 | | |
186 | | } // namespace Aggregate |
187 | | } // namespace Clusters |
188 | | } // namespace Extensions |
189 | | } // namespace Envoy |