1
#include "source/extensions/clusters/composite/cluster.h"
2

            
3
#include "envoy/config/cluster/v3/cluster.pb.h"
4
#include "envoy/event/dispatcher.h"
5
#include "envoy/extensions/clusters/composite/v3/cluster.pb.h"
6
#include "envoy/extensions/clusters/composite/v3/cluster.pb.validate.h"
7

            
8
#include "source/common/common/assert.h"
9

            
10
namespace Envoy {
11
namespace Extensions {
12
namespace Clusters {
13
namespace Composite {
14

            
15
// Builds the composite cluster. Besides initializing the base cluster implementation, this
// captures the cluster manager from the server factory context and takes a snapshot of the
// sub-cluster names listed in `config`, preserving their configured order.
Cluster::Cluster(const envoy::config::cluster::v3::Cluster& cluster,
                 const envoy::extensions::clusters::composite::v3::ClusterConfig& config,
                 Upstream::ClusterFactoryContext& context, absl::Status& creation_status)
    : Upstream::ClusterImplBase(cluster, context, creation_status),
      cluster_manager_(context.serverFactoryContext().clusterManager()),
      // The name list is produced by an immediately-invoked lambda so that it can be
      // fully populated inside the member initializer.
      clusters_([&config]() {
        const int configured = config.clusters_size();
        auto names = std::make_shared<ClusterSet>();
        names->reserve(configured);
        for (int i = 0; i < configured; ++i) {
          names->push_back(config.clusters(i).name());
        }
        return names;
      }()) {}
27

            
28
// Stores the parent cluster info, the cluster manager and the shared list of sub-cluster
// names, then subscribes this load balancer to thread-local cluster update callbacks.
CompositeClusterLoadBalancer::CompositeClusterLoadBalancer(
    const Upstream::ClusterInfoConstSharedPtr& parent_info,
    Upstream::ClusterManager& cluster_manager, const ClusterSetConstSharedPtr& clusters)
    : parent_info_(parent_info),
      cluster_manager_(cluster_manager),
      clusters_(clusters) {
  // Registration happens in the body, after every member above has been initialized.
  // The returned handle is kept in `handle_`.
  handle_ = cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
}
34

            
35
uint32_t
36
27
CompositeClusterLoadBalancer::getAttemptCount(Upstream::LoadBalancerContext* context) const {
37
27
  if (context == nullptr) {
38
1
    return 0;
39
1
  }
40

            
41
  // Get attempt count from stream info.
42
26
  auto* stream_info = context->requestStreamInfo();
43
26
  if (stream_info != nullptr && stream_info->attemptCount().has_value()) {
44
24
    return stream_info->attemptCount().value();
45
24
  }
46

            
47
2
  return 0;
48
26
}
49

            
50
// Converts a 1-based attempt count into a 0-based index into the sub-cluster list.
// Returns absl::nullopt when the attempt count is 0 (logged as a warning) or when it
// points past the last configured sub-cluster.
absl::optional<size_t>
CompositeClusterLoadBalancer::mapAttemptToClusterIndex(uint32_t attempt_count) const {
  // Attempts are numbered from 1 by the router, so attempt 1 maps to index 0 and an
  // attempt count of 0 has no corresponding cluster.
  if (attempt_count == 0) {
    ENVOY_LOG(warn, "invalid attempt count 0 in composite cluster '{}'", parent_info_->name());
    return absl::nullopt;
  }

  const size_t zero_based = attempt_count - 1;
  if (zero_based >= clusters_->size()) {
    // More attempts than sub-clusters: there is nothing left to try, so the request fails.
    return absl::nullopt;
  }

  return zero_based;
}
68

            
69
// Looks up the thread-local cluster for the sub-cluster name stored at `cluster_index`.
// Returns nullptr (after a debug log) when the index is out of range or when the cluster
// manager has no thread-local cluster under that name.
Upstream::ThreadLocalCluster*
CompositeClusterLoadBalancer::getClusterByIndex(size_t cluster_index) const {
  if (cluster_index < clusters_->size()) {
    const auto& target_name = (*clusters_)[cluster_index];
    Upstream::ThreadLocalCluster* target = cluster_manager_.getThreadLocalCluster(target_name);
    if (target == nullptr) {
      ENVOY_LOG(debug, "cluster '{}' not found for composite cluster '{}'", target_name,
                parent_info_->name());
    }
    // May be nullptr; the caller decides how to handle a missing sub-cluster.
    return target;
  }

  ENVOY_LOG(debug, "cluster index {} exceeds available clusters {} in composite cluster '{}'",
            cluster_index, clusters_->size(), parent_info_->name());
  return nullptr;
}
85

            
86
void CompositeClusterLoadBalancer::onClusterAddOrUpdate(
87
14
    absl::string_view cluster_name, Upstream::ThreadLocalClusterCommand& get_cluster) {
88
14
  UNREFERENCED_PARAMETER(get_cluster);
89
14
  if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
90
1
    ENVOY_LOG(debug, "cluster '{}' added or updated for composite cluster '{}'", cluster_name,
91
1
              parent_info_->name());
92
1
  }
93
14
}
94

            
95
2
void CompositeClusterLoadBalancer::onClusterRemoval(const std::string& cluster_name) {
96
2
  if (std::find(clusters_->begin(), clusters_->end(), cluster_name) != clusters_->end()) {
97
1
    ENVOY_LOG(debug, "cluster '{}' removed from composite cluster '{}'", cluster_name,
98
1
              parent_info_->name());
99
1
  }
100
2
}
101

            
102
// Picks a host for the current attempt by delegating to the load balancer of the
// sub-cluster that corresponds to the request's attempt count.
// Returns a response built from nullptr when the attempt cannot be mapped to a sub-cluster
// or when that sub-cluster is not currently available.
Upstream::HostSelectionResponse
CompositeClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
  // Step 1: attempt count -> sub-cluster index.
  const uint32_t attempt = getAttemptCount(context);
  const auto maybe_index = mapAttemptToClusterIndex(attempt);
  if (!maybe_index.has_value()) {
    ENVOY_LOG(debug, "no cluster available for attempt {} in composite cluster '{}'", attempt,
              parent_info_->name());
    return {nullptr};
  }
  const size_t selected_index = maybe_index.value();

