1
#pragma once
2

            
3
#include "envoy/common/callback.h"
4
#include "envoy/config/cluster/v3/cluster.pb.h"
5
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
6
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
7
#include "envoy/stream_info/stream_info.h"
8
#include "envoy/thread_local/thread_local_object.h"
9
#include "envoy/upstream/thread_local_cluster.h"
10

            
11
#include "source/common/common/logger.h"
12
#include "source/common/upstream/cluster_factory_impl.h"
13
#include "source/common/upstream/upstream_impl.h"
14
#include "source/extensions/clusters/aggregate/lb_context.h"
15

            
16
namespace Envoy {
17
namespace Extensions {
18
namespace Clusters {
19
namespace Aggregate {
20

            
21
using PriorityToClusterVector = std::vector<std::pair<uint32_t, Upstream::ThreadLocalCluster*>>;
22

            
23
// Maps pair(host_cluster_name, host_priority) to the linearized priority of the Aggregate cluster.
24
using ClusterAndPriorityToLinearizedPriorityMap =
25
    absl::flat_hash_map<std::pair<std::string, uint32_t>, uint32_t>;
26

            
27
struct PriorityContext {
28
  Upstream::PrioritySetImpl priority_set_;
29
  PriorityToClusterVector priority_to_cluster_;
30
  ClusterAndPriorityToLinearizedPriorityMap cluster_and_priority_to_linearized_priority_;
31
};
32

            
33
using PriorityContextPtr = std::unique_ptr<PriorityContext>;
34

            
35
// Order matters so a vector must be used for rebuilds. If the vector size becomes larger we can
36
// maintain a parallel set for lookups during cluster update callbacks.
37
using ClusterSet = std::vector<std::string>;
38
using ClusterSetConstSharedPtr = std::shared_ptr<const ClusterSet>;
39

            
40
class Cluster : public Upstream::ClusterImplBase {
41
public:
42
  // Upstream::Cluster
43
44
  Upstream::Cluster::InitializePhase initializePhase() const override {
44
44
    return Upstream::Cluster::InitializePhase::Secondary;
45
44
  }
46

            
47
  // Getters that return the values from ClusterImplBase.
48
40
  Runtime::Loader& runtime() const { return runtime_; }
49
40
  Random::RandomGenerator& random() const { return random_; }
50

            
51
  Upstream::ClusterManager& cluster_manager_;
52
  const ClusterSetConstSharedPtr clusters_;
53

            
54
protected:
55
  Cluster(const envoy::config::cluster::v3::Cluster& cluster,
56
          const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config,
57
          Upstream::ClusterFactoryContext& context, absl::Status& creation_status);
58

            
59
private:
60
  friend class ClusterFactory;
61
  friend class AggregateClusterTest;
62

            
63
  // Upstream::ClusterImplBase
64
22
  void startPreInit() override { onPreInitComplete(); }
65
};
66

            
67
// Load balancer used by each worker thread. It will be refreshed when clusters, hosts or priorities
68
// are updated.
69
class AggregateClusterLoadBalancer : public Upstream::LoadBalancer,
70
                                     Upstream::ClusterUpdateCallbacks,
71
                                     Logger::Loggable<Logger::Id::upstream> {
72
public:
73
  friend class AggregateLoadBalancerFactory;
74
  AggregateClusterLoadBalancer(const Upstream::ClusterInfoConstSharedPtr& parent_info,
75
                               Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime,
76
                               Random::RandomGenerator& random,
77
                               const ClusterSetConstSharedPtr& clusters);
78

            
79
  // Upstream::ClusterUpdateCallbacks
80
  void onClusterAddOrUpdate(absl::string_view cluster_name,
81
                            Upstream::ThreadLocalClusterCommand& get_cluster) override;
82
  void onClusterRemoval(const std::string& cluster_name) override;
83

            
84
  // Upstream::LoadBalancer
85
  Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
86
  Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override;
87
  absl::optional<Upstream::SelectedPoolAndConnection>
88
  selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
89
                           const Upstream::Host& /*host*/,
90
                           std::vector<uint8_t>& /*hash_key*/) override;
91
  OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override;
92

            
93
private:
94
  // Use inner class to extend LoadBalancerBase. When initializing AggregateClusterLoadBalancer, the
95
  // priority set could be empty, we cannot initialize LoadBalancerBase when priority set is empty.
96
  class LoadBalancerImpl : public Upstream::LoadBalancerBase {
97
  public:
98
    LoadBalancerImpl(const PriorityContext& priority_context, Upstream::ClusterLbStats& lb_stats,
99
                     Runtime::Loader& runtime, Random::RandomGenerator& random,
100
                     const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
101
104
        : Upstream::LoadBalancerBase(priority_context.priority_set_, lb_stats, runtime, random,
102
104
                                     PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
103
104
                                         common_config, healthy_panic_threshold, 100, 50)),
104
104
          priority_context_(priority_context) {}
105

            
106
    // Upstream::LoadBalancer
107
    Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
108
    // Preconnecting not yet implemented for extensions.
109
66
    Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
110
66
      return nullptr;
111
66
    }
112
    absl::optional<Upstream::SelectedPoolAndConnection>
113
    selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
114
                             const Upstream::Host& /*host*/,
115
66
                             std::vector<uint8_t>& /*hash_key*/) override {
116
66
      return {};
117
66
    }
118
66
    OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
119
66
      return {};
120
66
    }
121

            
122
    absl::optional<uint32_t> hostToLinearizedPriority(const Upstream::HostDescription& host) const;
123

            
124
  private:
125
    const PriorityContext& priority_context_;
126
  };
127

            
128
  using LoadBalancerImplPtr = std::unique_ptr<LoadBalancerImpl>;
129

            
130
  void addMemberUpdateCallbackForCluster(Upstream::ThreadLocalCluster& thread_local_cluster);
131
  PriorityContextPtr linearizePrioritySet(OptRef<const std::string> excluded_cluster);
132
  void refresh(OptRef<const std::string> excluded_cluster = OptRef<const std::string>());
133

            
134
  LoadBalancerImplPtr load_balancer_;
135
  Upstream::ClusterInfoConstSharedPtr parent_info_;
136
  Upstream::ClusterManager& cluster_manager_;
137
  Runtime::Loader& runtime_;
138
  Random::RandomGenerator& random_;
139
  PriorityContextPtr priority_context_;
140
  const ClusterSetConstSharedPtr clusters_;
141
  Upstream::ClusterUpdateCallbacksHandlePtr handle_;
142
  absl::flat_hash_map<std::string, Envoy::Common::CallbackHandlePtr> member_update_cbs_;
143
};
144

            
145
// Load balancer factory created by the main thread and will be called in each worker thread to
146
// create the thread local load balancer.
147
class AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory {
148
public:
149
26
  AggregateLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {}
150
  // Upstream::LoadBalancerFactory
151
40
  Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override {
152
40
    return std::make_unique<AggregateClusterLoadBalancer>(
153
40
        cluster_.info(), cluster_.cluster_manager_, cluster_.runtime(), cluster_.random(),
154
40
        cluster_.clusters_);
155
40
  }
156

            
157
  const Cluster& cluster_;
158
};
159

            
160
// Thread aware load balancer created by the main thread.
161
struct AggregateThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
162
  AggregateThreadAwareLoadBalancer(const Cluster& cluster)
163
26
      : factory_(std::make_shared<AggregateLoadBalancerFactory>(cluster)) {}
164

            
165
  // Upstream::ThreadAwareLoadBalancer
166
26
  Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; }
167
22
  absl::Status initialize() override { return absl::OkStatus(); }
168

            
169
  std::shared_ptr<AggregateLoadBalancerFactory> factory_;
170
};
171

            
172
class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase<
173
                           envoy::extensions::clusters::aggregate::v3::ClusterConfig> {
174
public:
175
6
  ClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.aggregate") {}
176

            
177
private:
178
  absl::StatusOr<
179
      std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
180
  createClusterWithConfig(
181
      const envoy::config::cluster::v3::Cluster& cluster,
182
      const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
183
      Upstream::ClusterFactoryContext& context) override;
184
};
185

            
186
DECLARE_FACTORY(ClusterFactory);
187

            
188
} // namespace Aggregate
189
} // namespace Clusters
190
} // namespace Extensions
191
} // namespace Envoy