Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/clusters/aggregate/cluster.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include "envoy/common/callback.h"
4
#include "envoy/config/cluster/v3/cluster.pb.h"
5
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
6
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
7
#include "envoy/stream_info/stream_info.h"
8
#include "envoy/thread_local/thread_local_object.h"
9
#include "envoy/upstream/thread_local_cluster.h"
10
11
#include "source/common/common/logger.h"
12
#include "source/common/upstream/cluster_factory_impl.h"
13
#include "source/common/upstream/upstream_impl.h"
14
#include "source/extensions/clusters/aggregate/lb_context.h"
15
16
namespace Envoy {
17
namespace Extensions {
18
namespace Clusters {
19
namespace Aggregate {
20
21
using PriorityToClusterVector = std::vector<std::pair<uint32_t, Upstream::ThreadLocalCluster*>>;
22
23
// Maps pair(host_cluster_name, host_priority) to the linearized priority of the Aggregate cluster.
24
using ClusterAndPriorityToLinearizedPriorityMap =
25
    absl::flat_hash_map<std::pair<std::string, uint32_t>, uint32_t>;
26
27
struct PriorityContext {
28
  Upstream::PrioritySetImpl priority_set_;
29
  PriorityToClusterVector priority_to_cluster_;
30
  ClusterAndPriorityToLinearizedPriorityMap cluster_and_priority_to_linearized_priority_;
31
};
32
33
using PriorityContextPtr = std::unique_ptr<PriorityContext>;
34
35
// Order matters so a vector must be used for rebuilds. If the vector size becomes larger we can
36
// maintain a parallel set for lookups during cluster update callbacks.
37
using ClusterSet = std::vector<std::string>;
38
using ClusterSetConstSharedPtr = std::shared_ptr<const ClusterSet>;
39
40
class Cluster : public Upstream::ClusterImplBase {
41
public:
42
  Cluster(const envoy::config::cluster::v3::Cluster& cluster,
43
          const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config,
44
          Upstream::ClusterFactoryContext& context);
45
46
  // Upstream::Cluster
47
0
  Upstream::Cluster::InitializePhase initializePhase() const override {
48
0
    return Upstream::Cluster::InitializePhase::Secondary;
49
0
  }
50
51
  Upstream::ClusterManager& cluster_manager_;
52
  Runtime::Loader& runtime_;
53
  Random::RandomGenerator& random_;
54
  const ClusterSetConstSharedPtr clusters_;
55
56
private:
57
  // Upstream::ClusterImplBase
58
0
  void startPreInit() override { onPreInitComplete(); }
59
};
60
61
// Load balancer used by each worker thread. It will be refreshed when clusters, hosts or priorities
62
// are updated.
63
class AggregateClusterLoadBalancer : public Upstream::LoadBalancer,
64
                                     Upstream::ClusterUpdateCallbacks,
65
                                     Logger::Loggable<Logger::Id::upstream> {
66
public:
67
  AggregateClusterLoadBalancer(const Upstream::ClusterInfoConstSharedPtr& parent_info,
68
                               Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime,
69
                               Random::RandomGenerator& random,
70
                               const ClusterSetConstSharedPtr& clusters);
71
72
  // Upstream::ClusterUpdateCallbacks
73
  void onClusterAddOrUpdate(absl::string_view cluster_name,
74
                            Upstream::ThreadLocalClusterCommand& get_cluster) override;
75
  void onClusterRemoval(const std::string& cluster_name) override;
76
77
  // Upstream::LoadBalancer
78
  Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
79
  Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override;
80
  absl::optional<Upstream::SelectedPoolAndConnection>
81
  selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
82
                           const Upstream::Host& /*host*/,
83
                           std::vector<uint8_t>& /*hash_key*/) override;
84
  OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override;
85
86
private:
87
  // Use inner class to extend LoadBalancerBase. When initializing AggregateClusterLoadBalancer, the
88
  // priority set could be empty, we cannot initialize LoadBalancerBase when priority set is empty.
89
  class LoadBalancerImpl : public Upstream::LoadBalancerBase {
90
  public:
91
    LoadBalancerImpl(const PriorityContext& priority_context, Upstream::ClusterLbStats& lb_stats,
92
                     Runtime::Loader& runtime, Random::RandomGenerator& random,
93
                     const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
94
        : Upstream::LoadBalancerBase(priority_context.priority_set_, lb_stats, runtime, random,
95
                                     PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
96
                                         common_config, healthy_panic_threshold, 100, 50)),
97
0
          priority_context_(priority_context) {}
98
99
    // Upstream::LoadBalancer
100
    Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
101
    // Preconnecting not yet implemented for extensions.
102
0
    Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
103
0
      return nullptr;
104
0
    }
105
    absl::optional<Upstream::SelectedPoolAndConnection>
106
    selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
107
                             const Upstream::Host& /*host*/,
108
0
                             std::vector<uint8_t>& /*hash_key*/) override {
109
0
      return {};
110
0
    }
111
0
    OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
112
0
      return {};
113
0
    }
114
115
    absl::optional<uint32_t> hostToLinearizedPriority(const Upstream::HostDescription& host) const;
116
117
  private:
118
    const PriorityContext& priority_context_;
119
  };
120
121
  using LoadBalancerImplPtr = std::unique_ptr<LoadBalancerImpl>;
122
123
  void addMemberUpdateCallbackForCluster(Upstream::ThreadLocalCluster& thread_local_cluster);
124
  PriorityContextPtr linearizePrioritySet(OptRef<const std::string> excluded_cluster);
125
  void refresh(OptRef<const std::string> excluded_cluster = OptRef<const std::string>());
126
127
  LoadBalancerImplPtr load_balancer_;
128
  Upstream::ClusterInfoConstSharedPtr parent_info_;
129
  Upstream::ClusterManager& cluster_manager_;
130
  Runtime::Loader& runtime_;
131
  Random::RandomGenerator& random_;
132
  PriorityContextPtr priority_context_;
133
  const ClusterSetConstSharedPtr clusters_;
134
  Upstream::ClusterUpdateCallbacksHandlePtr handle_;
135
  absl::flat_hash_map<std::string, Envoy::Common::CallbackHandlePtr> member_update_cbs_;
136
};
137
138
// Load balancer factory created by the main thread and will be called in each worker thread to
139
// create the thread local load balancer.
140
struct AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory {
141
0
  AggregateLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {}
142
  // Upstream::LoadBalancerFactory
143
0
  Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override {
144
0
    return std::make_unique<AggregateClusterLoadBalancer>(
145
0
        cluster_.info(), cluster_.cluster_manager_, cluster_.runtime_, cluster_.random_,
146
0
        cluster_.clusters_);
147
0
  }
148
149
  const Cluster& cluster_;
150
};
151
152
// Thread aware load balancer created by the main thread.
153
struct AggregateThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
154
  AggregateThreadAwareLoadBalancer(const Cluster& cluster)
155
0
      : factory_(std::make_shared<AggregateLoadBalancerFactory>(cluster)) {}
156
157
  // Upstream::ThreadAwareLoadBalancer
158
0
  Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; }
159
0
  void initialize() override {}
160
161
  std::shared_ptr<AggregateLoadBalancerFactory> factory_;
162
};
163
164
class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase<
165
                           envoy::extensions::clusters::aggregate::v3::ClusterConfig> {
166
public:
167
4
  ClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.aggregate") {}
168
169
private:
170
  absl::StatusOr<
171
      std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
172
  createClusterWithConfig(
173
      const envoy::config::cluster::v3::Cluster& cluster,
174
      const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
175
      Upstream::ClusterFactoryContext& context) override;
176
};
177
178
DECLARE_FACTORY(ClusterFactory);
179
180
} // namespace Aggregate
181
} // namespace Clusters
182
} // namespace Extensions
183
} // namespace Envoy