/proc/self/cwd/source/extensions/clusters/aggregate/cluster.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/clusters/aggregate/cluster.h" |
2 | | |
3 | | #include "envoy/config/cluster/v3/cluster.pb.h" |
4 | | #include "envoy/event/dispatcher.h" |
5 | | #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h" |
6 | | #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h" |
7 | | |
8 | | #include "source/common/common/assert.h" |
9 | | |
10 | | namespace Envoy { |
11 | | namespace Extensions { |
12 | | namespace Clusters { |
13 | | namespace Aggregate { |
14 | | |
15 | | Cluster::Cluster(const envoy::config::cluster::v3::Cluster& cluster, |
16 | | const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config, |
17 | | Upstream::ClusterFactoryContext& context) |
18 | | : Upstream::ClusterImplBase(cluster, context), cluster_manager_(context.clusterManager()), |
19 | | runtime_(context.serverFactoryContext().runtime()), |
20 | | random_(context.serverFactoryContext().api().randomGenerator()), |
21 | 0 | clusters_(std::make_shared<ClusterSet>(config.clusters().begin(), config.clusters().end())) {} |
22 | | |
23 | | AggregateClusterLoadBalancer::AggregateClusterLoadBalancer( |
24 | | const Upstream::ClusterInfoConstSharedPtr& parent_info, |
25 | | Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime, |
26 | | Random::RandomGenerator& random, const ClusterSetConstSharedPtr& clusters) |
27 | | : parent_info_(parent_info), cluster_manager_(cluster_manager), runtime_(runtime), |
28 | 0 | random_(random), clusters_(clusters) { |
29 | 0 | for (const auto& cluster : *clusters_) { |
30 | 0 | auto tlc = cluster_manager_.getThreadLocalCluster(cluster); |
31 | | // It is possible when initializing the cluster, the included cluster doesn't exist. e.g., the |
32 | | // cluster could be added dynamically by xDS. |
33 | 0 | if (tlc == nullptr) { |
34 | 0 | continue; |
35 | 0 | } |
36 | | |
37 | | // Add callback for clusters initialized before aggregate cluster. |
38 | 0 | addMemberUpdateCallbackForCluster(*tlc); |
39 | 0 | } |
40 | 0 | refresh(); |
41 | 0 | handle_ = cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this); |
42 | 0 | } |
43 | | |
44 | | void AggregateClusterLoadBalancer::addMemberUpdateCallbackForCluster( |
45 | 0 | Upstream::ThreadLocalCluster& thread_local_cluster) { |
46 | 0 | member_update_cbs_[thread_local_cluster.info()->name()] = |
47 | 0 | thread_local_cluster.prioritySet().addMemberUpdateCb( |
48 | 0 | [this, target_cluster_info = thread_local_cluster.info()](const Upstream::HostVector&, |
49 | 0 | const Upstream::HostVector&) { |
50 | 0 | ENVOY_LOG(debug, "member update for cluster '{}' in aggregate cluster '{}'", |
51 | 0 | target_cluster_info->name(), parent_info_->name()); |
52 | 0 | refresh(); |
53 | 0 | }); |
54 | 0 | } |
55 | | |
56 | | PriorityContextPtr |
57 | 0 | AggregateClusterLoadBalancer::linearizePrioritySet(OptRef<const std::string> excluded_cluster) { |
58 | 0 | PriorityContextPtr priority_context = std::make_unique<PriorityContext>(); |
59 | 0 | uint32_t next_priority_after_linearizing = 0; |
60 | | |
61 | | // Linearize the priority set. e.g. for clusters [C_0, C_1, C_2] referred in aggregate cluster |
62 | | // C_0 [P_0, P_1, P_2] |
63 | | // C_1 [P_0, P_1] |
64 | | // C_2 [P_0, P_1, P_2, P_3] |
65 | | // The linearization result is: |
66 | | // [C_0.P_0, C_0.P_1, C_0.P_2, C_1.P_0, C_1.P_1, C_2.P_0, C_2.P_1, C_2.P_2, C_2.P_3] |
67 | | // and the traffic will be distributed among these priorities. |
68 | 0 | for (const auto& cluster : *clusters_) { |
69 | 0 | if (excluded_cluster.has_value() && excluded_cluster.value().get() == cluster) { |
70 | 0 | continue; |
71 | 0 | } |
72 | 0 | auto tlc = cluster_manager_.getThreadLocalCluster(cluster); |
73 | | // It is possible that the cluster doesn't exist, e.g., the cluster could be deleted or the |
74 | | // cluster hasn't been added by xDS. |
75 | 0 | if (tlc == nullptr) { |
76 | 0 | ENVOY_LOG(debug, "refresh: cluster '{}' absent in aggregate cluster '{}'", cluster, |
77 | 0 | parent_info_->name()); |
78 | 0 | continue; |
79 | 0 | } else { |
80 | 0 | ENVOY_LOG(debug, "refresh: cluster '{}' found in aggregate cluster '{}'", cluster, |
81 | 0 | parent_info_->name()); |
82 | 0 | } |
83 | | |
84 | 0 | uint32_t priority_in_current_cluster = 0; |
85 | 0 | for (const auto& host_set : tlc->prioritySet().hostSetsPerPriority()) { |
86 | 0 | if (!host_set->hosts().empty()) { |
87 | 0 | priority_context->priority_set_.updateHosts( |
88 | 0 | next_priority_after_linearizing, Upstream::HostSetImpl::updateHostsParams(*host_set), |
89 | 0 | host_set->localityWeights(), host_set->hosts(), {}, host_set->weightedPriorityHealth(), |
90 | 0 | host_set->overprovisioningFactor()); |
91 | 0 | priority_context->priority_to_cluster_.emplace_back( |
92 | 0 | std::make_pair(priority_in_current_cluster, tlc)); |
93 | |
|
94 | 0 | priority_context->cluster_and_priority_to_linearized_priority_[std::make_pair( |
95 | 0 | cluster, priority_in_current_cluster)] = next_priority_after_linearizing; |
96 | 0 | next_priority_after_linearizing++; |
97 | 0 | } |
98 | 0 | priority_in_current_cluster++; |
99 | 0 | } |
100 | 0 | } |
101 | |
|
102 | 0 | return priority_context; |
103 | 0 | } |
104 | | |
105 | 0 | void AggregateClusterLoadBalancer::refresh(OptRef<const std::string> excluded_cluster) { |
106 | 0 | PriorityContextPtr priority_context = linearizePrioritySet(excluded_cluster); |
107 | 0 | if (!priority_context->priority_set_.hostSetsPerPriority().empty()) { |
108 | 0 | load_balancer_ = std::make_unique<LoadBalancerImpl>( |
109 | 0 | *priority_context, parent_info_->lbStats(), runtime_, random_, parent_info_->lbConfig()); |
110 | 0 | } else { |
111 | 0 | load_balancer_ = nullptr; |
112 | 0 | } |
113 | 0 | priority_context_ = std::move(priority_context); |
114 | 0 | } |
115 | | |
116 | | void AggregateClusterLoadBalancer::onClusterAddOrUpdate( |
117 | 0 | absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand& get_cluster) { |
118 | 0 | if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) { |
119 | 0 | ENVOY_LOG(debug, "adding or updating cluster '{}' for aggregate cluster '{}'", cluster_name, |
120 | 0 | parent_info_->name()); |
121 | 0 | auto& cluster = get_cluster(); |
122 | 0 | refresh(); |
123 | 0 | addMemberUpdateCallbackForCluster(cluster); |
124 | 0 | } |
125 | 0 | } |
126 | | |
127 | 0 | void AggregateClusterLoadBalancer::onClusterRemoval(const std::string& cluster_name) { |
128 | | // The onClusterRemoval callback is called before the thread local cluster is removed. There |
129 | | // will be a dangling pointer to the thread local cluster if the deleted cluster is not skipped |
130 | | // when we refresh the load balancer. |
131 | 0 | if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) { |
132 | 0 | ENVOY_LOG(debug, "removing cluster '{}' from aggregate cluster '{}'", cluster_name, |
133 | 0 | parent_info_->name()); |
134 | 0 | refresh(cluster_name); |
135 | 0 | } |
136 | 0 | } |
137 | | |
138 | | absl::optional<uint32_t> AggregateClusterLoadBalancer::LoadBalancerImpl::hostToLinearizedPriority( |
139 | 0 | const Upstream::HostDescription& host) const { |
140 | 0 | auto it = priority_context_.cluster_and_priority_to_linearized_priority_.find( |
141 | 0 | std::make_pair(host.cluster().name(), host.priority())); |
142 | |
|
143 | 0 | if (it != priority_context_.cluster_and_priority_to_linearized_priority_.end()) { |
144 | 0 | return it->second; |
145 | 0 | } else { |
146 | | // The HostSet can change due to CDS/EDS updates between retries. |
147 | 0 | return absl::nullopt; |
148 | 0 | } |
149 | 0 | } |
150 | | |
151 | | Upstream::HostConstSharedPtr |
152 | 0 | AggregateClusterLoadBalancer::LoadBalancerImpl::chooseHost(Upstream::LoadBalancerContext* context) { |
153 | 0 | const Upstream::HealthyAndDegradedLoad* priority_loads = nullptr; |
154 | 0 | if (context != nullptr) { |
155 | 0 | priority_loads = &context->determinePriorityLoad( |
156 | 0 | priority_set_, per_priority_load_, |
157 | 0 | [this](const auto& host) { return hostToLinearizedPriority(host); }); |
158 | 0 | } else { |
159 | 0 | priority_loads = &per_priority_load_; |
160 | 0 | } |
161 | |
|
162 | 0 | const auto priority_pair = |
163 | 0 | choosePriority(random_.random(), priority_loads->healthy_priority_load_, |
164 | 0 | priority_loads->degraded_priority_load_); |
165 | |
|
166 | 0 | AggregateLoadBalancerContext aggregate_context( |
167 | 0 | context, priority_pair.second, |
168 | 0 | priority_context_.priority_to_cluster_[priority_pair.first].first); |
169 | |
|
170 | 0 | Upstream::ThreadLocalCluster* cluster = |
171 | 0 | priority_context_.priority_to_cluster_[priority_pair.first].second; |
172 | 0 | return cluster->loadBalancer().chooseHost(&aggregate_context); |
173 | 0 | } |
174 | | |
175 | | Upstream::HostConstSharedPtr |
176 | 0 | AggregateClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) { |
177 | 0 | if (load_balancer_) { |
178 | 0 | return load_balancer_->chooseHost(context); |
179 | 0 | } |
180 | 0 | return nullptr; |
181 | 0 | } |
182 | | |
183 | | Upstream::HostConstSharedPtr |
184 | 0 | AggregateClusterLoadBalancer::peekAnotherHost(Upstream::LoadBalancerContext* context) { |
185 | 0 | if (load_balancer_) { |
186 | 0 | return load_balancer_->peekAnotherHost(context); |
187 | 0 | } |
188 | 0 | return nullptr; |
189 | 0 | } |
190 | | |
191 | | absl::optional<Upstream::SelectedPoolAndConnection> |
192 | | AggregateClusterLoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* context, |
193 | | const Upstream::Host& host, |
194 | 0 | std::vector<uint8_t>& hash_key) { |
195 | 0 | if (load_balancer_) { |
196 | 0 | return load_balancer_->selectExistingConnection(context, host, hash_key); |
197 | 0 | } |
198 | 0 | return absl::nullopt; |
199 | 0 | } |
200 | | |
201 | | OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> |
202 | 0 | AggregateClusterLoadBalancer::lifetimeCallbacks() { |
203 | 0 | if (load_balancer_) { |
204 | 0 | return load_balancer_->lifetimeCallbacks(); |
205 | 0 | } |
206 | 0 | return {}; |
207 | 0 | } |
208 | | |
209 | | absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>> |
210 | | ClusterFactory::createClusterWithConfig( |
211 | | const envoy::config::cluster::v3::Cluster& cluster, |
212 | | const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config, |
213 | 0 | Upstream::ClusterFactoryContext& context) { |
214 | 0 | auto new_cluster = std::make_shared<Cluster>(cluster, proto_config, context); |
215 | 0 | auto lb = std::make_unique<AggregateThreadAwareLoadBalancer>(*new_cluster); |
216 | 0 | return std::make_pair(new_cluster, std::move(lb)); |
217 | 0 | } |
218 | | |
// Static registration of the aggregate ClusterFactory.
// NOTE(review): presumably this makes the factory discoverable under the
// Upstream::ClusterFactory category -- confirm against the REGISTER_FACTORY macro definition.
219 | | REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory); |
220 | | |
221 | | } // namespace Aggregate |
222 | | } // namespace Clusters |
223 | | } // namespace Extensions |
224 | | } // namespace Envoy |