LCOV - code coverage report
Current view: top level - source/extensions/clusters/redis - redis_cluster_lb.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 158 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 11 0.0 %

          Line data    Source code
       1             : #include "redis_cluster_lb.h"
       2             : 
       3             : namespace Envoy {
       4             : namespace Extensions {
       5             : namespace Clusters {
       6             : namespace Redis {
       7             : 
       8           0 : bool ClusterSlot::operator==(const Envoy::Extensions::Clusters::Redis::ClusterSlot& rhs) const {
       9           0 :   if (start_ != rhs.start_ || end_ != rhs.end_ || *primary_ != *rhs.primary_ ||
      10           0 :       replicas_.size() != rhs.replicas_.size()) {
      11           0 :     return false;
      12           0 :   }
      13             :   // The value type is shared_ptr, and the shared_ptr is not same one even for same ip:port.
      14             :   // so, just compare the key here.
      15           0 :   return std::equal(replicas_.begin(), replicas_.end(), rhs.replicas_.begin(), rhs.replicas_.end(),
      16           0 :                     [](const auto& it1, const auto& it2) { return it1.first == it2.first; });
      17           0 : }
      18             : 
      19             : // RedisClusterLoadBalancerFactory
      20             : bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots,
      21           0 :                                                           Envoy::Upstream::HostMap& all_hosts) {
      22             :   // The slots is sorted, allowing for a quick comparison to make sure we need to update the slot
      23             :   // array sort based on start and end to enable efficient comparison
      24           0 :   std::sort(
      25           0 :       slots->begin(), slots->end(), [](const ClusterSlot& lhs, const ClusterSlot& rhs) -> bool {
      26           0 :         return lhs.start() < rhs.start() || (!(lhs.start() < rhs.start()) && lhs.end() < rhs.end());
      27           0 :       });
      28             : 
      29           0 :   if (current_cluster_slot_ && *current_cluster_slot_ == *slots) {
      30           0 :     return false;
      31           0 :   }
      32             : 
      33           0 :   auto updated_slots = std::make_shared<SlotArray>();
      34           0 :   auto shard_vector = std::make_shared<std::vector<RedisShardSharedPtr>>();
      35           0 :   absl::flat_hash_map<std::string, uint64_t> shards;
      36             : 
      37           0 :   for (const ClusterSlot& slot : *slots) {
      38             :     // look in the updated map
      39           0 :     const std::string primary_address = slot.primary()->asString();
      40             : 
      41           0 :     auto result = shards.try_emplace(primary_address, shard_vector->size());
      42           0 :     if (result.second) {
      43           0 :       auto primary_host = all_hosts.find(primary_address);
      44           0 :       ASSERT(primary_host != all_hosts.end(),
      45           0 :              "we expect all address to be found in the updated_hosts");
      46             : 
      47           0 :       Upstream::HostVectorSharedPtr primary_and_replicas = std::make_shared<Upstream::HostVector>();
      48           0 :       Upstream::HostVectorSharedPtr replicas = std::make_shared<Upstream::HostVector>();
      49           0 :       primary_and_replicas->push_back(primary_host->second);
      50             : 
      51           0 :       for (auto const& replica : slot.replicas()) {
      52           0 :         auto replica_host = all_hosts.find(replica.first);
      53           0 :         ASSERT(replica_host != all_hosts.end(),
      54           0 :                "we expect all address to be found in the updated_hosts");
      55           0 :         replicas->push_back(replica_host->second);
      56           0 :         primary_and_replicas->push_back(replica_host->second);
      57           0 :       }
      58             : 
      59           0 :       shard_vector->emplace_back(
      60           0 :           std::make_shared<RedisShard>(primary_host->second, replicas, primary_and_replicas));
      61           0 :     }
      62             : 
      63           0 :     for (auto i = slot.start(); i <= slot.end(); ++i) {
      64           0 :       updated_slots->at(i) = result.first->second;
      65           0 :     }
      66           0 :   }
      67             : 
      68           0 :   {
      69           0 :     absl::WriterMutexLock lock(&mutex_);
      70           0 :     current_cluster_slot_ = std::move(slots);
      71           0 :     slot_array_ = std::move(updated_slots);
      72           0 :     shard_vector_ = std::move(shard_vector);
      73           0 :   }
      74           0 :   return true;
      75           0 : }
      76             : 
      77           0 : void RedisClusterLoadBalancerFactory::onHostHealthUpdate() {
      78           0 :   ShardVectorSharedPtr current_shard_vector;
      79           0 :   {
      80           0 :     absl::ReaderMutexLock lock(&mutex_);
      81           0 :     current_shard_vector = shard_vector_;
      82           0 :   }
      83             : 
      84             :   // This can get called by cluster initialization before the Redis Cluster topology is resolved.
      85           0 :   if (!current_shard_vector) {
      86           0 :     return;
      87           0 :   }
      88             : 
      89           0 :   auto shard_vector = std::make_shared<std::vector<RedisShardSharedPtr>>();
      90             : 
      91           0 :   for (auto const& shard : *current_shard_vector) {
      92           0 :     shard_vector->emplace_back(std::make_shared<RedisShard>(
      93           0 :         shard->primary(), shard->replicas().hostsPtr(), shard->allHosts().hostsPtr()));
      94           0 :   }
      95             : 
      96           0 :   {
      97           0 :     absl::WriterMutexLock lock(&mutex_);
      98           0 :     shard_vector_ = std::move(shard_vector);
      99           0 :   }
     100           0 : }
     101             : 
     102           0 : Upstream::LoadBalancerPtr RedisClusterLoadBalancerFactory::create(Upstream::LoadBalancerParams) {
     103           0 :   absl::ReaderMutexLock lock(&mutex_);
     104           0 :   return std::make_unique<RedisClusterLoadBalancer>(slot_array_, shard_vector_, random_);
     105           0 : }
     106             : 
     107             : namespace {
     108             : Upstream::HostConstSharedPtr chooseRandomHost(const Upstream::HostSetImpl& host_set,
     109           0 :                                               Random::RandomGenerator& random) {
     110           0 :   auto hosts = host_set.healthyHosts();
     111           0 :   if (hosts.empty()) {
     112           0 :     hosts = host_set.degradedHosts();
     113           0 :   }
     114             : 
     115           0 :   if (hosts.empty()) {
     116           0 :     hosts = host_set.hosts();
     117           0 :   }
     118             : 
     119           0 :   if (!hosts.empty()) {
     120           0 :     return hosts[random.random() % hosts.size()];
     121           0 :   } else {
     122           0 :     return nullptr;
     123           0 :   }
     124           0 : }
     125             : } // namespace
     126             : 
     127             : Upstream::HostConstSharedPtr RedisClusterLoadBalancerFactory::RedisClusterLoadBalancer::chooseHost(
     128           0 :     Envoy::Upstream::LoadBalancerContext* context) {
     129           0 :   if (!slot_array_) {
     130           0 :     return nullptr;
     131           0 :   }
     132           0 :   absl::optional<uint64_t> hash;
     133           0 :   if (context) {
     134           0 :     hash = context->computeHashKey();
     135           0 :   }
     136             : 
     137           0 :   if (!hash) {
     138           0 :     return nullptr;
     139           0 :   }
     140             : 
     141           0 :   auto shard = shard_vector_->at(
     142           0 :       slot_array_->at(hash.value() % Envoy::Extensions::Clusters::Redis::MaxSlot));
     143             : 
     144           0 :   auto redis_context = dynamic_cast<RedisLoadBalancerContext*>(context);
     145           0 :   if (redis_context && redis_context->isReadCommand()) {
     146           0 :     switch (redis_context->readPolicy()) {
     147           0 :     case NetworkFilters::Common::Redis::Client::ReadPolicy::Primary:
     148           0 :       return shard->primary();
     149           0 :     case NetworkFilters::Common::Redis::Client::ReadPolicy::PreferPrimary:
     150           0 :       if (shard->primary()->coarseHealth() == Upstream::Host::Health::Healthy) {
     151           0 :         return shard->primary();
     152           0 :       } else {
     153           0 :         return chooseRandomHost(shard->allHosts(), random_);
     154           0 :       }
     155           0 :     case NetworkFilters::Common::Redis::Client::ReadPolicy::Replica:
     156           0 :       return chooseRandomHost(shard->replicas(), random_);
     157           0 :     case NetworkFilters::Common::Redis::Client::ReadPolicy::PreferReplica:
     158           0 :       if (!shard->replicas().healthyHosts().empty()) {
     159           0 :         return chooseRandomHost(shard->replicas(), random_);
     160           0 :       } else {
     161           0 :         return chooseRandomHost(shard->allHosts(), random_);
     162           0 :       }
     163           0 :     case NetworkFilters::Common::Redis::Client::ReadPolicy::Any:
     164           0 :       return chooseRandomHost(shard->allHosts(), random_);
     165           0 :     }
     166           0 :   }
     167           0 :   return shard->primary();
     168           0 : }
     169             : 
     170             : bool RedisLoadBalancerContextImpl::isReadRequest(
     171           0 :     const NetworkFilters::Common::Redis::RespValue& request) {
     172           0 :   const NetworkFilters::Common::Redis::RespValue* command = nullptr;
     173           0 :   if (request.type() == NetworkFilters::Common::Redis::RespType::Array) {
     174           0 :     command = &(request.asArray()[0]);
     175           0 :   } else if (request.type() == NetworkFilters::Common::Redis::RespType::CompositeArray) {
     176           0 :     command = request.asCompositeArray().command();
     177           0 :   }
     178           0 :   if (!command) {
     179           0 :     return false;
     180           0 :   }
     181           0 :   if (command->type() != NetworkFilters::Common::Redis::RespType::SimpleString &&
     182           0 :       command->type() != NetworkFilters::Common::Redis::RespType::BulkString) {
     183           0 :     return false;
     184           0 :   }
     185           0 :   std::string to_lower_string = absl::AsciiStrToLower(command->asString());
     186           0 :   return NetworkFilters::Common::Redis::SupportedCommands::isReadCommand(to_lower_string);
     187           0 : }
     188             : 
     189             : RedisLoadBalancerContextImpl::RedisLoadBalancerContextImpl(
     190             :     const std::string& key, bool enabled_hashtagging, bool is_redis_cluster,
     191             :     const NetworkFilters::Common::Redis::RespValue& request,
     192             :     NetworkFilters::Common::Redis::Client::ReadPolicy read_policy)
     193             :     : hash_key_(is_redis_cluster ? Crc16::crc16(hashtag(key, true))
     194             :                                  : MurmurHash::murmurHash2(hashtag(key, enabled_hashtagging))),
     195           0 :       is_read_(isReadRequest(request)), read_policy_(read_policy) {}
     196             : 
     197             : // Inspired by the redis-cluster hashtagging algorithm
     198             : // https://redis.io/topics/cluster-spec#keys-hash-tags
     199           0 : absl::string_view RedisLoadBalancerContextImpl::hashtag(absl::string_view v, bool enabled) {
     200           0 :   if (!enabled) {
     201           0 :     return v;
     202           0 :   }
     203             : 
     204           0 :   auto start = v.find('{');
     205           0 :   if (start == std::string::npos) {
     206           0 :     return v;
     207           0 :   }
     208             : 
     209           0 :   auto end = v.find('}', start);
     210           0 :   if (end == std::string::npos || end == start + 1) {
     211           0 :     return v;
     212           0 :   }
     213             : 
     214           0 :   return v.substr(start + 1, end - start - 1);
     215           0 : }
     216             : } // namespace Redis
     217             : } // namespace Clusters
     218             : } // namespace Extensions
     219             : } // namespace Envoy

Generated by: LCOV version 1.15