#pragma once

#include "envoy/common/callback.h"
#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/extensions/clusters/composite/v3/cluster.pb.h"
#include "envoy/extensions/clusters/composite/v3/cluster.pb.validate.h"
#include "envoy/stream_info/stream_info.h"
#include "envoy/thread_local/thread_local_object.h"
#include "envoy/upstream/thread_local_cluster.h"

#include "source/common/common/logger.h"
#include "source/common/upstream/cluster_factory_impl.h"
#include "source/common/upstream/upstream_impl.h"
#include "source/extensions/clusters/composite/lb_context.h"

namespace Envoy {
namespace Extensions {
namespace Clusters {
namespace Composite {

// Ordered sequence of strings describing the composite's member clusters (presumably cluster
// names -- confirm against the .cc). Order matters so a vector must be used for rebuilds.
using ClusterSet = std::vector<std::string>;
// Shared handle to an immutable ClusterSet.
using ClusterSetConstSharedPtr = std::shared_ptr<const ClusterSet>;
class Cluster : public Upstream::ClusterImplBase {
26
public:
27
  // Upstream::Cluster
28
13
  Upstream::Cluster::InitializePhase initializePhase() const override {
29
13
    return Upstream::Cluster::InitializePhase::Secondary;
30
13
  }
31

            
32
  Upstream::ClusterManager& cluster_manager_;
33
  const ClusterSetConstSharedPtr clusters_;
34

            
35
protected:
36
  Cluster(const envoy::config::cluster::v3::Cluster& cluster,
37
          const envoy::extensions::clusters::composite::v3::ClusterConfig& config,
38
          Upstream::ClusterFactoryContext& context, absl::Status& creation_status);
39

            
40
private:
41
  friend class ClusterFactory;
42
  friend class CompositeClusterTest;
43

            
44
  // Upstream::ClusterImplBase
45
6
  void startPreInit() override { onPreInitComplete(); }
46
};
// Load balancer used by each worker thread.
49
class CompositeClusterLoadBalancer : public Upstream::LoadBalancer,
50
                                     Upstream::ClusterUpdateCallbacks,
51
                                     Logger::Loggable<Logger::Id::upstream> {
52
public:
53
  CompositeClusterLoadBalancer(const Upstream::ClusterInfoConstSharedPtr& parent_info,
54
                               Upstream::ClusterManager& cluster_manager,
55
                               const ClusterSetConstSharedPtr& clusters);
56

            
57
  // Upstream::ClusterUpdateCallbacks
58
  void onClusterAddOrUpdate(absl::string_view cluster_name,
59
                            Upstream::ThreadLocalClusterCommand& get_cluster) override;
60
  void onClusterRemoval(const std::string& cluster_name) override;
61

            
62
  // Upstream::LoadBalancer
63
  Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
64
  Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext* context) override;
65
  absl::optional<Upstream::SelectedPoolAndConnection>
66
  selectExistingConnection(Upstream::LoadBalancerContext* context, const Upstream::Host& host,
67
                           std::vector<uint8_t>& hash_key) override;
68
  OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override;
69

            
70
  // Extract retry attempt count from LoadBalancerContext.
71
  uint32_t getAttemptCount(Upstream::LoadBalancerContext* context) const;
72

            
73
  // Map attempt count to cluster index.
74
  // Returns nullopt when attempt count exceeds the number of available clusters.
75
  absl::optional<size_t> mapAttemptToClusterIndex(uint32_t attempt_count) const;
76

            
77
  // Get cluster by index.
78
  Upstream::ThreadLocalCluster* getClusterByIndex(size_t cluster_index) const;
79

            
80
private:
81
  Upstream::ClusterInfoConstSharedPtr parent_info_;
82
  Upstream::ClusterManager& cluster_manager_;
83
  const ClusterSetConstSharedPtr clusters_;
84
  Upstream::ClusterUpdateCallbacksHandlePtr handle_;
85
};
// Load balancer factory created by the main thread and will be called in each worker thread to
88
// create the thread local load balancer.
89
class CompositeLoadBalancerFactory : public Upstream::LoadBalancerFactory {
90
public:
91
9
  CompositeLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {}
92

            
93
  // Upstream::LoadBalancerFactory
94
13
  Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override {
95
13
    return std::make_unique<CompositeClusterLoadBalancer>(
96
13
        cluster_.info(), cluster_.cluster_manager_, cluster_.clusters_);
97
13
  }
98

            
99
  const Cluster& cluster_;
100
};
// Thread aware load balancer created by the main thread.
103
struct CompositeThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
104
  CompositeThreadAwareLoadBalancer(const Cluster& cluster)
105
8
      : factory_(std::make_shared<CompositeLoadBalancerFactory>(cluster)) {}
106

            
107
  // Upstream::ThreadAwareLoadBalancer
108
7
  Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; }
109
7
  absl::Status initialize() override { return absl::OkStatus(); }
110

            
111
  std::shared_ptr<CompositeLoadBalancerFactory> factory_;
112
};
// Cluster factory for the composite cluster extension, named "envoy.clusters.composite" and
// typed on the composite ClusterConfig proto.
class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase<
                           envoy::extensions::clusters::composite::v3::ClusterConfig> {
public:
  ClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.composite") {}

private:
  // Creates the cluster together with its thread aware load balancer, or returns an error status.
  // @param cluster the generic cluster proto.
  // @param proto_config the composite-specific typed configuration.
  // @param context the cluster factory context.
  absl::StatusOr<
      std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
  createClusterWithConfig(
      const envoy::config::cluster::v3::Cluster& cluster,
      const envoy::extensions::clusters::composite::v3::ClusterConfig& proto_config,
      Upstream::ClusterFactoryContext& context) override;
};
// Declares the factory for this extension. NOTE(review): DECLARE_FACTORY is defined elsewhere;
// presumably it pairs with a REGISTER_FACTORY in the .cc -- confirm there.
DECLARE_FACTORY(ClusterFactory);

} // namespace Composite
} // namespace Clusters
} // namespace Extensions
} // namespace Envoy