Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/clusters/aggregate/cluster.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/clusters/aggregate/cluster.h"
2
3
#include "envoy/config/cluster/v3/cluster.pb.h"
4
#include "envoy/event/dispatcher.h"
5
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
6
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
7
8
#include "source/common/common/assert.h"
9
10
namespace Envoy {
11
namespace Extensions {
12
namespace Clusters {
13
namespace Aggregate {
14
15
Cluster::Cluster(const envoy::config::cluster::v3::Cluster& cluster,
16
                 const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config,
17
                 Upstream::ClusterFactoryContext& context)
18
    : Upstream::ClusterImplBase(cluster, context), cluster_manager_(context.clusterManager()),
19
      runtime_(context.serverFactoryContext().runtime()),
20
      random_(context.serverFactoryContext().api().randomGenerator()),
21
0
      clusters_(std::make_shared<ClusterSet>(config.clusters().begin(), config.clusters().end())) {}
22
23
AggregateClusterLoadBalancer::AggregateClusterLoadBalancer(
24
    const Upstream::ClusterInfoConstSharedPtr& parent_info,
25
    Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime,
26
    Random::RandomGenerator& random, const ClusterSetConstSharedPtr& clusters)
27
    : parent_info_(parent_info), cluster_manager_(cluster_manager), runtime_(runtime),
28
0
      random_(random), clusters_(clusters) {
29
0
  for (const auto& cluster : *clusters_) {
30
0
    auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
31
    // It is possible when initializing the cluster, the included cluster doesn't exist. e.g., the
32
    // cluster could be added dynamically by xDS.
33
0
    if (tlc == nullptr) {
34
0
      continue;
35
0
    }
36
37
    // Add callback for clusters initialized before aggregate cluster.
38
0
    addMemberUpdateCallbackForCluster(*tlc);
39
0
  }
40
0
  refresh();
41
0
  handle_ = cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
42
0
}
43
44
void AggregateClusterLoadBalancer::addMemberUpdateCallbackForCluster(
45
0
    Upstream::ThreadLocalCluster& thread_local_cluster) {
46
0
  member_update_cbs_[thread_local_cluster.info()->name()] =
47
0
      thread_local_cluster.prioritySet().addMemberUpdateCb(
48
0
          [this, target_cluster_info = thread_local_cluster.info()](const Upstream::HostVector&,
49
0
                                                                    const Upstream::HostVector&) {
50
0
            ENVOY_LOG(debug, "member update for cluster '{}' in aggregate cluster '{}'",
51
0
                      target_cluster_info->name(), parent_info_->name());
52
0
            refresh();
53
0
          });
54
0
}
55
56
PriorityContextPtr
57
0
AggregateClusterLoadBalancer::linearizePrioritySet(OptRef<const std::string> excluded_cluster) {
58
0
  PriorityContextPtr priority_context = std::make_unique<PriorityContext>();
59
0
  uint32_t next_priority_after_linearizing = 0;
60
61
  // Linearize the priority set. e.g. for clusters [C_0, C_1, C_2] referred in aggregate cluster
62
  //    C_0 [P_0, P_1, P_2]
63
  //    C_1 [P_0, P_1]
64
  //    C_2 [P_0, P_1, P_2, P_3]
65
  // The linearization result is:
66
  //    [C_0.P_0, C_0.P_1, C_0.P_2, C_1.P_0, C_1.P_1, C_2.P_0, C_2.P_1, C_2.P_2, C_2.P_3]
67
  // and the traffic will be distributed among these priorities.
68
0
  for (const auto& cluster : *clusters_) {
69
0
    if (excluded_cluster.has_value() && excluded_cluster.value().get() == cluster) {
70
0
      continue;
71
0
    }
72
0
    auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
73
    // It is possible that the cluster doesn't exist, e.g., the cluster could be deleted or the
74
    // cluster hasn't been added by xDS.
75
0
    if (tlc == nullptr) {
76
0
      ENVOY_LOG(debug, "refresh: cluster '{}' absent in aggregate cluster '{}'", cluster,
77
0
                parent_info_->name());
78
0
      continue;
79
0
    } else {
80
0
      ENVOY_LOG(debug, "refresh: cluster '{}' found in aggregate cluster '{}'", cluster,
81
0
                parent_info_->name());
82
0
    }
83
84
0
    uint32_t priority_in_current_cluster = 0;
85
0
    for (const auto& host_set : tlc->prioritySet().hostSetsPerPriority()) {
86
0
      if (!host_set->hosts().empty()) {
87
0
        priority_context->priority_set_.updateHosts(
88
0
            next_priority_after_linearizing, Upstream::HostSetImpl::updateHostsParams(*host_set),
89
0
            host_set->localityWeights(), host_set->hosts(), {}, host_set->weightedPriorityHealth(),
90
0
            host_set->overprovisioningFactor());
91
0
        priority_context->priority_to_cluster_.emplace_back(
92
0
            std::make_pair(priority_in_current_cluster, tlc));
93
94
0
        priority_context->cluster_and_priority_to_linearized_priority_[std::make_pair(
95
0
            cluster, priority_in_current_cluster)] = next_priority_after_linearizing;
96
0
        next_priority_after_linearizing++;
97
0
      }
98
0
      priority_in_current_cluster++;
99
0
    }
100
0
  }
101
102
0
  return priority_context;
103
0
}
104
105
0
void AggregateClusterLoadBalancer::refresh(OptRef<const std::string> excluded_cluster) {
106
0
  PriorityContextPtr priority_context = linearizePrioritySet(excluded_cluster);
107
0
  if (!priority_context->priority_set_.hostSetsPerPriority().empty()) {
108
0
    load_balancer_ = std::make_unique<LoadBalancerImpl>(
109
0
        *priority_context, parent_info_->lbStats(), runtime_, random_, parent_info_->lbConfig());
110
0
  } else {
111
0
    load_balancer_ = nullptr;
112
0
  }
113
0
  priority_context_ = std::move(priority_context);
114
0
}
115
116
void AggregateClusterLoadBalancer::onClusterAddOrUpdate(
117
0
    absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand& get_cluster) {
118
0
  if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
119
0
    ENVOY_LOG(debug, "adding or updating cluster '{}' for aggregate cluster '{}'", cluster_name,
120
0
              parent_info_->name());
121
0
    auto& cluster = get_cluster();
122
0
    refresh();
123
0
    addMemberUpdateCallbackForCluster(cluster);
124
0
  }
125
0
}
126
127
0
void AggregateClusterLoadBalancer::onClusterRemoval(const std::string& cluster_name) {
128
  //  The onClusterRemoval callback is called before the thread local cluster is removed. There
129
  //  will be a dangling pointer to the thread local cluster if the deleted cluster is not skipped
130
  //  when we refresh the load balancer.
131
0
  if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
132
0
    ENVOY_LOG(debug, "removing cluster '{}' from aggregate cluster '{}'", cluster_name,
133
0
              parent_info_->name());
134
0
    refresh(cluster_name);
135
0
  }
136
0
}
137
138
absl::optional<uint32_t> AggregateClusterLoadBalancer::LoadBalancerImpl::hostToLinearizedPriority(
139
0
    const Upstream::HostDescription& host) const {
140
0
  auto it = priority_context_.cluster_and_priority_to_linearized_priority_.find(
141
0
      std::make_pair(host.cluster().name(), host.priority()));
142
143
0
  if (it != priority_context_.cluster_and_priority_to_linearized_priority_.end()) {
144
0
    return it->second;
145
0
  } else {
146
    // The HostSet can change due to CDS/EDS updates between retries.
147
0
    return absl::nullopt;
148
0
  }
149
0
}
150
151
Upstream::HostConstSharedPtr
152
0
AggregateClusterLoadBalancer::LoadBalancerImpl::chooseHost(Upstream::LoadBalancerContext* context) {
153
0
  const Upstream::HealthyAndDegradedLoad* priority_loads = nullptr;
154
0
  if (context != nullptr) {
155
0
    priority_loads = &context->determinePriorityLoad(
156
0
        priority_set_, per_priority_load_,
157
0
        [this](const auto& host) { return hostToLinearizedPriority(host); });
158
0
  } else {
159
0
    priority_loads = &per_priority_load_;
160
0
  }
161
162
0
  const auto priority_pair =
163
0
      choosePriority(random_.random(), priority_loads->healthy_priority_load_,
164
0
                     priority_loads->degraded_priority_load_);
165
166
0
  AggregateLoadBalancerContext aggregate_context(
167
0
      context, priority_pair.second,
168
0
      priority_context_.priority_to_cluster_[priority_pair.first].first);
169
170
0
  Upstream::ThreadLocalCluster* cluster =
171
0
      priority_context_.priority_to_cluster_[priority_pair.first].second;
172
0
  return cluster->loadBalancer().chooseHost(&aggregate_context);
173
0
}
174
175
Upstream::HostConstSharedPtr
176
0
AggregateClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
177
0
  if (load_balancer_) {
178
0
    return load_balancer_->chooseHost(context);
179
0
  }
180
0
  return nullptr;
181
0
}
182
183
Upstream::HostConstSharedPtr
184
0
AggregateClusterLoadBalancer::peekAnotherHost(Upstream::LoadBalancerContext* context) {
185
0
  if (load_balancer_) {
186
0
    return load_balancer_->peekAnotherHost(context);
187
0
  }
188
0
  return nullptr;
189
0
}
190
191
absl::optional<Upstream::SelectedPoolAndConnection>
192
AggregateClusterLoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* context,
193
                                                       const Upstream::Host& host,
194
0
                                                       std::vector<uint8_t>& hash_key) {
195
0
  if (load_balancer_) {
196
0
    return load_balancer_->selectExistingConnection(context, host, hash_key);
197
0
  }
198
0
  return absl::nullopt;
199
0
}
200
201
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
202
0
AggregateClusterLoadBalancer::lifetimeCallbacks() {
203
0
  if (load_balancer_) {
204
0
    return load_balancer_->lifetimeCallbacks();
205
0
  }
206
0
  return {};
207
0
}
208
209
absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
210
ClusterFactory::createClusterWithConfig(
211
    const envoy::config::cluster::v3::Cluster& cluster,
212
    const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
213
0
    Upstream::ClusterFactoryContext& context) {
214
0
  auto new_cluster = std::make_shared<Cluster>(cluster, proto_config, context);
215
0
  auto lb = std::make_unique<AggregateThreadAwareLoadBalancer>(*new_cluster);
216
0
  return std::make_pair(new_cluster, std::move(lb));
217
0
}
218
219
REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory);
220
221
} // namespace Aggregate
222
} // namespace Clusters
223
} // namespace Extensions
224
} // namespace Envoy