LCOV - code coverage report
Current view: top level - source/extensions/clusters/redis - redis_cluster_lb.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 48 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 25 0.0 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <array>
       4             : #include <string>
       5             : #include <vector>
       6             : 
       7             : #include "envoy/upstream/load_balancer.h"
       8             : #include "envoy/upstream/upstream.h"
       9             : 
      10             : #include "source/common/network/address_impl.h"
      11             : #include "source/common/upstream/load_balancer_impl.h"
      12             : #include "source/common/upstream/upstream_impl.h"
      13             : #include "source/extensions/clusters/redis/crc16.h"
      14             : #include "source/extensions/filters/network/common/redis/client.h"
      15             : #include "source/extensions/filters/network/common/redis/codec.h"
      16             : #include "source/extensions/filters/network/common/redis/supported_commands.h"
      17             : 
      18             : #include "absl/container/btree_map.h"
      19             : #include "absl/synchronization/mutex.h"
      20             : 
      21             : namespace Envoy {
      22             : namespace Extensions {
      23             : namespace Clusters {
      24             : namespace Redis {
      25             : 
      26             : static const uint64_t MaxSlot = 16384; // Number of hash slots; also the size of SlotArray. NOTE(review): static const in a header gives every translation unit its own copy.
      27             : 
      28             : using ReplicaToResolve = std::pair<std::string, uint16_t>; // (host, port), as stored by ClusterSlot::addReplicaToResolve().
      29             : 
      30             : class ClusterSlot { // One slot entry: start/end bounds plus the primary and replica addresses.
      31             : public:
      32             :   ClusterSlot(int64_t start, int64_t end, Network::Address::InstanceConstSharedPtr primary)
      33           0 :       : start_(start), end_(end), primary_(std::move(primary)) {} // NOTE(review): primary_port_ is not initialized here.
      34             : 
      35           0 :   int64_t start() const { return start_; }
      36           0 :   int64_t end() const { return end_; }
      37           0 :   Network::Address::InstanceConstSharedPtr primary() const { return primary_; } // Returned by value (copies the shared pointer).
      38           0 :   const absl::btree_map<std::string, Network::Address::InstanceConstSharedPtr>& replicas() const {
      39           0 :     return replicas_; // Keyed by each replica's asString() value, see addReplica().
      40           0 :   }
      41             : 
      42           0 :   void setPrimary(Network::Address::InstanceConstSharedPtr address) {
      43           0 :     primary_ = std::move(address); // Replaces the current primary address.
      44           0 :   }
      45           0 :   void addReplica(Network::Address::InstanceConstSharedPtr replica_address) { // replica_address is dereferenced, so it must be non-null.
      46           0 :     replicas_.emplace(replica_address->asString(), std::move(replica_address)); // emplace() leaves an existing entry with the same key untouched.
      47           0 :   }
      48           0 :   void addReplicaToResolve(const std::string& host, uint16_t port) {
      49           0 :     replicas_to_resolve_.emplace_back(host, port); // Appends a (host, port) pair.
      50           0 :   }
      51             : 
      52             :   bool operator==(const ClusterSlot& rhs) const; // Declared here; no definition in this header.
      53             : 
      54             :   // Used in case the primary slot address is a hostname that needs to be resolved.
      55             :   std::string primary_hostname_;
      56             :   uint16_t primary_port_; // NOTE(review): no default member initializer, so the value is indeterminate until assigned.
      57             :   std::vector<ReplicaToResolve> replicas_to_resolve_; // Filled by addReplicaToResolve().
      58             : 
      59             : private:
      60             :   int64_t start_;
      61             :   int64_t end_;
      62             :   Network::Address::InstanceConstSharedPtr primary_;
      63             :   absl::btree_map<std::string, Network::Address::InstanceConstSharedPtr> replicas_; // Address string -> replica address.
      64             : };
      65             : 
      66             : using ClusterSlotsPtr = std::unique_ptr<std::vector<ClusterSlot>>; // Uniquely owned vector of ClusterSlot.
      67             : using ClusterSlotsSharedPtr = std::shared_ptr<std::vector<ClusterSlot>>; // Shared vector of ClusterSlot, as passed to onClusterSlotUpdate().
      68             : 
      69             : class RedisLoadBalancerContext { // Interface exposing the read-related properties of a Redis request.
      70             : public:
      71           0 :   virtual ~RedisLoadBalancerContext() = default;
      72             : 
      73             :   virtual bool isReadCommand() const PURE; // See RedisLoadBalancerContextImpl for the implementation in this header.
      74             :   virtual NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const PURE; // Read policy attached to the request.
      75             : };
      76             : 
      77             : class RedisLoadBalancerContextImpl : public RedisLoadBalancerContext,
      78             :                                      public Upstream::LoadBalancerContextBase {
      79             : public:
      80             :   /**
      81             :    * The load balancer context for Redis requests. Note that is_redis_cluster implies using Redis
      82             :    * cluster, which requires us to always enable hashtagging.
      83             :    * @param key specifies the key for the Redis request.
      84             :    * @param enabled_hashtagging specifies whether to enable hashtagging; this will always be true if
      85             :    * is_redis_cluster is true.
      86             :    * @param is_redis_cluster specifies whether this is a request for Redis cluster; if true the key
      87             :    * will be hashed using crc16.
      88             :    * @param request specifies the Redis request.
      89             :    * @param read_policy specifies the read policy; defaults to ReadPolicy::Primary.
      90             :    */
      91             :   RedisLoadBalancerContextImpl(const std::string& key, bool enabled_hashtagging,
      92             :                                bool is_redis_cluster,
      93             :                                const NetworkFilters::Common::Redis::RespValue& request,
      94             :                                NetworkFilters::Common::Redis::Client::ReadPolicy read_policy =
      95             :                                    NetworkFilters::Common::Redis::Client::ReadPolicy::Primary);
      96             : 
      97             :   // Upstream::LoadBalancerContextBase
      98           0 :   absl::optional<uint64_t> computeHashKey() override { return hash_key_; } // Returns the stored hash_key_; nothing is recomputed here.
      99             : 
     100           0 :   bool isReadCommand() const override { return is_read_; }
     101             : 
     102           0 :   NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const override {
     103           0 :     return read_policy_;
     104           0 :   }
     105             : 
     106             : private:
     107             :   absl::string_view hashtag(absl::string_view v, bool enabled); // NOTE(review): non-static, unlike isReadRequest(); implementation not visible in this header.
     108             : 
     109             :   static bool isReadRequest(const NetworkFilters::Common::Redis::RespValue& request);
     110             : 
     111             :   const absl::optional<uint64_t> hash_key_; // The three members below are const and have no default initializers, so the constructor must initialize them.
     112             :   const bool is_read_;
     113             :   const NetworkFilters::Common::Redis::Client::ReadPolicy read_policy_;
     114             : };
     115             : 
     116             : class ClusterSlotUpdateCallBack { // Interface for receiving slot and host-health updates.
     117             : public:
     118           0 :   virtual ~ClusterSlotUpdateCallBack() = default;
     119             : 
     120             :   /**
     121             :    * Callback invoked when the cluster slots are updated.
     122             :    * @param slots provides the updated cluster slots.
     123             :    * @param all_hosts provides the updated hosts.
     124             :    * @return whether the cluster slots were updated or not.
     125             :    */
     126             :   virtual bool onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots,
     127             :                                    Upstream::HostMap& all_hosts) PURE;
     128             : 
     129             :   /**
     130             :    * Callback invoked when a host's health status is updated.
     131             :    */
     132             :   virtual void onHostHealthUpdate() PURE;
     133             : };
     134             : 
     135             : using ClusterSlotUpdateCallBackSharedPtr = std::shared_ptr<ClusterSlotUpdateCallBack>; // Shared handle to a ClusterSlotUpdateCallBack.
     136             : 
     137             : /**
     138             :  * This factory is created and returned by RedisCluster's factory() method; the create() method will
     139             :  * be called on each thread to create a thread local RedisClusterLoadBalancer.
     140             :  */
     141             : class RedisClusterLoadBalancerFactory : public ClusterSlotUpdateCallBack,
     142             :                                         public Upstream::LoadBalancerFactory {
     143             : public:
     144           0 :   RedisClusterLoadBalancerFactory(Random::RandomGenerator& random) : random_(random) {} // NOTE(review): single-argument constructor is not explicit.
     145             : 
     146             :   // ClusterSlotUpdateCallBack
     147             :   bool onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, Upstream::HostMap& all_hosts) override;
     148             : 
     149             :   void onHostHealthUpdate() override;
     150             : 
     151             :   // Upstream::LoadBalancerFactory
     152             :   Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override;
     153             : 
     154             : private:
     155             :   class RedisShard { // A primary host plus host sets for its replicas and for all of its hosts.
     156             :   public:
     157             :     RedisShard(Upstream::HostConstSharedPtr primary, Upstream::HostVectorConstSharedPtr replicas,
     158             :                Upstream::HostVectorConstSharedPtr all_hosts)
     159           0 :         : primary_(std::move(primary)) {
     160           0 :       replicas_.updateHosts(Upstream::HostSetImpl::partitionHosts( // Fill the replica host set, passing an empty HostsPerLocality.
     161           0 :                                 std::move(replicas), Upstream::HostsPerLocalityImpl::empty()),
     162           0 :                             nullptr, {}, {});
     163           0 :       all_hosts_.updateHosts(Upstream::HostSetImpl::partitionHosts( // Same for the set holding all hosts of the shard.
     164           0 :                                  std::move(all_hosts), Upstream::HostsPerLocalityImpl::empty()),
     165           0 :                              nullptr, {}, {});
     166           0 :     }
     167           0 :     const Upstream::HostConstSharedPtr primary() const { return primary_; } // NOTE(review): top-level const on a by-value return type.
     168           0 :     const Upstream::HostSetImpl& replicas() const { return replicas_; }
     169           0 :     const Upstream::HostSetImpl& allHosts() const { return all_hosts_; }
     170             : 
     171             :   private:
     172             :     const Upstream::HostConstSharedPtr primary_;
     173             :     Upstream::HostSetImpl replicas_{0, absl::nullopt, absl::nullopt}; // presumably priority 0 - TODO confirm against HostSetImpl's constructor.
     174             :     Upstream::HostSetImpl all_hosts_{0, absl::nullopt, absl::nullopt};
     175             :   };
     176             : 
     177             :   using RedisShardSharedPtr = std::shared_ptr<const RedisShard>;
     178             :   using ShardVectorSharedPtr = std::shared_ptr<std::vector<RedisShardSharedPtr>>;
     179             :   using SlotArray = std::array<uint64_t, MaxSlot>; // One entry per slot: the index of the shard in the shard vector (see the comment below).
     180             :   using SlotArraySharedPtr = std::shared_ptr<const SlotArray>;
     181             : 
     182             :   /*
     183             :    * This class implements load balancing according to `Redis Cluster
     184             :    * <https://redis.io/topics/cluster-spec>`_. This load balancer is thread local and created
     185             :    * through the RedisClusterLoadBalancerFactory by the cluster manager.
     186             :    *
     187             :    * The topology is stored in slot_array_ and shard_vector_. According to the
     188             :    * `Redis Cluster Spec <https://redis.io/topics/cluster-spec#keys-distribution-model>`_, the key
     189             :    * space is split into a fixed number (16384) of slots. The current implementation uses a fixed size
     190             :    * std::array() of the index of the shard in the shard_vector_. This has a fixed cpu and memory
     191             :    * cost and provides a fast, constant-time lookup similar to Maglev. This will be used by the
     192             :    * redis proxy filter for load balancing purposes.
     193             :    */
     194             :   class RedisClusterLoadBalancer : public Upstream::LoadBalancer {
     195             :   public:
     196             :     RedisClusterLoadBalancer(SlotArraySharedPtr slot_array, ShardVectorSharedPtr shard_vector,
     197             :                              Random::RandomGenerator& random)
     198             :         : slot_array_(std::move(slot_array)), shard_vector_(std::move(shard_vector)),
     199           0 :           random_(random) {}
     200             : 
     201             :     // Upstream::LoadBalancer
     202             :     Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext*) override;
     203           0 :     Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
     204           0 :       return nullptr; // Always nullptr: no host is ever offered for peeking.
     205           0 :     }
     206             :     // Pool selection not implemented.
     207             :     absl::optional<Upstream::SelectedPoolAndConnection>
     208             :     selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
     209             :                              const Upstream::Host& /*host*/,
     210           0 :                              std::vector<uint8_t>& /*hash_key*/) override {
     211           0 :       return absl::nullopt;
     212           0 :     }
     213             :     // Lifetime tracking not implemented.
     214           0 :     OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
     215           0 :       return {};
     216           0 :     }
     217             : 
     218             :   private:
     219             :     const SlotArraySharedPtr slot_array_;
     220             :     const ShardVectorSharedPtr shard_vector_;
     221             :     Random::RandomGenerator& random_; // Held by reference; the generator must outlive this load balancer.
     222             :   };
     223             : 
     224             :   absl::Mutex mutex_;
     225             :   SlotArraySharedPtr slot_array_ ABSL_GUARDED_BY(mutex_);
     226             :   ClusterSlotsSharedPtr current_cluster_slot_; // NOTE(review): not annotated with ABSL_GUARDED_BY - confirm whether mutex_ is meant to cover it.
     227             :   ShardVectorSharedPtr shard_vector_; // NOTE(review): not annotated with ABSL_GUARDED_BY - confirm whether mutex_ is meant to cover it.
     228             :   Random::RandomGenerator& random_; // Held by reference; the generator must outlive this factory.
     229             : };
     230             : 
     231             : class RedisClusterThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer { // Thin wrapper that stores and hands out a load balancer factory.
     232             : public:
     233             :   RedisClusterThreadAwareLoadBalancer(Upstream::LoadBalancerFactorySharedPtr factory) // NOTE(review): single-argument constructor is not explicit.
     234           0 :       : factory_(std::move(factory)) {}
     235             : 
     236             :   // Upstream::ThreadAwareLoadBalancer
     237           0 :   Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; } // Returns the factory given at construction.
     238           0 :   void initialize() override{}; // No-op. NOTE(review): the ';' after the function body is redundant.
     239             : 
     240             : private:
     241             :   Upstream::LoadBalancerFactorySharedPtr factory_;
     242             : };
     243             : 
     244             : } // namespace Redis
     245             : } // namespace Clusters
     246             : } // namespace Extensions
     247             : } // namespace Envoy

Generated by: LCOV version 1.15