LCOV - code coverage report
Current view: top level - source/extensions/clusters/aggregate - cluster.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 1 24 4.2 %
Date: 2024-01-05 06:35:25 Functions: 1 12 8.3 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include "envoy/common/callback.h"
       4             : #include "envoy/config/cluster/v3/cluster.pb.h"
       5             : #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
       6             : #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
       7             : #include "envoy/stream_info/stream_info.h"
       8             : #include "envoy/thread_local/thread_local_object.h"
       9             : #include "envoy/upstream/thread_local_cluster.h"
      10             : 
      11             : #include "source/common/common/logger.h"
      12             : #include "source/common/upstream/cluster_factory_impl.h"
      13             : #include "source/common/upstream/upstream_impl.h"
      14             : #include "source/extensions/clusters/aggregate/lb_context.h"
      15             : 
      16             : namespace Envoy {
      17             : namespace Extensions {
      18             : namespace Clusters {
      19             : namespace Aggregate {
      20             : 
      21             : using PriorityToClusterVector = std::vector<std::pair<uint32_t, Upstream::ThreadLocalCluster*>>;
      22             : 
      23             : // Maps pair(host_cluster_name, host_priority) to the linearized priority of the Aggregate cluster.
      24             : using ClusterAndPriorityToLinearizedPriorityMap =
      25             :     absl::flat_hash_map<std::pair<std::string, uint32_t>, uint32_t>;
      26             : 
      27             : struct PriorityContext {
      28             :   Upstream::PrioritySetImpl priority_set_;
      29             :   PriorityToClusterVector priority_to_cluster_;
      30             :   ClusterAndPriorityToLinearizedPriorityMap cluster_and_priority_to_linearized_priority_;
      31             : };
      32             : 
      33             : using PriorityContextPtr = std::unique_ptr<PriorityContext>;
      34             : 
      35             : // Order matters so a vector must be used for rebuilds. If the vector size becomes larger we can
      36             : // maintain a parallel set for lookups during cluster update callbacks.
      37             : using ClusterSet = std::vector<std::string>;
      38             : using ClusterSetConstSharedPtr = std::shared_ptr<const ClusterSet>;
      39             : 
      40             : class Cluster : public Upstream::ClusterImplBase {
      41             : public:
      42             :   Cluster(const envoy::config::cluster::v3::Cluster& cluster,
      43             :           const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config,
      44             :           Upstream::ClusterFactoryContext& context);
      45             : 
      46             :   // Upstream::Cluster
      47           0 :   Upstream::Cluster::InitializePhase initializePhase() const override {
      48           0 :     return Upstream::Cluster::InitializePhase::Secondary;
      49           0 :   }
      50             : 
      51             :   Upstream::ClusterManager& cluster_manager_;
      52             :   Runtime::Loader& runtime_;
      53             :   Random::RandomGenerator& random_;
      54             :   const ClusterSetConstSharedPtr clusters_;
      55             : 
      56             : private:
      57             :   // Upstream::ClusterImplBase
      58           0 :   void startPreInit() override { onPreInitComplete(); }
      59             : };
      60             : 
      61             : // Load balancer used by each worker thread. It will be refreshed when clusters, hosts or priorities
      62             : // are updated.
      63             : class AggregateClusterLoadBalancer : public Upstream::LoadBalancer,
      64             :                                      Upstream::ClusterUpdateCallbacks,
      65             :                                      Logger::Loggable<Logger::Id::upstream> {
      66             : public:
      67             :   AggregateClusterLoadBalancer(const Upstream::ClusterInfoConstSharedPtr& parent_info,
      68             :                                Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime,
      69             :                                Random::RandomGenerator& random,
      70             :                                const ClusterSetConstSharedPtr& clusters);
      71             : 
      72             :   // Upstream::ClusterUpdateCallbacks
      73             :   void onClusterAddOrUpdate(absl::string_view cluster_name,
      74             :                             Upstream::ThreadLocalClusterCommand& get_cluster) override;
      75             :   void onClusterRemoval(const std::string& cluster_name) override;
      76             : 
      77             :   // Upstream::LoadBalancer
      78             :   Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
      79             :   Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override;
      80             :   absl::optional<Upstream::SelectedPoolAndConnection>
      81             :   selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
      82             :                            const Upstream::Host& /*host*/,
      83             :                            std::vector<uint8_t>& /*hash_key*/) override;
      84             :   OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override;
      85             : 
      86             : private:
      87             :   // Use inner class to extend LoadBalancerBase. When initializing AggregateClusterLoadBalancer, the
      88             :   // priority set could be empty, we cannot initialize LoadBalancerBase when priority set is empty.
      89             :   class LoadBalancerImpl : public Upstream::LoadBalancerBase {
      90             :   public:
      91             :     LoadBalancerImpl(const PriorityContext& priority_context, Upstream::ClusterLbStats& lb_stats,
      92             :                      Runtime::Loader& runtime, Random::RandomGenerator& random,
      93             :                      const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config)
      94             :         : Upstream::LoadBalancerBase(priority_context.priority_set_, lb_stats, runtime, random,
      95             :                                      PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(
      96             :                                          common_config, healthy_panic_threshold, 100, 50)),
      97           0 :           priority_context_(priority_context) {}
      98             : 
      99             :     // Upstream::LoadBalancer
     100             :     Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override;
     101             :     // Preconnecting not yet implemented for extensions.
     102           0 :     Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
     103           0 :       return nullptr;
     104           0 :     }
     105             :     absl::optional<Upstream::SelectedPoolAndConnection>
     106             :     selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
     107             :                              const Upstream::Host& /*host*/,
     108           0 :                              std::vector<uint8_t>& /*hash_key*/) override {
     109           0 :       return {};
     110           0 :     }
     111           0 :     OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
     112           0 :       return {};
     113           0 :     }
     114             : 
     115             :     absl::optional<uint32_t> hostToLinearizedPriority(const Upstream::HostDescription& host) const;
     116             : 
     117             :   private:
     118             :     const PriorityContext& priority_context_;
     119             :   };
     120             : 
     121             :   using LoadBalancerImplPtr = std::unique_ptr<LoadBalancerImpl>;
     122             : 
     123             :   void addMemberUpdateCallbackForCluster(Upstream::ThreadLocalCluster& thread_local_cluster);
     124             :   PriorityContextPtr linearizePrioritySet(OptRef<const std::string> excluded_cluster);
     125             :   void refresh(OptRef<const std::string> excluded_cluster = OptRef<const std::string>());
     126             : 
     127             :   LoadBalancerImplPtr load_balancer_;
     128             :   Upstream::ClusterInfoConstSharedPtr parent_info_;
     129             :   Upstream::ClusterManager& cluster_manager_;
     130             :   Runtime::Loader& runtime_;
     131             :   Random::RandomGenerator& random_;
     132             :   PriorityContextPtr priority_context_;
     133             :   const ClusterSetConstSharedPtr clusters_;
     134             :   Upstream::ClusterUpdateCallbacksHandlePtr handle_;
     135             :   absl::flat_hash_map<std::string, Envoy::Common::CallbackHandlePtr> member_update_cbs_;
     136             : };
     137             : 
     138             : // Load balancer factory created by the main thread and will be called in each worker thread to
     139             : // create the thread local load balancer.
     140             : struct AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory {
     141           0 :   AggregateLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {}
     142             :   // Upstream::LoadBalancerFactory
     143           0 :   Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override {
     144           0 :     return std::make_unique<AggregateClusterLoadBalancer>(
     145           0 :         cluster_.info(), cluster_.cluster_manager_, cluster_.runtime_, cluster_.random_,
     146           0 :         cluster_.clusters_);
     147           0 :   }
     148             : 
     149             :   const Cluster& cluster_;
     150             : };
     151             : 
     152             : // Thread aware load balancer created by the main thread.
     153             : struct AggregateThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
     154             :   AggregateThreadAwareLoadBalancer(const Cluster& cluster)
     155           0 :       : factory_(std::make_shared<AggregateLoadBalancerFactory>(cluster)) {}
     156             : 
     157             :   // Upstream::ThreadAwareLoadBalancer
     158           0 :   Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; }
     159           0 :   void initialize() override {}
     160             : 
     161             :   std::shared_ptr<AggregateLoadBalancerFactory> factory_;
     162             : };
     163             : 
     164             : class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase<
     165             :                            envoy::extensions::clusters::aggregate::v3::ClusterConfig> {
     166             : public:
     167           2 :   ClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.aggregate") {}
     168             : 
     169             : private:
     170             :   absl::StatusOr<
     171             :       std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
     172             :   createClusterWithConfig(
     173             :       const envoy::config::cluster::v3::Cluster& cluster,
     174             :       const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
     175             :       Upstream::ClusterFactoryContext& context) override;
     176             : };
     177             : 
     178             : DECLARE_FACTORY(ClusterFactory);
     179             : 
     180             : } // namespace Aggregate
     181             : } // namespace Clusters
     182             : } // namespace Extensions
     183             : } // namespace Envoy

Generated by: LCOV version 1.15