Line data Source code
1 : #include "source/extensions/load_balancing_policies/subset/config.h"
2 :
3 : #include "source/common/upstream/upstream_impl.h"
4 : #include "source/extensions/load_balancing_policies/common/factory_base.h"
5 : #include "source/extensions/load_balancing_policies/subset/subset_lb.h"
6 :
7 : namespace Envoy {
8 : namespace Extensions {
9 : namespace LoadBalancingPolices {
10 : namespace Subset {
11 :
12 : using SubsetLbProto = envoy::extensions::load_balancing_policies::subset::v3::Subset;
13 : using ClusterProto = envoy::config::cluster::v3::Cluster;
14 :
15 : Upstream::LoadBalancerPtr Factory::create(const Upstream::ClusterInfo& cluster,
16 : const Upstream::PrioritySet& priority_set,
17 : const Upstream::PrioritySet* local_priority_set,
18 : Runtime::Loader& runtime, Random::RandomGenerator& random,
19 0 : TimeSource& time_source) {
20 0 : auto child_lb_creator = std::make_unique<Upstream::LegacyChildLoadBalancerCreatorImpl>(
21 0 : cluster.lbType(), cluster.lbRingHashConfig(), cluster.lbMaglevConfig(),
22 0 : cluster.lbRoundRobinConfig(), cluster.lbLeastRequestConfig(), cluster.lbConfig());
23 :
24 0 : return std::make_unique<Upstream::SubsetLoadBalancer>(
25 0 : cluster.lbSubsetInfo(), std::move(child_lb_creator), priority_set, local_priority_set,
26 0 : cluster.lbStats(), cluster.statsScope(), runtime, random, time_source);
27 0 : }
28 :
29 : /**
30 : * Static registration for the Factory. @see RegisterFactory.
31 : */
32 : REGISTER_FACTORY(Factory, Upstream::NonThreadAwareLoadBalancerFactory);
33 :
34 : class ChildLoadBalancerCreatorImpl : public Upstream::ChildLoadBalancerCreator {
35 : public:
36 : ChildLoadBalancerCreatorImpl(const Upstream::SubsetLoadBalancerConfig& subset_config,
37 : const Upstream::ClusterInfo& cluster_info)
38 0 : : subset_config_(subset_config), cluster_info_(cluster_info) {}
39 :
40 : std::pair<Upstream::ThreadAwareLoadBalancerPtr, Upstream::LoadBalancerPtr>
41 : createLoadBalancer(const Upstream::PrioritySet& child_priority_set, const Upstream::PrioritySet*,
42 : Upstream::ClusterLbStats&, Stats::Scope&, Runtime::Loader& runtime,
43 0 : Random::RandomGenerator& random, TimeSource& time_source) override {
44 0 : return {subset_config_.createLoadBalancer(cluster_info_, child_priority_set, runtime, random,
45 0 : time_source),
46 0 : nullptr};
47 0 : }
48 :
49 : private:
50 : const Upstream::SubsetLoadBalancerConfig& subset_config_;
51 : const Upstream::ClusterInfo& cluster_info_;
52 : };
53 :
54 : class LbFactory : public Upstream::LoadBalancerFactory {
55 : public:
56 : LbFactory(const Upstream::SubsetLoadBalancerConfig& subset_config,
57 : const Upstream::ClusterInfo& cluster_info, Runtime::Loader& runtime,
58 : Random::RandomGenerator& random, TimeSource& time_source)
59 : : subset_config_(subset_config), cluster_info_(cluster_info), runtime_(runtime),
60 0 : random_(random), time_source_(time_source) {}
61 :
62 0 : Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams params) override {
63 0 : auto child_lb_creator =
64 0 : std::make_unique<ChildLoadBalancerCreatorImpl>(subset_config_, cluster_info_);
65 :
66 0 : return std::make_unique<Upstream::SubsetLoadBalancer>(
67 0 : subset_config_.subsetInfo(), std::move(child_lb_creator), params.priority_set,
68 0 : params.local_priority_set, cluster_info_.lbStats(), cluster_info_.statsScope(), runtime_,
69 0 : random_, time_source_);
70 0 : }
71 0 : bool recreateOnHostChange() const override { return false; }
72 :
73 : private:
74 : const Upstream::SubsetLoadBalancerConfig& subset_config_;
75 : const Upstream::ClusterInfo& cluster_info_;
76 :
77 : Runtime::Loader& runtime_;
78 : Random::RandomGenerator& random_;
79 : TimeSource& time_source_;
80 : };
81 :
82 : class ThreadAwareLb : public Upstream::ThreadAwareLoadBalancer {
83 : public:
84 0 : ThreadAwareLb(Upstream::LoadBalancerFactorySharedPtr factory) : factory_(std::move(factory)) {}
85 :
86 0 : Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; }
87 0 : void initialize() override {}
88 :
89 : private:
90 : Upstream::LoadBalancerFactorySharedPtr factory_;
91 : };
92 :
93 : Upstream::ThreadAwareLoadBalancerPtr
94 : SubsetLbFactory::create(OptRef<const Upstream::LoadBalancerConfig> lb_config,
95 : const Upstream::ClusterInfo& cluster_info, const Upstream::PrioritySet&,
96 : Runtime::Loader& runtime, Random::RandomGenerator& random,
97 0 : TimeSource& time_source) {
98 :
99 0 : const auto* typed_config =
100 0 : dynamic_cast<const Upstream::SubsetLoadBalancerConfig*>(lb_config.ptr());
101 : // The load balancing policy configuration will be loaded and validated in the main thread when we
102 : // load the cluster configuration. So we can assume the configuration is valid here.
103 0 : ASSERT(typed_config != nullptr,
104 0 : "Invalid load balancing policy configuration for subset load balancer");
105 :
106 : // Create the load balancer factory that will be used to create the load balancer in the workers.
107 0 : auto lb_factory =
108 0 : std::make_shared<LbFactory>(*typed_config, cluster_info, runtime, random, time_source);
109 :
110 : // Move and store the load balancer factory in the thread aware load balancer. This thread aware
111 : // load balancer is simply a wrapper of the load balancer factory for subset lb and no actual
112 : // logic is implemented.
113 0 : return std::make_unique<ThreadAwareLb>(std::move(lb_factory));
114 0 : }
115 :
116 : Upstream::LoadBalancerConfigPtr
117 : SubsetLbFactory::loadConfig(const Protobuf::Message& config,
118 0 : ProtobufMessage::ValidationVisitor& visitor) {
119 :
120 0 : auto active_or_legacy = Common::ActiveOrLegacy<SubsetLbProto, ClusterProto>::get(&config);
121 0 : ASSERT(active_or_legacy.hasLegacy() || active_or_legacy.hasActive());
122 :
123 0 : if (active_or_legacy.hasLegacy()) {
124 0 : if (active_or_legacy.legacy()->lb_policy() ==
125 0 : envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
126 0 : throw EnvoyException(
127 0 : fmt::format("cluster: LB policy {} cannot be combined with lb_subset_config",
128 0 : envoy::config::cluster::v3::Cluster::LbPolicy_Name(
129 0 : active_or_legacy.legacy()->lb_policy())));
130 0 : }
131 :
132 0 : auto sub_lb_pair =
133 0 : Upstream::LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProtoWithoutSubset(
134 0 : *active_or_legacy.legacy(), visitor);
135 :
136 0 : if (!sub_lb_pair.ok()) {
137 0 : throw EnvoyException(std::string(sub_lb_pair.status().message()));
138 0 : }
139 :
140 0 : return std::make_unique<Upstream::SubsetLoadBalancerConfig>(
141 0 : active_or_legacy.legacy()->lb_subset_config(), std::move(sub_lb_pair->config),
142 0 : sub_lb_pair->factory);
143 0 : }
144 :
145 : // Load the subset load balancer configuration. This will contains child load balancer
146 : // config and child load balancer factory.
147 0 : return std::make_unique<Upstream::SubsetLoadBalancerConfig>(*active_or_legacy.active(), visitor);
148 0 : }
149 :
150 : /**
151 : * Static registration for the Factory. @see RegisterFactory.
152 : */
153 : REGISTER_FACTORY(SubsetLbFactory, Upstream::TypedLoadBalancerFactory);
154 :
155 : } // namespace Subset
156 : } // namespace LoadBalancingPolices
157 : } // namespace Extensions
158 : } // namespace Envoy
|