Line data Source code
1 : #include "source/extensions/clusters/aggregate/cluster.h"
2 :
3 : #include "envoy/config/cluster/v3/cluster.pb.h"
4 : #include "envoy/event/dispatcher.h"
5 : #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
6 : #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
7 :
8 : #include "source/common/common/assert.h"
9 :
10 : namespace Envoy {
11 : namespace Extensions {
12 : namespace Clusters {
13 : namespace Aggregate {
14 :
15 : Cluster::Cluster(const envoy::config::cluster::v3::Cluster& cluster,
16 : const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config,
17 : Upstream::ClusterFactoryContext& context)
18 : : Upstream::ClusterImplBase(cluster, context), cluster_manager_(context.clusterManager()),
19 : runtime_(context.serverFactoryContext().runtime()),
20 : random_(context.serverFactoryContext().api().randomGenerator()),
21 0 : clusters_(std::make_shared<ClusterSet>(config.clusters().begin(), config.clusters().end())) {}
22 :
23 : AggregateClusterLoadBalancer::AggregateClusterLoadBalancer(
24 : const Upstream::ClusterInfoConstSharedPtr& parent_info,
25 : Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime,
26 : Random::RandomGenerator& random, const ClusterSetConstSharedPtr& clusters)
27 : : parent_info_(parent_info), cluster_manager_(cluster_manager), runtime_(runtime),
28 0 : random_(random), clusters_(clusters) {
29 0 : for (const auto& cluster : *clusters_) {
30 0 : auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
31 : // It is possible when initializing the cluster, the included cluster doesn't exist. e.g., the
32 : // cluster could be added dynamically by xDS.
33 0 : if (tlc == nullptr) {
34 0 : continue;
35 0 : }
36 :
37 : // Add callback for clusters initialized before aggregate cluster.
38 0 : addMemberUpdateCallbackForCluster(*tlc);
39 0 : }
40 0 : refresh();
41 0 : handle_ = cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
42 0 : }
43 :
44 : void AggregateClusterLoadBalancer::addMemberUpdateCallbackForCluster(
45 0 : Upstream::ThreadLocalCluster& thread_local_cluster) {
46 0 : member_update_cbs_[thread_local_cluster.info()->name()] =
47 0 : thread_local_cluster.prioritySet().addMemberUpdateCb(
48 0 : [this, target_cluster_info = thread_local_cluster.info()](const Upstream::HostVector&,
49 0 : const Upstream::HostVector&) {
50 0 : ENVOY_LOG(debug, "member update for cluster '{}' in aggregate cluster '{}'",
51 0 : target_cluster_info->name(), parent_info_->name());
52 0 : refresh();
53 0 : });
54 0 : }
55 :
56 : PriorityContextPtr
57 0 : AggregateClusterLoadBalancer::linearizePrioritySet(OptRef<const std::string> excluded_cluster) {
58 0 : PriorityContextPtr priority_context = std::make_unique<PriorityContext>();
59 0 : uint32_t next_priority_after_linearizing = 0;
60 :
61 : // Linearize the priority set. e.g. for clusters [C_0, C_1, C_2] referred in aggregate cluster
62 : // C_0 [P_0, P_1, P_2]
63 : // C_1 [P_0, P_1]
64 : // C_2 [P_0, P_1, P_2, P_3]
65 : // The linearization result is:
66 : // [C_0.P_0, C_0.P_1, C_0.P_2, C_1.P_0, C_1.P_1, C_2.P_0, C_2.P_1, C_2.P_2, C_2.P_3]
67 : // and the traffic will be distributed among these priorities.
68 0 : for (const auto& cluster : *clusters_) {
69 0 : if (excluded_cluster.has_value() && excluded_cluster.value().get() == cluster) {
70 0 : continue;
71 0 : }
72 0 : auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
73 : // It is possible that the cluster doesn't exist, e.g., the cluster could be deleted or the
74 : // cluster hasn't been added by xDS.
75 0 : if (tlc == nullptr) {
76 0 : ENVOY_LOG(debug, "refresh: cluster '{}' absent in aggregate cluster '{}'", cluster,
77 0 : parent_info_->name());
78 0 : continue;
79 0 : } else {
80 0 : ENVOY_LOG(debug, "refresh: cluster '{}' found in aggregate cluster '{}'", cluster,
81 0 : parent_info_->name());
82 0 : }
83 :
84 0 : uint32_t priority_in_current_cluster = 0;
85 0 : for (const auto& host_set : tlc->prioritySet().hostSetsPerPriority()) {
86 0 : if (!host_set->hosts().empty()) {
87 0 : priority_context->priority_set_.updateHosts(
88 0 : next_priority_after_linearizing, Upstream::HostSetImpl::updateHostsParams(*host_set),
89 0 : host_set->localityWeights(), host_set->hosts(), {}, host_set->weightedPriorityHealth(),
90 0 : host_set->overprovisioningFactor());
91 0 : priority_context->priority_to_cluster_.emplace_back(
92 0 : std::make_pair(priority_in_current_cluster, tlc));
93 :
94 0 : priority_context->cluster_and_priority_to_linearized_priority_[std::make_pair(
95 0 : cluster, priority_in_current_cluster)] = next_priority_after_linearizing;
96 0 : next_priority_after_linearizing++;
97 0 : }
98 0 : priority_in_current_cluster++;
99 0 : }
100 0 : }
101 :
102 0 : return priority_context;
103 0 : }
104 :
105 0 : void AggregateClusterLoadBalancer::refresh(OptRef<const std::string> excluded_cluster) {
106 0 : PriorityContextPtr priority_context = linearizePrioritySet(excluded_cluster);
107 0 : if (!priority_context->priority_set_.hostSetsPerPriority().empty()) {
108 0 : load_balancer_ = std::make_unique<LoadBalancerImpl>(
109 0 : *priority_context, parent_info_->lbStats(), runtime_, random_, parent_info_->lbConfig());
110 0 : } else {
111 0 : load_balancer_ = nullptr;
112 0 : }
113 0 : priority_context_ = std::move(priority_context);
114 0 : }
115 :
116 : void AggregateClusterLoadBalancer::onClusterAddOrUpdate(
117 0 : absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand& get_cluster) {
118 0 : if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
119 0 : ENVOY_LOG(debug, "adding or updating cluster '{}' for aggregate cluster '{}'", cluster_name,
120 0 : parent_info_->name());
121 0 : auto& cluster = get_cluster();
122 0 : refresh();
123 0 : addMemberUpdateCallbackForCluster(cluster);
124 0 : }
125 0 : }
126 :
127 0 : void AggregateClusterLoadBalancer::onClusterRemoval(const std::string& cluster_name) {
128 : // The onClusterRemoval callback is called before the thread local cluster is removed. There
129 : // will be a dangling pointer to the thread local cluster if the deleted cluster is not skipped
130 : // when we refresh the load balancer.
131 0 : if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
132 0 : ENVOY_LOG(debug, "removing cluster '{}' from aggregate cluster '{}'", cluster_name,
133 0 : parent_info_->name());
134 0 : refresh(cluster_name);
135 0 : }
136 0 : }
137 :
138 : absl::optional<uint32_t> AggregateClusterLoadBalancer::LoadBalancerImpl::hostToLinearizedPriority(
139 0 : const Upstream::HostDescription& host) const {
140 0 : auto it = priority_context_.cluster_and_priority_to_linearized_priority_.find(
141 0 : std::make_pair(host.cluster().name(), host.priority()));
142 :
143 0 : if (it != priority_context_.cluster_and_priority_to_linearized_priority_.end()) {
144 0 : return it->second;
145 0 : } else {
146 : // The HostSet can change due to CDS/EDS updates between retries.
147 0 : return absl::nullopt;
148 0 : }
149 0 : }
150 :
151 : Upstream::HostConstSharedPtr
152 0 : AggregateClusterLoadBalancer::LoadBalancerImpl::chooseHost(Upstream::LoadBalancerContext* context) {
153 0 : const Upstream::HealthyAndDegradedLoad* priority_loads = nullptr;
154 0 : if (context != nullptr) {
155 0 : priority_loads = &context->determinePriorityLoad(
156 0 : priority_set_, per_priority_load_,
157 0 : [this](const auto& host) { return hostToLinearizedPriority(host); });
158 0 : } else {
159 0 : priority_loads = &per_priority_load_;
160 0 : }
161 :
162 0 : const auto priority_pair =
163 0 : choosePriority(random_.random(), priority_loads->healthy_priority_load_,
164 0 : priority_loads->degraded_priority_load_);
165 :
166 0 : AggregateLoadBalancerContext aggregate_context(
167 0 : context, priority_pair.second,
168 0 : priority_context_.priority_to_cluster_[priority_pair.first].first);
169 :
170 0 : Upstream::ThreadLocalCluster* cluster =
171 0 : priority_context_.priority_to_cluster_[priority_pair.first].second;
172 0 : return cluster->loadBalancer().chooseHost(&aggregate_context);
173 0 : }
174 :
175 : Upstream::HostConstSharedPtr
176 0 : AggregateClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
177 0 : if (load_balancer_) {
178 0 : return load_balancer_->chooseHost(context);
179 0 : }
180 0 : return nullptr;
181 0 : }
182 :
183 : Upstream::HostConstSharedPtr
184 0 : AggregateClusterLoadBalancer::peekAnotherHost(Upstream::LoadBalancerContext* context) {
185 0 : if (load_balancer_) {
186 0 : return load_balancer_->peekAnotherHost(context);
187 0 : }
188 0 : return nullptr;
189 0 : }
190 :
191 : absl::optional<Upstream::SelectedPoolAndConnection>
192 : AggregateClusterLoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* context,
193 : const Upstream::Host& host,
194 0 : std::vector<uint8_t>& hash_key) {
195 0 : if (load_balancer_) {
196 0 : return load_balancer_->selectExistingConnection(context, host, hash_key);
197 0 : }
198 0 : return absl::nullopt;
199 0 : }
200 :
201 : OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
202 0 : AggregateClusterLoadBalancer::lifetimeCallbacks() {
203 0 : if (load_balancer_) {
204 0 : return load_balancer_->lifetimeCallbacks();
205 0 : }
206 0 : return {};
207 0 : }
208 :
209 : absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
210 : ClusterFactory::createClusterWithConfig(
211 : const envoy::config::cluster::v3::Cluster& cluster,
212 : const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
213 0 : Upstream::ClusterFactoryContext& context) {
214 0 : auto new_cluster = std::make_shared<Cluster>(cluster, proto_config, context);
215 0 : auto lb = std::make_unique<AggregateThreadAwareLoadBalancer>(*new_cluster);
216 0 : return std::make_pair(new_cluster, std::move(lb));
217 0 : }
218 :
219 : REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory);
220 :
221 : } // namespace Aggregate
222 : } // namespace Clusters
223 : } // namespace Extensions
224 : } // namespace Envoy
|