Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/extensions/clusters/aggregate/cluster.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include "envoy/common/callback.h"
4
#include "envoy/config/cluster/v3/cluster.pb.h"
5
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
6
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
7
#include "envoy/stream_info/stream_info.h"
8
#include "envoy/thread_local/thread_local_object.h"
9
#include "envoy/upstream/thread_local_cluster.h"
10
11
#include "source/common/common/logger.h"
12
#include "source/common/upstream/cluster_factory_impl.h"
13
#include "source/common/upstream/upstream_impl.h"
14
#include "source/extensions/clusters/aggregate/lb_context.h"
15
16
namespace Envoy {
17
namespace Extensions {
18
namespace Clusters {
19
namespace Aggregate {
20
21
// List of (uint32_t priority, thread-local cluster) pairs; clusters are held as raw pointers.
// NOTE(review): presumably indexed by the aggregate cluster's linearized priority, with the
// uint32_t being the priority inside the member cluster - confirm against linearizePrioritySet().
using PriorityToClusterVector = std::vector<std::pair<uint32_t, Upstream::ThreadLocalCluster*>>;
22
23
// Maps pair(host_cluster_name, host_priority) to the linearized priority of the Aggregate cluster.
24
using ClusterAndPriorityToLinearizedPriorityMap =
25
    absl::flat_hash_map<std::pair<std::string, uint32_t>, uint32_t>;
26
27
// Bundles the priority state of an aggregate cluster: a priority set plus the two lookup
// structures that relate it to (cluster, priority) pairs.
struct PriorityContext {
28
  // Priority set that AggregateClusterLoadBalancer::LoadBalancerImpl passes to LoadBalancerBase.
  Upstream::PrioritySetImpl priority_set_;
29
  // (priority, thread-local cluster) pairs; see PriorityToClusterVector.
  PriorityToClusterVector priority_to_cluster_;
30
  // (cluster name, host priority) -> linearized priority of the aggregate cluster.
  ClusterAndPriorityToLinearizedPriorityMap cluster_and_priority_to_linearized_priority_;
31
};
32
33
// Uniquely-owning pointer to a PriorityContext.
using PriorityContextPtr = std::unique_ptr<PriorityContext>;
34
35
// Ordered list of strings (presumably member cluster names). Order matters, so a vector must be
36
// used for rebuilds. If it grows large, a parallel set could serve lookups in update callbacks.
37
using ClusterSet = std::vector<std::string>;
38
// Shared pointer to an immutable ClusterSet.
using ClusterSetConstSharedPtr = std::shared_ptr<const ClusterSet>;
39
40
// Cluster implementation for the aggregate cluster type. It exposes the references and the
// cluster list that AggregateLoadBalancerFactory::create() needs to build a load balancer.
class Cluster : public Upstream::ClusterImplBase {
41
public:
42
  // Upstream::Cluster
43
0
  // Always reports the Secondary initialization phase.
  // NOTE(review): presumably so that the member clusters are initialized first - confirm.
  Upstream::Cluster::InitializePhase initializePhase() const override {
44
0
    return Upstream::Cluster::InitializePhase::Secondary;
45
0
  }
46
47
  // Public data members: read directly by AggregateLoadBalancerFactory::create().
  Upstream::ClusterManager& cluster_manager_;
48
  Runtime::Loader& runtime_;
49
  Random::RandomGenerator& random_;
50
  // Shared, immutable cluster list; the pointer itself is const and cannot be reseated.
  const ClusterSetConstSharedPtr clusters_;
51
52
protected:
53
  // Protected constructor; ClusterFactory is a friend and can therefore call it.
  // creation_status is a non-const reference out-parameter.
  // NOTE(review): presumably set to an error when construction fails - confirm in the .cc file.
  Cluster(const envoy::config::cluster::v3::Cluster& cluster,
54
          const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config,
55
          Upstream::ClusterFactoryContext& context, absl::Status& creation_status);
56
57
private:
58
  friend class ClusterFactory;
59
  friend class AggregateClusterTest;
60
61
  // Upstream::ClusterImplBase
62
0
  // No pre-initialization work is started here; completion is signalled immediately.
  void startPreInit() override { onPreInitComplete(); }
63
};
64
65
// Load balancer used by each worker thread. It will be refreshed when clusters, hosts or priorities
66
// are updated.
67
// Only Upstream::LoadBalancer is a public base; ClusterUpdateCallbacks and Loggable carry no
// access specifier and are therefore private bases of this class.
class AggregateClusterLoadBalancer : public Upstream::LoadBalancer,
68
                                     Upstream::ClusterUpdateCallbacks,
69
                                     Logger::Loggable<Logger::Id::upstream> {
70
public:
71
  friend class AggregateLoadBalancerFactory;
72
  // Takes the same pieces that Cluster exposes publicly (cluster manager, runtime, random
  // generator and cluster list) plus the parent cluster's info.
  AggregateClusterLoadBalancer(const Upstream::ClusterInfoConstSharedPtr& parent_info,
73
                               Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime,
74
                               Random::RandomGenerator& random,
75
                               const ClusterSetConstSharedPtr& clusters);
76
77
  // Upstream::ClusterUpdateCallbacks
78
  void onClusterAddOrUpdate(absl::string_view cluster_name,
79
                            Upstream::ThreadLocalClusterCommand& get_cluster) override;
80
  void onClusterRemoval(const std::string& cluster_name) override;
81
82
  // Upstream::LoadBalancer
83
  Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
84
  Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override;
85
  absl::optional<Upstream::SelectedPoolAndConnection>
86
  selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
87
                           const Upstream::Host& /*host*/,
88
                           std::vector<uint8_t>& /*hash_key*/) override;
89
  OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override;
90
91
private:
92
  // An inner class is used to extend LoadBalancerBase: when AggregateClusterLoadBalancer is
93
  // initialized the priority set could be empty, and LoadBalancerBase cannot be built from it.
94
  class LoadBalancerImpl : public Upstream::LoadBalancerBase {
95
  public:
96
    // Forwards the context's priority set, the stats, runtime and random generator to
    // LoadBalancerBase together with a healthy panic threshold read from common_config.
    // NOTE(review): the macro arguments 100 and 50 are presumably the maximum and the default
    // percentage - confirm against the macro definition.
    // Only a reference to priority_context is kept, so it must outlive this object.
    LoadBalancerImpl(const PriorityContext& priority_context, Upstream::ClusterLbStats& lb_stats,
97
                     Runtime::Loader& runtime, Random::RandomGenerator& random,
98
                     const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
99
        : Upstream::LoadBalancerBase(priority_context.priority_set_, lb_stats, runtime, random,
100
                                     PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
101
                                         common_config, healthy_panic_threshold, 100, 50)),
102
0
          priority_context_(priority_context) {}
103
104
    // Upstream::LoadBalancer
105
    Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
106
    // Preconnecting not yet implemented for extensions.
107
0
    Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
108
0
      return nullptr;
109
0
    }
110
    // Always returns an empty optional: no existing connection is ever selected here.
    absl::optional<Upstream::SelectedPoolAndConnection>
111
    selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
112
                             const Upstream::Host& /*host*/,
113
0
                             std::vector<uint8_t>& /*hash_key*/) override {
114
0
      return {};
115
0
    }
116
0
    // Always returns an empty OptRef: no lifetime callbacks are provided.
    OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
117
0
      return {};
118
0
    }
119
120
    // Returns the linearized priority for the given host, or an empty optional.
    // NOTE(review): presumably a lookup in cluster_and_priority_to_linearized_priority_ keyed by
    // the host's cluster name and priority - confirm in the .cc file.
    absl::optional<uint32_t> hostToLinearizedPriority(const Upstream::HostDescription& host) const;
121
122
  private:
123
    // Not owned; see the lifetime requirement on the constructor.
    const PriorityContext& priority_context_;
124
  };
125
126
  using LoadBalancerImplPtr = std::unique_ptr<LoadBalancerImpl>;
127
128
  void addMemberUpdateCallbackForCluster(Upstream::ThreadLocalCluster& thread_local_cluster);
129
  PriorityContextPtr linearizePrioritySet(OptRef<const std::string> excluded_cluster);
130
  // The default argument is an empty OptRef, i.e. no cluster is excluded.
  void refresh(OptRef<const std::string> excluded_cluster = OptRef<const std::string>());
131
132
  // NOTE(review): load_balancer_ is declared before priority_context_, so it is destroyed after
  // it. If the LoadBalancerImpl was built from priority_context_ (presumably in refresh()), it
  // briefly outlives the context it references - confirm its destructor chain never touches it.
  LoadBalancerImplPtr load_balancer_;
133
  Upstream::ClusterInfoConstSharedPtr parent_info_;
134
  Upstream::ClusterManager& cluster_manager_;
135
  Runtime::Loader& runtime_;
136
  Random::RandomGenerator& random_;
137
  PriorityContextPtr priority_context_;
138
  const ClusterSetConstSharedPtr clusters_;
139
  // Handle for this object's ClusterUpdateCallbacks registration.
  // NOTE(review): presumably obtained from cluster_manager_ in the constructor - confirm.
  Upstream::ClusterUpdateCallbacksHandlePtr handle_;
140
  // Callback handles keyed by string (presumably the member cluster name); see
  // addMemberUpdateCallbackForCluster().
  absl::flat_hash_map<std::string, Envoy::Common::CallbackHandlePtr> member_update_cbs_;
141
};
142
143
// Load balancer factory that is created by the main thread and called in each worker thread to
144
// create the thread local load balancer.
145
class AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory {
146
public:
147
0
  // Explicit so that a Cluster is never implicitly converted into a factory. Only a reference to
  // the cluster is stored, so the cluster must outlive this factory.
  explicit AggregateLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {}
148
  // Upstream::LoadBalancerFactory
149
0
  // Builds a new AggregateClusterLoadBalancer from the cluster's info, cluster manager, runtime,
  // random generator and cluster list. The LoadBalancerParams argument is unused.
  Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override {
150
0
    return std::make_unique<AggregateClusterLoadBalancer>(
151
0
        cluster_.info(), cluster_.cluster_manager_, cluster_.runtime_, cluster_.random_,
152
0
        cluster_.clusters_);
153
0
  }
154
155
  // Not owned.
  const Cluster& cluster_;
156
};
157
158
// Thread aware load balancer created by the main thread. It owns a single shared
// AggregateLoadBalancerFactory and hands it out through factory().
159
struct AggregateThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
160
  // Explicit so that a Cluster is never implicitly converted into a load balancer. The factory
  // keeps a reference to the cluster, so the cluster must outlive the factory.
  explicit AggregateThreadAwareLoadBalancer(const Cluster& cluster)
