Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/extensions/clusters/aggregate/cluster.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/clusters/aggregate/cluster.h"
2
3
#include "envoy/config/cluster/v3/cluster.pb.h"
4
#include "envoy/event/dispatcher.h"
5
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
6
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
7
8
#include "source/common/common/assert.h"
9
10
namespace Envoy {
11
namespace Extensions {
12
namespace Clusters {
13
namespace Aggregate {
14
15
// Constructs the aggregate cluster.
//
// The cluster proto, factory context and creation status are forwarded to
// Upstream::ClusterImplBase. The cluster manager, runtime loader and random generator are
// taken from the factory context, and a shared ClusterSet is built from the entries of
// config.clusters().
Cluster::Cluster(const envoy::config::cluster::v3::Cluster& cluster,
                 const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config,
                 Upstream::ClusterFactoryContext& context, absl::Status& creation_status)
    : Upstream::ClusterImplBase(cluster, context, creation_status),
      cluster_manager_(context.clusterManager()),
      runtime_(context.serverFactoryContext().runtime()),
      random_(context.serverFactoryContext().api().randomGenerator()),
      clusters_(std::make_shared<ClusterSet>(config.clusters().begin(),
                                             config.clusters().end())) {
  // Everything is set up by the member initializers above.
}
23
24
// Builds the load balancer for an aggregate cluster.
//
// For every cluster name in `clusters` that the cluster manager can currently resolve, a
// member-update callback is installed. The linearized priority set is then computed once via
// refresh(), and finally this object registers itself for cluster add/update/removal
// notifications from the cluster manager.
AggregateClusterLoadBalancer::AggregateClusterLoadBalancer(
    const Upstream::ClusterInfoConstSharedPtr& parent_info,
    Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime,
    Random::RandomGenerator& random, const ClusterSetConstSharedPtr& clusters)
    : parent_info_(parent_info), cluster_manager_(cluster_manager), runtime_(runtime),
      random_(random), clusters_(clusters) {
  for (const auto& member_name : *clusters_) {
    auto member = cluster_manager_.getThreadLocalCluster(member_name);
    // A referenced cluster may not exist yet while the aggregate cluster is being initialized,
    // e.g. when it is added dynamically by xDS later on. Only clusters that are already present
    // get their callback here; the others are handled in onClusterAddOrUpdate().
    if (member != nullptr) {
      addMemberUpdateCallbackForCluster(*member);
    }
  }

  refresh();
  handle_ = cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
}
44
45
void AggregateClusterLoadBalancer::addMemberUpdateCallbackForCluster(
46
0
    Upstream::ThreadLocalCluster& thread_local_cluster) {
47
0
  member_update_cbs_[thread_local_cluster.info()->name()] =
48
0
      thread_local_cluster.prioritySet().addMemberUpdateCb(
49
0
          [this, target_cluster_info = thread_local_cluster.info()](const Upstream::HostVector&,
50
0
                                                                    const Upstream::HostVector&) {
51
0
            ENVOY_LOG(debug, "member update for cluster '{}' in aggregate cluster '{}'",
52
0
                      target_cluster_info->name(), parent_info_->name());
53
0
            refresh();
54
0
            return absl::OkStatus();
55
0
          });
56
0
}
57
58
// Flattens the priority sets of all member clusters into a single priority set.
//
// Example for clusters [C_0, C_1, C_2] referenced by the aggregate cluster:
//    C_0 [P_0, P_1, P_2]
//    C_1 [P_0, P_1]
//    C_2 [P_0, P_1, P_2, P_3]
// yields the linearized order
//    [C_0.P_0, C_0.P_1, C_0.P_2, C_1.P_0, C_1.P_1, C_2.P_0, C_2.P_1, C_2.P_2, C_2.P_3]
// and traffic is distributed among these priorities.
//
// Priorities whose host set is empty are not added to the result. A cluster is skipped entirely
// when it equals `excluded_cluster` or when the cluster manager cannot resolve it.
//
// Returns a freshly allocated PriorityContext holding the flattened priority set and the two
// lookup tables (linearized priority -> {priority, cluster} and {cluster name, priority} ->
// linearized priority).
PriorityContextPtr
AggregateClusterLoadBalancer::linearizePrioritySet(OptRef<const std::string> excluded_cluster) {
  PriorityContextPtr linearized = std::make_unique<PriorityContext>();
  uint32_t next_linear_priority = 0;

  for (const auto& member_name : *clusters_) {
    const bool is_excluded =
        excluded_cluster.has_value() && excluded_cluster.value().get() == member_name;
    if (is_excluded) {
      continue;
    }

    auto member = cluster_manager_.getThreadLocalCluster(member_name);
    // The cluster may not exist, e.g. it could have been deleted or not yet added by xDS.
    if (member == nullptr) {
      ENVOY_LOG(debug, "refresh: cluster '{}' absent in aggregate cluster '{}'", member_name,
                parent_info_->name());
      continue;
    }
    ENVOY_LOG(debug, "refresh: cluster '{}' found in aggregate cluster '{}'", member_name,
              parent_info_->name());

    const auto& host_sets = member->prioritySet().hostSetsPerPriority();
    for (uint32_t member_priority = 0; member_priority < host_sets.size(); ++member_priority) {
      const auto& host_set = host_sets[member_priority];
      // Empty priorities consume a priority index in the member cluster but do not get a slot in
      // the linearized set.
      if (host_set->hosts().empty()) {
        continue;
      }

      linearized->priority_set_.updateHosts(
          next_linear_priority, Upstream::HostSetImpl::updateHostsParams(*host_set),
          host_set->localityWeights(), host_set->hosts(), {}, random_.random(),
          host_set->weightedPriorityHealth(), host_set->overprovisioningFactor());
      linearized->priority_to_cluster_.emplace_back(member_priority, member);
      linearized->cluster_and_priority_to_linearized_priority_[std::make_pair(
          member_name, member_priority)] = next_linear_priority;
      ++next_linear_priority;
    }
  }

  return linearized;
}
106
107
0
// Recomputes the linearized priority set (skipping `excluded_cluster`, if given) and swaps in a
// matching load balancer. When the linearized set has no priorities, the load balancer is
// cleared instead.
void AggregateClusterLoadBalancer::refresh(OptRef<const std::string> excluded_cluster) {
  PriorityContextPtr rebuilt_context = linearizePrioritySet(excluded_cluster);

  const bool nothing_to_balance = rebuilt_context->priority_set_.hostSetsPerPriority().empty();
  if (nothing_to_balance) {
    load_balancer_.reset();
  } else {
    load_balancer_ = std::make_unique<LoadBalancerImpl>(
        *rebuilt_context, parent_info_->lbStats(), runtime_, random_, parent_info_->lbConfig());
  }

  // The previous context is released only after the load balancer has been replaced.
  // NOTE(review): LoadBalancerImpl is constructed from *rebuilt_context and presumably keeps a
  // reference to it, which would make this ordering significant -- confirm in cluster.h.
  priority_context_ = std::move(rebuilt_context);
}
117
118
void AggregateClusterLoadBalancer::onClusterAddOrUpdate(
119
0
    absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand& get_cluster) {
120
0
  if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
121
0
    ENVOY_LOG(debug, "adding or updating cluster '{}' for aggregate cluster '{}'", cluster_name,
122
0
              parent_info_->name());
123
0
    auto& cluster = get_cluster();
124
0
    refresh();
125
0
    addMemberUpdateCallbackForCluster(cluster);
126
0
  }
127
0
}
128
129
0
void AggregateClusterLoadBalancer::onClusterRemoval(const std::string& cluster_name) {
130
  //  The onClusterRemoval callback is called before the thread local cluster is removed. There
131
  //  will be a dangling pointer to the thread local cluster if the deleted cluster is not skipped
132
  //  when we refresh the load balancer.
133
0
  if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
134
0
    ENVOY_LOG(debug, "removing cluster '{}' from aggregate cluster '{}'", cluster_name,
135
0
              parent_info_->name());
136
0
    refresh(cluster_name);
137
0
  }
138
0
}
139
140
// Maps a host to its position in the linearized priority set, using the host's cluster name and
// its priority within that cluster as the lookup key.
//
// Returns absl::nullopt when no mapping exists for that key.
absl::optional<uint32_t> AggregateClusterLoadBalancer::LoadBalancerImpl::hostToLinearizedPriority(
    const Upstream::HostDescription& host) const {
  const auto& linearized_by_key = priority_context_.cluster_and_priority_to_linearized_priority_;
  const auto entry = linearized_by_key.find(std::make_pair(host.cluster().name(), host.priority()));

  if (entry == linearized_by_key.end()) {
    // The HostSet can change due to CDS/EDS updates between retries.
    return absl::nullopt;
  }
  return entry->second;
}
152
153
// Picks a host in two steps: first a linearized priority is chosen, then the selection is
// delegated to the load balancer of the member cluster that owns that priority.
//
// When a `context` is supplied it may override the per-priority load through
// determinePriorityLoad(); otherwise the load balancer's own per-priority load is used.
Upstream::HostConstSharedPtr
AggregateClusterLoadBalancer::LoadBalancerImpl::chooseHost(Upstream::LoadBalancerContext* context) {
  // Start from our own load and let the caller-provided context override it if present.
  const Upstream::HealthyAndDegradedLoad* priority_loads = &per_priority_load_;
  if (context != nullptr) {
    priority_loads = &context->determinePriorityLoad(
        priority_set_, per_priority_load_,
        [this](const auto& host) { return hostToLinearizedPriority(host); });
  }

  const auto priority_pair =
      choosePriority(random_.random(), priority_loads->healthy_priority_load_,
                     priority_loads->degraded_priority_load_);

  // Entry for the chosen linearized priority: {priority inside the member cluster, member
  // cluster}.
  const auto& target = priority_context_.priority_to_cluster_[priority_pair.first];

  AggregateLoadBalancerContext aggregate_context(context, priority_pair.second, target.first);
  Upstream::ThreadLocalCluster* cluster = target.second;
  return cluster->loadBalancer().chooseHost(&aggregate_context);
}
176
177
// Forwards host selection to the current inner load balancer. Returns nullptr when there is no
// inner load balancer.
Upstream::HostConstSharedPtr
AggregateClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
  if (!load_balancer_) {
    return nullptr;
  }
  return load_balancer_->chooseHost(context);
}
184
185
// Forwards peekAnotherHost() to the current inner load balancer. Returns nullptr when there is
// no inner load balancer.
Upstream::HostConstSharedPtr
AggregateClusterLoadBalancer::peekAnotherHost(Upstream::LoadBalancerContext* context) {
  if (!load_balancer_) {
    return nullptr;
  }
  return load_balancer_->peekAnotherHost(context);
}
192
193
// Forwards selectExistingConnection() to the current inner load balancer. Returns absl::nullopt
// when there is no inner load balancer.
absl::optional<Upstream::SelectedPoolAndConnection>
AggregateClusterLoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* context,
                                                       const Upstream::Host& host,
                                                       std::vector<uint8_t>& hash_key) {
  if (!load_balancer_) {
    return absl::nullopt;
  }
  return load_balancer_->selectExistingConnection(context, host, hash_key);
}
202
203
// Forwards lifetimeCallbacks() to the current inner load balancer. Returns an empty OptRef when
// there is no inner load balancer.
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
AggregateClusterLoadBalancer::lifetimeCallbacks() {
  if (!load_balancer_) {
    return {};
  }
  return load_balancer_->lifetimeCallbacks();
}
210
211
// Creates an aggregate cluster together with its thread-aware load balancer.
//
// Returns the error recorded in the creation status if constructing the cluster failed;
// otherwise returns the {cluster, load balancer} pair.
absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
ClusterFactory::createClusterWithConfig(
    const envoy::config::cluster::v3::Cluster& cluster,
    const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
    Upstream::ClusterFactoryContext& context) {
  // The constructor reports failures through this out-parameter.
  absl::Status creation_status = absl::OkStatus();

  // NOTE(review): `new` is used rather than std::make_shared, presumably because the Cluster
  // constructor is not publicly accessible -- confirm against cluster.h before changing.
  std::shared_ptr<Cluster> new_cluster(
      new Cluster(cluster, proto_config, context, creation_status));
  RETURN_IF_NOT_OK(creation_status);

  auto thread_aware_lb = std::make_unique<AggregateThreadAwareLoadBalancer>(*new_cluster);
  return std::make_pair(new_cluster, std::move(thread_aware_lb));
}
223
224
// Registers ClusterFactory as an Upstream::ClusterFactory implementation.
// NOTE(review): REGISTER_FACTORY is defined outside this file; presumably it performs static
// registration at load time -- confirm against the macro's definition.
REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory);
225
226
} // namespace Aggregate
227
} // namespace Clusters
228
} // namespace Extensions
229
} // namespace Envoy