  // Step 2: sub-cluster index -> thread-local cluster.
  auto* const target = getClusterByIndex(selected_index);
  if (target == nullptr) {
    ENVOY_LOG(debug, "cluster index {} not available for attempt {} in composite cluster '{}'",
              selected_index, attempt, parent_info_->name());
    return {nullptr};
  }

  ENVOY_LOG(debug, "selecting cluster '{}' (index {}) for attempt {} in composite cluster '{}'",
            target->info()->name(), selected_index, attempt, parent_info_->name());

  // Step 3: wrap the caller's context together with the chosen index and hand off to the
  // sub-cluster's own load balancer.
  // NOTE(review): the wrapper lives on this stack frame; confirm the delegate never keeps
  // the pointer beyond this call.
  CompositeLoadBalancerContext wrapped_context(context, selected_index);
  return target->loadBalancer().chooseHost(&wrapped_context);
}
134

            
135
// Forwards peekAnotherHost() to the load balancer of the sub-cluster selected by the
// request's attempt count. Returns nullptr when no sub-cluster can be resolved.
Upstream::HostConstSharedPtr
CompositeClusterLoadBalancer::peekAnotherHost(Upstream::LoadBalancerContext* context) {
  const auto maybe_index = mapAttemptToClusterIndex(getAttemptCount(context));
  if (!maybe_index.has_value()) {
    return nullptr;
  }

  const size_t selected_index = maybe_index.value();
  auto* const target = getClusterByIndex(selected_index);
  if (target == nullptr) {
    return nullptr;
  }

  // The delegate receives the caller's context wrapped together with the chosen index.
  CompositeLoadBalancerContext wrapped_context(context, selected_index);
  return target->loadBalancer().peekAnotherHost(&wrapped_context);
}
152

            
153
// Forwards selectExistingConnection() to the load balancer of the sub-cluster selected by
// the request's attempt count. Returns absl::nullopt when no sub-cluster can be resolved.
absl::optional<Upstream::SelectedPoolAndConnection>
CompositeClusterLoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* context,
                                                       const Upstream::Host& host,
                                                       std::vector<uint8_t>& hash_key) {
  const auto maybe_index = mapAttemptToClusterIndex(getAttemptCount(context));
  if (!maybe_index.has_value()) {
    return absl::nullopt;
  }

  const size_t selected_index = maybe_index.value();
  auto* const target = getClusterByIndex(selected_index);
  if (target == nullptr) {
    return absl::nullopt;
  }

  // The delegate receives the caller's context wrapped together with the chosen index;
  // `host` and `hash_key` are passed through untouched.
  CompositeLoadBalancerContext wrapped_context(context, selected_index);
  return target->loadBalancer().selectExistingConnection(&wrapped_context, host, hash_key);
}
172

            
173
// Always returns an empty OptRef: this load balancer does not expose connection lifetime
// callbacks. Aggregating the callbacks of the sub-clusters is a possible future extension.
OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
CompositeClusterLoadBalancer::lifetimeCallbacks() {
  OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> no_callbacks;
  return no_callbacks;
}
178

            
179
// Creates the composite cluster and its thread-aware load balancer from the typed config.
// Returns the cluster/load-balancer pair, or the error status reported by the cluster
// constructor through its `creation_status` out-parameter.
absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
ClusterFactory::createClusterWithConfig(
    const envoy::config::cluster::v3::Cluster& cluster,
    const envoy::extensions::clusters::composite::v3::ClusterConfig& proto_config,
    Upstream::ClusterFactoryContext& context) {
  // The constructor reports failure via this status rather than by throwing.
  absl::Status status = absl::OkStatus();

  // NOTE(review): constructed with `new` rather than std::make_shared; presumably the
  // constructor is not accessible to make_shared -- confirm against the header.
  std::shared_ptr<Cluster> composite(new Cluster(cluster, proto_config, context, status));
  RETURN_IF_NOT_OK(status);

  // The load balancer is built from a reference to the cluster created above.
  auto thread_aware_lb = std::make_unique<CompositeThreadAwareLoadBalancer>(*composite);
  return std::make_pair(composite, std::move(thread_aware_lb));
}
191

            
192
// Registers ClusterFactory as an Upstream::ClusterFactory implementation.
REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory);
193

            
194
} // namespace Composite
195
} // namespace Clusters
196
} // namespace Extensions
197
} // namespace Envoy