1
#include "source/extensions/clusters/aggregate/cluster.h"
2

            
3
#include "envoy/config/cluster/v3/cluster.pb.h"
4
#include "envoy/event/dispatcher.h"
5
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
6
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
7

            
8
#include "source/common/common/assert.h"
9

            
10
namespace Envoy {
11
namespace Extensions {
12
namespace Clusters {
13
namespace Aggregate {
14

            
15
Cluster::Cluster(const envoy::config::cluster::v3::Cluster& cluster,
16
                 const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config,
17
                 Upstream::ClusterFactoryContext& context, absl::Status& creation_status)
18
26
    : Upstream::ClusterImplBase(cluster, context, creation_status),
19
26
      cluster_manager_(context.serverFactoryContext().clusterManager()),
20
26
      clusters_(std::make_shared<ClusterSet>(config.clusters().begin(), config.clusters().end())) {}
21

            
22
AggregateClusterLoadBalancer::AggregateClusterLoadBalancer(
23
    const Upstream::ClusterInfoConstSharedPtr& parent_info,
24
    Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime,
25
    Random::RandomGenerator& random, const ClusterSetConstSharedPtr& clusters)
26
40
    : parent_info_(parent_info), cluster_manager_(cluster_manager), runtime_(runtime),
27
40
      random_(random), clusters_(clusters) {
28
64
  for (const auto& cluster : *clusters_) {
29
64
    auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
30
    // It is possible when initializing the cluster, the included cluster doesn't exist. e.g., the
31
    // cluster could be added dynamically by xDS.
32
64
    if (tlc == nullptr) {
33
46
      continue;
34
46
    }
35

            
36
    // Add callback for clusters initialized before aggregate cluster.
37
18
    addMemberUpdateCallbackForCluster(*tlc);
38
18
  }
39
40
  refresh();
40
40
  handle_ = cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
41
40
}
42

            
43
void AggregateClusterLoadBalancer::addMemberUpdateCallbackForCluster(
44
76
    Upstream::ThreadLocalCluster& thread_local_cluster) {
45
76
  member_update_cbs_[thread_local_cluster.info()->name()] =
46
76
      thread_local_cluster.prioritySet().addMemberUpdateCb(
47
76
          [this, target_cluster_info = thread_local_cluster.info()](const Upstream::HostVector&,
48
85
                                                                    const Upstream::HostVector&) {
49
25
            ENVOY_LOG(debug, "member update for cluster '{}' in aggregate cluster '{}'",
50
25
                      target_cluster_info->name(), parent_info_->name());
51
25
            refresh();
52
25
          });
53
76
}
54

            
55
PriorityContextPtr
56
135
AggregateClusterLoadBalancer::linearizePrioritySet(OptRef<const std::string> excluded_cluster) {
57
135
  PriorityContextPtr priority_context = std::make_unique<PriorityContext>();
58
135
  uint32_t next_priority_after_linearizing = 0;
59

            
60
  // Linearize the priority set. e.g. for clusters [C_0, C_1, C_2] referred in aggregate cluster
61
  //    C_0 [P_0, P_1, P_2]
62
  //    C_1 [P_0, P_1]
63
  //    C_2 [P_0, P_1, P_2, P_3]
64
  // The linearization result is:
65
  //    [C_0.P_0, C_0.P_1, C_0.P_2, C_1.P_0, C_1.P_1, C_2.P_0, C_2.P_1, C_2.P_2, C_2.P_3]
66
  // and the traffic will be distributed among these priorities.
67
230
  for (const auto& cluster : *clusters_) {
68
230
    if (excluded_cluster.has_value() && excluded_cluster.value().get() == cluster) {
69
12
      continue;
70
12
    }
71
218
    auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
72
    // It is possible that the cluster doesn't exist, e.g., the cluster could be deleted or the
73
    // cluster hasn't been added by xDS.
74
218
    if (tlc == nullptr) {
75
71
      ENVOY_LOG(debug, "refresh: cluster '{}' absent in aggregate cluster '{}'", cluster,
76
71
                parent_info_->name());
77
71
      continue;
78
147
    } else {
79
147
      ENVOY_LOG(debug, "refresh: cluster '{}' found in aggregate cluster '{}'", cluster,
80
147
                parent_info_->name());
81
147
    }
82

            
83
147
    uint32_t priority_in_current_cluster = 0;
84
191
    for (const auto& host_set : tlc->prioritySet().hostSetsPerPriority()) {
85
191
      if (!host_set->hosts().empty()) {
86
191
        priority_context->priority_set_.updateHosts(
87
191
            next_priority_after_linearizing, Upstream::HostSetImpl::updateHostsParams(*host_set),
88
191
            host_set->localityWeights(), host_set->hosts(), {}, host_set->weightedPriorityHealth(),
89
191
            host_set->overprovisioningFactor());
90
191
        priority_context->priority_to_cluster_.emplace_back(
91
191
            std::make_pair(priority_in_current_cluster, tlc));
92

            
93
191
        priority_context->cluster_and_priority_to_linearized_priority_[std::make_pair(
94
191
            cluster, priority_in_current_cluster)] = next_priority_after_linearizing;
95
191
        next_priority_after_linearizing++;
96
191
      }
97
191
      priority_in_current_cluster++;
98
191
    }
99
147
  }
100

            
101
135
  return priority_context;
102
135
}
103

            
104
135
void AggregateClusterLoadBalancer::refresh(OptRef<const std::string> excluded_cluster) {
105
135
  PriorityContextPtr priority_context = linearizePrioritySet(excluded_cluster);
106
135
  if (!priority_context->priority_set_.hostSetsPerPriority().empty()) {
107
104
    load_balancer_ = std::make_unique<LoadBalancerImpl>(
108
104
        *priority_context, parent_info_->lbStats(), runtime_, random_, parent_info_->lbConfig());
109
104
  } else {
110
31
    load_balancer_ = nullptr;
111
31
  }
112
135
  priority_context_ = std::move(priority_context);
113
135
}
114

            
115
void AggregateClusterLoadBalancer::onClusterAddOrUpdate(
116
86
    absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand& get_cluster) {
117
86
  if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
118
58
    ENVOY_LOG(debug, "adding or updating cluster '{}' for aggregate cluster '{}'", cluster_name,
119
58
              parent_info_->name());
120
58
    auto& cluster = get_cluster();
121
58
    refresh();
122
58
    addMemberUpdateCallbackForCluster(cluster);
123
58
  }
124
86
}
125

            
126
12
void AggregateClusterLoadBalancer::onClusterRemoval(const std::string& cluster_name) {
127
  //  The onClusterRemoval callback is called before the thread local cluster is removed. There
128
  //  will be a dangling pointer to the thread local cluster if the deleted cluster is not skipped
129
  //  when we refresh the load balancer.
130
12
  if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
131
12
    ENVOY_LOG(debug, "removing cluster '{}' from aggregate cluster '{}'", cluster_name,
132
12
              parent_info_->name());
133
12
    refresh(cluster_name);
134
12
  }
135
12
}
136

            
137
absl::optional<uint32_t> AggregateClusterLoadBalancer::LoadBalancerImpl::hostToLinearizedPriority(
138
7
    const Upstream::HostDescription& host) const {
139
7
  auto it = priority_context_.cluster_and_priority_to_linearized_priority_.find(
140
7
      std::make_pair(host.cluster().name(), host.priority()));
141

            
142
7
  if (it != priority_context_.cluster_and_priority_to_linearized_priority_.end()) {
143
6
    return it->second;
144
6
  } else {
145
    // The HostSet can change due to CDS/EDS updates between retries.
146
1
    return absl::nullopt;
147
1
  }
148
7
}
149

            
150
Upstream::HostSelectionResponse
151
1281
AggregateClusterLoadBalancer::LoadBalancerImpl::chooseHost(Upstream::LoadBalancerContext* context) {
152
1281
  const Upstream::HealthyAndDegradedLoad* priority_loads = nullptr;
153
1281
  if (context != nullptr) {
154
37
    priority_loads = &context->determinePriorityLoad(
155
37
        priority_set_, per_priority_load_,
156
41
        [this](const auto& host) { return hostToLinearizedPriority(host); });
157
1280
  } else {
158
1244
    priority_loads = &per_priority_load_;
159
1244
  }
160

            
161
1281
  const auto priority_pair =
162
1281
      choosePriority(random_.random(), priority_loads->healthy_priority_load_,
163
1281
                     priority_loads->degraded_priority_load_);
164

            
165
1281
  AggregateLoadBalancerContext aggregate_context(
166
1281
      context, priority_pair.second,
167
1281
      priority_context_.priority_to_cluster_[priority_pair.first].first);
168

            
169
1281
  Upstream::ThreadLocalCluster* cluster =
170
1281
      priority_context_.priority_to_cluster_[priority_pair.first].second;
171
1281
  return cluster->loadBalancer().chooseHost(&aggregate_context);
172
1281
}
173

            
174
Upstream::HostSelectionResponse
175
1285
AggregateClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
176
1285
  if (load_balancer_) {
177
1281
    return load_balancer_->chooseHost(context);
178
1281
  }
179
4
  return {nullptr};
180
1285
}
181

            
182
Upstream::HostConstSharedPtr
183
66
AggregateClusterLoadBalancer::peekAnotherHost(Upstream::LoadBalancerContext* context) {
184
66
  if (load_balancer_) {
185
66
    return load_balancer_->peekAnotherHost(context);
186
66
  }
187
  return nullptr;
188
66
}
189

            
190
absl::optional<Upstream::SelectedPoolAndConnection>
191
AggregateClusterLoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* context,
192
                                                       const Upstream::Host& host,
193
66
                                                       std::vector<uint8_t>& hash_key) {
194
66
  if (load_balancer_) {
195
66
    return load_balancer_->selectExistingConnection(context, host, hash_key);
196
66
  }
197
  return absl::nullopt;
198
66
}
199

            
200
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
201
66
AggregateClusterLoadBalancer::lifetimeCallbacks() {
202
66
  if (load_balancer_) {
203
66
    return load_balancer_->lifetimeCallbacks();
204
66
  }
205
  return {};
206
66
}
207

            
208
absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
209
ClusterFactory::createClusterWithConfig(
210
    const envoy::config::cluster::v3::Cluster& cluster,
211
    const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
212
22
    Upstream::ClusterFactoryContext& context) {
213
22
  absl::Status creation_status = absl::OkStatus();
214
22
  auto new_cluster =
215
22
      std::shared_ptr<Cluster>(new Cluster(cluster, proto_config, context, creation_status));
216
22
  RETURN_IF_NOT_OK(creation_status);
217
22
  auto lb = std::make_unique<AggregateThreadAwareLoadBalancer>(*new_cluster);
218
22
  return std::make_pair(new_cluster, std::move(lb));
219
22
}
220

            
221
REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory);
222

            
223
} // namespace Aggregate
224
} // namespace Clusters
225
} // namespace Extensions
226
} // namespace Envoy