161
0
      : factory_(std::make_shared<AggregateLoadBalancerFactory>(cluster)) {}
162
163
  // Upstream::ThreadAwareLoadBalancer
164
0
  // Returns the shared factory built in the constructor.
  Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; }
165
0
  // Nothing to initialize; always succeeds.
  absl::Status initialize() override { return absl::OkStatus(); }
166
167
  std::shared_ptr<AggregateLoadBalancerFactory> factory_;
168
};
169
170
// Cluster factory for the aggregate cluster type, configured by the aggregate ClusterConfig
// proto. It passes the name "envoy.clusters.aggregate" to its base class.
class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase<
171
                           envoy::extensions::clusters::aggregate::v3::ClusterConfig> {
172
public:
173
4
  ClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.aggregate") {}
174
175
private:
176
  // Returns either an error status or a (cluster, thread aware load balancer) pair.
  // NOTE(review): presumably the pair holds a Cluster and an AggregateThreadAwareLoadBalancer
  // declared in this header - confirm in the .cc file.
  absl::StatusOr<
177
      std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
178
  createClusterWithConfig(
179
      const envoy::config::cluster::v3::Cluster& cluster,
180
      const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
181
      Upstream::ClusterFactoryContext& context) override;
182
};
183
184
// NOTE(review): macro defined outside this file; presumably declares the factory for static
// registration done in the .cc file - confirm.
DECLARE_FACTORY(ClusterFactory);
185
186
} // namespace Aggregate
187
} // namespace Clusters
188
} // namespace Extensions
189
} // namespace Envoy