/proc/self/cwd/source/extensions/clusters/aggregate/cluster.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/clusters/aggregate/cluster.h" |
2 | | |
3 | | #include "envoy/config/cluster/v3/cluster.pb.h" |
4 | | #include "envoy/event/dispatcher.h" |
5 | | #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h" |
6 | | #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h" |
7 | | |
8 | | #include "source/common/common/assert.h" |
9 | | |
10 | | namespace Envoy { |
11 | | namespace Extensions { |
12 | | namespace Clusters { |
13 | | namespace Aggregate { |
14 | | |
15 | | Cluster::Cluster(const envoy::config::cluster::v3::Cluster& cluster, |
16 | | const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config, |
17 | | Upstream::ClusterFactoryContext& context, absl::Status& creation_status) |
18 | | : Upstream::ClusterImplBase(cluster, context, creation_status), |
19 | | cluster_manager_(context.clusterManager()), |
20 | | runtime_(context.serverFactoryContext().runtime()), |
21 | | random_(context.serverFactoryContext().api().randomGenerator()), |
22 | 0 | clusters_(std::make_shared<ClusterSet>(config.clusters().begin(), config.clusters().end())) {} |
23 | | |
24 | | AggregateClusterLoadBalancer::AggregateClusterLoadBalancer( |
25 | | const Upstream::ClusterInfoConstSharedPtr& parent_info, |
26 | | Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime, |
27 | | Random::RandomGenerator& random, const ClusterSetConstSharedPtr& clusters) |
28 | | : parent_info_(parent_info), cluster_manager_(cluster_manager), runtime_(runtime), |
29 | 0 | random_(random), clusters_(clusters) { |
30 | 0 | for (const auto& cluster : *clusters_) { |
31 | 0 | auto tlc = cluster_manager_.getThreadLocalCluster(cluster); |
32 | | // It is possible when initializing the cluster, the included cluster doesn't exist. e.g., the |
33 | | // cluster could be added dynamically by xDS. |
34 | 0 | if (tlc == nullptr) { |
35 | 0 | continue; |
36 | 0 | } |
37 | | |
38 | | // Add callback for clusters initialized before aggregate cluster. |
39 | 0 | addMemberUpdateCallbackForCluster(*tlc); |
40 | 0 | } |
41 | 0 | refresh(); |
42 | 0 | handle_ = cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this); |
43 | 0 | } |
44 | | |
45 | | void AggregateClusterLoadBalancer::addMemberUpdateCallbackForCluster( |
46 | 0 | Upstream::ThreadLocalCluster& thread_local_cluster) { |
47 | 0 | member_update_cbs_[thread_local_cluster.info()->name()] = |
48 | 0 | thread_local_cluster.prioritySet().addMemberUpdateCb( |
49 | 0 | [this, target_cluster_info = thread_local_cluster.info()](const Upstream::HostVector&, |
50 | 0 | const Upstream::HostVector&) { |
51 | 0 | ENVOY_LOG(debug, "member update for cluster '{}' in aggregate cluster '{}'", |
52 | 0 | target_cluster_info->name(), parent_info_->name()); |
53 | 0 | refresh(); |
54 | 0 | return absl::OkStatus(); |
55 | 0 | }); |
56 | 0 | } |
57 | | |
58 | | PriorityContextPtr |
59 | 0 | AggregateClusterLoadBalancer::linearizePrioritySet(OptRef<const std::string> excluded_cluster) { |
60 | 0 | PriorityContextPtr priority_context = std::make_unique<PriorityContext>(); |
61 | 0 | uint32_t next_priority_after_linearizing = 0; |
62 | | |
63 | | // Linearize the priority set. e.g. for clusters [C_0, C_1, C_2] referred in aggregate cluster |
64 | | // C_0 [P_0, P_1, P_2] |
65 | | // C_1 [P_0, P_1] |
66 | | // C_2 [P_0, P_1, P_2, P_3] |
67 | | // The linearization result is: |
68 | | // [C_0.P_0, C_0.P_1, C_0.P_2, C_1.P_0, C_1.P_1, C_2.P_0, C_2.P_1, C_2.P_2, C_2.P_3] |
69 | | // and the traffic will be distributed among these priorities. |
70 | 0 | for (const auto& cluster : *clusters_) { |
71 | 0 | if (excluded_cluster.has_value() && excluded_cluster.value().get() == cluster) { |
72 | 0 | continue; |
73 | 0 | } |
74 | 0 | auto tlc = cluster_manager_.getThreadLocalCluster(cluster); |
75 | | // It is possible that the cluster doesn't exist, e.g., the cluster could be deleted or the |
76 | | // cluster hasn't been added by xDS. |
77 | 0 | if (tlc == nullptr) { |
78 | 0 | ENVOY_LOG(debug, "refresh: cluster '{}' absent in aggregate cluster '{}'", cluster, |
79 | 0 | parent_info_->name()); |
80 | 0 | continue; |
81 | 0 | } else { |
82 | 0 | ENVOY_LOG(debug, "refresh: cluster '{}' found in aggregate cluster '{}'", cluster, |
83 | 0 | parent_info_->name()); |
84 | 0 | } |
85 | | |
86 | 0 | uint32_t priority_in_current_cluster = 0; |
87 | 0 | for (const auto& host_set : tlc->prioritySet().hostSetsPerPriority()) { |
88 | 0 | if (!host_set->hosts().empty()) { |
89 | 0 | priority_context->priority_set_.updateHosts( |
90 | 0 | next_priority_after_linearizing, Upstream::HostSetImpl::updateHostsParams(*host_set), |
91 | 0 | host_set->localityWeights(), host_set->hosts(), {}, random_.random(), |
92 | 0 | host_set->weightedPriorityHealth(), host_set->overprovisioningFactor()); |
93 | 0 | priority_context->priority_to_cluster_.emplace_back( |
94 | 0 | std::make_pair(priority_in_current_cluster, tlc)); |
95 | |
|
96 | 0 | priority_context->cluster_and_priority_to_linearized_priority_[std::make_pair( |
97 | 0 | cluster, priority_in_current_cluster)] = next_priority_after_linearizing; |
98 | 0 | next_priority_after_linearizing++; |
99 | 0 | } |
100 | 0 | priority_in_current_cluster++; |
101 | 0 | } |
102 | 0 | } |
103 | |
|
104 | 0 | return priority_context; |
105 | 0 | } |
106 | | |
107 | 0 | void AggregateClusterLoadBalancer::refresh(OptRef<const std::string> excluded_cluster) { |
108 | 0 | PriorityContextPtr priority_context = linearizePrioritySet(excluded_cluster); |
109 | 0 | if (!priority_context->priority_set_.hostSetsPerPriority().empty()) { |
110 | 0 | load_balancer_ = std::make_unique<LoadBalancerImpl>( |
111 | 0 | *priority_context, parent_info_->lbStats(), runtime_, random_, parent_info_->lbConfig()); |
112 | 0 | } else { |
113 | 0 | load_balancer_ = nullptr; |
114 | 0 | } |
115 | 0 | priority_context_ = std::move(priority_context); |
116 | 0 | } |
117 | | |
118 | | void AggregateClusterLoadBalancer::onClusterAddOrUpdate( |
119 | 0 | absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand& get_cluster) { |
120 | 0 | if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) { |
121 | 0 | ENVOY_LOG(debug, "adding or updating cluster '{}' for aggregate cluster '{}'", cluster_name, |
122 | 0 | parent_info_->name()); |
123 | 0 | auto& cluster = get_cluster(); |
124 | 0 | refresh(); |
125 | 0 | addMemberUpdateCallbackForCluster(cluster); |
126 | 0 | } |
127 | 0 | } |
128 | | |
129 | 0 | void AggregateClusterLoadBalancer::onClusterRemoval(const std::string& cluster_name) { |
130 | | // The onClusterRemoval callback is called before the thread local cluster is removed. There |
131 | | // will be a dangling pointer to the thread local cluster if the deleted cluster is not skipped |
132 | | // when we refresh the load balancer. |
133 | 0 | if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) { |
134 | 0 | ENVOY_LOG(debug, "removing cluster '{}' from aggregate cluster '{}'", cluster_name, |
135 | 0 | parent_info_->name()); |
136 | 0 | refresh(cluster_name); |
137 | 0 | } |
138 | 0 | } |
139 | | |
140 | | absl::optional<uint32_t> AggregateClusterLoadBalancer::LoadBalancerImpl::hostToLinearizedPriority( |
141 | 0 | const Upstream::HostDescription& host) const { |
142 | 0 | auto it = priority_context_.cluster_and_priority_to_linearized_priority_.find( |
143 | 0 | std::make_pair(host.cluster().name(), host.priority())); |
144 | |
|
145 | 0 | if (it != priority_context_.cluster_and_priority_to_linearized_priority_.end()) { |
146 | 0 | return it->second; |
147 | 0 | } else { |
148 | | // The HostSet can change due to CDS/EDS updates between retries. |
149 | 0 | return absl::nullopt; |
150 | 0 | } |
151 | 0 | } |
152 | | |
153 | | Upstream::HostConstSharedPtr |
154 | 0 | AggregateClusterLoadBalancer::LoadBalancerImpl::chooseHost(Upstream::LoadBalancerContext* context) { |
155 | 0 | const Upstream::HealthyAndDegradedLoad* priority_loads = nullptr; |
156 | 0 | if (context != nullptr) { |
157 | 0 | priority_loads = &context->determinePriorityLoad( |
158 | 0 | priority_set_, per_priority_load_, |
159 | 0 | [this](const auto& host) { return hostToLinearizedPriority(host); }); |
160 | 0 | } else { |
161 | 0 | priority_loads = &per_priority_load_; |
162 | 0 | } |
163 | |
|
164 | 0 | const auto priority_pair = |
165 | 0 | choosePriority(random_.random(), priority_loads->healthy_priority_load_, |
166 | 0 | priority_loads->degraded_priority_load_); |
167 | |
|
168 | 0 | AggregateLoadBalancerContext aggregate_context( |
169 | 0 | context, priority_pair.second, |
170 | 0 | priority_context_.priority_to_cluster_[priority_pair.first].first); |
171 | |
|
172 | 0 | Upstream::ThreadLocalCluster* cluster = |
173 | 0 | priority_context_.priority_to_cluster_[priority_pair.first].second; |
174 | 0 | return cluster->loadBalancer().chooseHost(&aggregate_context); |
175 | 0 | } |
176 | | |
177 | | Upstream::HostConstSharedPtr |
178 | 0 | AggregateClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) { |
179 | 0 | if (load_balancer_) { |
180 | 0 | return load_balancer_->chooseHost(context); |
181 | 0 | } |
182 | 0 | return nullptr; |
183 | 0 | } |
184 | | |
185 | | Upstream::HostConstSharedPtr |
186 | 0 | AggregateClusterLoadBalancer::peekAnotherHost(Upstream::LoadBalancerContext* context) { |
187 | 0 | if (load_balancer_) { |
188 | 0 | return load_balancer_->peekAnotherHost(context); |
189 | 0 | } |
190 | 0 | return nullptr; |
191 | 0 | } |
192 | | |
193 | | absl::optional<Upstream::SelectedPoolAndConnection> |
194 | | AggregateClusterLoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* context, |
195 | | const Upstream::Host& host, |
196 | 0 | std::vector<uint8_t>& hash_key) { |
197 | 0 | if (load_balancer_) { |
198 | 0 | return load_balancer_->selectExistingConnection(context, host, hash_key); |
199 | 0 | } |
200 | 0 | return absl::nullopt; |
201 | 0 | } |
202 | | |
203 | | OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> |
204 | 0 | AggregateClusterLoadBalancer::lifetimeCallbacks() { |
205 | 0 | if (load_balancer_) { |
206 | 0 | return load_balancer_->lifetimeCallbacks(); |
207 | 0 | } |
208 | 0 | return {}; |
209 | 0 | } |
210 | | |
211 | | absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>> |
212 | | ClusterFactory::createClusterWithConfig( |
213 | | const envoy::config::cluster::v3::Cluster& cluster, |
214 | | const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config, |
215 | 0 | Upstream::ClusterFactoryContext& context) { |
216 | 0 | absl::Status creation_status = absl::OkStatus(); |
217 | 0 | auto new_cluster = |
218 | 0 | std::shared_ptr<Cluster>(new Cluster(cluster, proto_config, context, creation_status)); |
219 | 0 | RETURN_IF_NOT_OK(creation_status); |
220 | 0 | auto lb = std::make_unique<AggregateThreadAwareLoadBalancer>(*new_cluster); |
221 | 0 | return std::make_pair(new_cluster, std::move(lb)); |
222 | 0 | } |
223 | | |
224 | | REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory); |
225 | | |
226 | | } // namespace Aggregate |
227 | | } // namespace Clusters |
228 | | } // namespace Extensions |
229 | | } // namespace Envoy |