1
#include "source/extensions/load_balancing_policies/subset/config.h"
2

            
3
#include "source/common/upstream/upstream_impl.h"
4
#include "source/extensions/load_balancing_policies/common/factory_base.h"
5
#include "source/extensions/load_balancing_policies/subset/subset_lb.h"
6

            
7
namespace Envoy {
8
namespace Extensions {
9
namespace LoadBalancingPolicies {
10
namespace Subset {
11

            
12
// Shorthand for the typed subset load balancing policy proto (v3 extension API).
using SubsetLbProto = envoy::extensions::load_balancing_policies::subset::v3::Subset;
13
// Shorthand for the v3 cluster proto.
using ClusterProto = envoy::config::cluster::v3::Cluster;
14

            
15
class LbFactory : public Upstream::LoadBalancerFactory {
16
public:
17
  LbFactory(const Upstream::SubsetLoadBalancerConfig& subset_config,
18
            const Upstream::ClusterInfo& cluster_info, Runtime::Loader& runtime,
19
            Random::RandomGenerator& random, TimeSource& time_source)
20
51
      : subset_config_(subset_config), cluster_info_(cluster_info), runtime_(runtime),
21
51
        random_(random), time_source_(time_source) {}
22

            
23
96
  Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams params) override {
24
96
    return std::make_unique<Upstream::SubsetLoadBalancer>(
25
96
        subset_config_, cluster_info_, params.priority_set, params.local_priority_set,
26
96
        cluster_info_.lbStats(), cluster_info_.statsScope(), runtime_, random_, time_source_);
27
96
  }
28
125
  bool recreateOnHostChange() const override { return false; }
29

            
30
private:
31
  const Upstream::SubsetLoadBalancerConfig& subset_config_;
32
  const Upstream::ClusterInfo& cluster_info_;
33

            
34
  Runtime::Loader& runtime_;
35
  Random::RandomGenerator& random_;
36
  TimeSource& time_source_;
37
};
38

            
39
class ThreadAwareLb : public Upstream::ThreadAwareLoadBalancer {
40
public:
41
51
  ThreadAwareLb(Upstream::LoadBalancerFactorySharedPtr factory) : factory_(std::move(factory)) {}
42

            
43
51
  Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; }
44
51
  absl::Status initialize() override { return absl::OkStatus(); }
45

            
46
private:
47
  Upstream::LoadBalancerFactorySharedPtr factory_;
48
};
49

            
50
/**
 * Creates the thread aware load balancer for a cluster that uses the subset policy.
 *
 * @param lb_config the loaded load balancing policy configuration; it must actually be an
 *        Upstream::SubsetLoadBalancerConfig.
 * @param cluster_info the cluster the load balancer is created for.
 * @param runtime runtime loader forwarded to the load balancer factory.
 * @param random random generator forwarded to the load balancer factory.
 * @param time_source time source forwarded to the load balancer factory.
 * @return a ThreadAwareLb wrapping a newly created LbFactory.
 *
 * The unnamed Upstream::PrioritySet parameter is not used.
 */
Upstream::ThreadAwareLoadBalancerPtr
SubsetLbFactory::create(OptRef<const Upstream::LoadBalancerConfig> lb_config,
                        const Upstream::ClusterInfo& cluster_info, const Upstream::PrioritySet&,
                        Runtime::Loader& runtime, Random::RandomGenerator& random,
                        TimeSource& time_source) {
  // The policy configuration is loaded and validated on the main thread while the cluster
  // configuration is being loaded, so by the time we get here it is assumed to be valid and of
  // the expected concrete type.
  const Upstream::SubsetLoadBalancerConfig* typed_config =
      dynamic_cast<const Upstream::SubsetLoadBalancerConfig*>(lb_config.ptr());
  ASSERT(typed_config != nullptr,
         "Invalid load balancing policy configuration for subset load balancer");

  // The factory built here is what later creates the actual load balancers in the workers. The
  // thread aware load balancer only stores it; it carries no logic of its own for subset lb.
  return std::make_unique<ThreadAwareLb>(
      std::make_shared<LbFactory>(*typed_config, cluster_info, runtime, random, time_source));
}
72

            
73
/**
 * Loads the subset load balancer configuration from the typed policy proto.
 *
 * @param factory_context server factory context passed to the configuration constructor.
 * @param config the policy configuration message; it must be a SubsetLbProto.
 * @return the loaded configuration, or the error status reported while constructing it.
 */
absl::StatusOr<Upstream::LoadBalancerConfigPtr>
SubsetLbFactory::loadConfig(Server::Configuration::ServerFactoryContext& factory_context,
                            const Protobuf::Message& config) {
  ASSERT(dynamic_cast<const SubsetLbProto*>(&config) != nullptr);
  const auto& typed_config = dynamic_cast<const SubsetLbProto&>(config);

  // Build the subset load balancer configuration. It will contain the child load balancer
  // config and the child load balancer factory. Construction failures are reported through
  // the status out-parameter rather than the return value.
  auto creation_status = absl::OkStatus();
  auto lb_config = std::make_unique<Upstream::SubsetLoadBalancerConfig>(
      factory_context, typed_config, creation_status);
  RETURN_IF_NOT_OK_REF(creation_status);

  return lb_config;
}
87

            
88
/**
 * Loads the subset load balancer configuration from the cluster proto.
 *
 * @param factory_context server factory context passed to the configuration constructor.
 * @param cluster the cluster proto the configuration is built from.
 * @return the loaded configuration; an InvalidArgument error if the cluster's LB policy is
 *         CLUSTER_PROVIDED; or the error status reported while constructing the configuration.
 */
absl::StatusOr<Upstream::LoadBalancerConfigPtr>
SubsetLbFactory::loadLegacy(Server::Configuration::ServerFactoryContext& factory_context,
                            const Upstream::ClusterProto& cluster) {
  // CLUSTER_PROVIDED cannot be used together with lb_subset_config, so reject it up front.
  const auto lb_policy = cluster.lb_policy();
  if (lb_policy == envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
    return absl::InvalidArgumentError(
        fmt::format("cluster: LB policy {} cannot be combined with lb_subset_config",
                    envoy::config::cluster::v3::Cluster::LbPolicy_Name(lb_policy)));
  }

  // Construction failures are reported through the status out-parameter.
  auto creation_status = absl::OkStatus();
  auto lb_config = std::make_unique<Upstream::SubsetLoadBalancerConfig>(factory_context, cluster,
                                                                        creation_status);
  RETURN_IF_NOT_OK_REF(creation_status);

  return lb_config;
}
102

            
103
/**
104
 * Static registration for the Factory. @see RegisterFactory.
105
 */
106
REGISTER_FACTORY(SubsetLbFactory, Upstream::TypedLoadBalancerFactory);
107

            
108
} // namespace Subset
109
} // namespace LoadBalancingPolicies
110
} // namespace Extensions
111
} // namespace Envoy