Line data Source code
1 : #include "redis_cluster_lb.h" 2 : 3 : namespace Envoy { 4 : namespace Extensions { 5 : namespace Clusters { 6 : namespace Redis { 7 : 8 0 : bool ClusterSlot::operator==(const Envoy::Extensions::Clusters::Redis::ClusterSlot& rhs) const { 9 0 : if (start_ != rhs.start_ || end_ != rhs.end_ || *primary_ != *rhs.primary_ || 10 0 : replicas_.size() != rhs.replicas_.size()) { 11 0 : return false; 12 0 : } 13 : // The value type is shared_ptr, and the shared_ptr is not same one even for same ip:port. 14 : // so, just compare the key here. 15 0 : return std::equal(replicas_.begin(), replicas_.end(), rhs.replicas_.begin(), rhs.replicas_.end(), 16 0 : [](const auto& it1, const auto& it2) { return it1.first == it2.first; }); 17 0 : } 18 : 19 : // RedisClusterLoadBalancerFactory 20 : bool RedisClusterLoadBalancerFactory::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, 21 0 : Envoy::Upstream::HostMap& all_hosts) { 22 : // The slots is sorted, allowing for a quick comparison to make sure we need to update the slot 23 : // array sort based on start and end to enable efficient comparison 24 0 : std::sort( 25 0 : slots->begin(), slots->end(), [](const ClusterSlot& lhs, const ClusterSlot& rhs) -> bool { 26 0 : return lhs.start() < rhs.start() || (!(lhs.start() < rhs.start()) && lhs.end() < rhs.end()); 27 0 : }); 28 : 29 0 : if (current_cluster_slot_ && *current_cluster_slot_ == *slots) { 30 0 : return false; 31 0 : } 32 : 33 0 : auto updated_slots = std::make_shared<SlotArray>(); 34 0 : auto shard_vector = std::make_shared<std::vector<RedisShardSharedPtr>>(); 35 0 : absl::flat_hash_map<std::string, uint64_t> shards; 36 : 37 0 : for (const ClusterSlot& slot : *slots) { 38 : // look in the updated map 39 0 : const std::string primary_address = slot.primary()->asString(); 40 : 41 0 : auto result = shards.try_emplace(primary_address, shard_vector->size()); 42 0 : if (result.second) { 43 0 : auto primary_host = all_hosts.find(primary_address); 44 0 : ASSERT(primary_host != all_hosts.end(), 45 0 : "we expect all address to be found in the updated_hosts"); 46 : 47 0 : Upstream::HostVectorSharedPtr primary_and_replicas = std::make_shared<Upstream::HostVector>(); 48 0 : Upstream::HostVectorSharedPtr replicas = std::make_shared<Upstream::HostVector>(); 49 0 : primary_and_replicas->push_back(primary_host->second); 50 : 51 0 : for (auto const& replica : slot.replicas()) { 52 0 : auto replica_host = all_hosts.find(replica.first); 53 0 : ASSERT(replica_host != all_hosts.end(), 54 0 : "we expect all address to be found in the updated_hosts"); 55 0 : replicas->push_back(replica_host->second); 56 0 : primary_and_replicas->push_back(replica_host->second); 57 0 : } 58 : 59 0 : shard_vector->emplace_back( 60 0 : std::make_shared<RedisShard>(primary_host->second, replicas, primary_and_replicas)); 61 0 : } 62 : 63 0 : for (auto i = slot.start(); i <= slot.end(); ++i) { 64 0 : updated_slots->at(i) = result.first->second; 65 0 : } 66 0 : } 67 : 68 0 : { 69 0 : absl::WriterMutexLock lock(&mutex_); 70 0 : current_cluster_slot_ = std::move(slots); 71 0 : slot_array_ = std::move(updated_slots); 72 0 : shard_vector_ = std::move(shard_vector); 73 0 : } 74 0 : return true; 75 0 : } 76 : 77 0 : void RedisClusterLoadBalancerFactory::onHostHealthUpdate() { 78 0 : ShardVectorSharedPtr current_shard_vector; 79 0 : { 80 0 : absl::ReaderMutexLock lock(&mutex_); 81 0 : current_shard_vector = shard_vector_; 82 0 : } 83 : 84 : // This can get called by cluster initialization before the Redis Cluster topology is resolved. 85 0 : if (!current_shard_vector) { 86 0 : return; 87 0 : } 88 : 89 0 : auto shard_vector = std::make_shared<std::vector<RedisShardSharedPtr>>(); 90 : 91 0 : for (auto const& shard : *current_shard_vector) { 92 0 : shard_vector->emplace_back(std::make_shared<RedisShard>( 93 0 : shard->primary(), shard->replicas().hostsPtr(), shard->allHosts().hostsPtr())); 94 0 : } 95 : 96 0 : { 97 0 : absl::WriterMutexLock lock(&mutex_); 98 0 : shard_vector_ = std::move(shard_vector); 99 0 : } 100 0 : } 101 : 102 0 : Upstream::LoadBalancerPtr RedisClusterLoadBalancerFactory::create(Upstream::LoadBalancerParams) { 103 0 : absl::ReaderMutexLock lock(&mutex_); 104 0 : return std::make_unique<RedisClusterLoadBalancer>(slot_array_, shard_vector_, random_); 105 0 : } 106 : 107 : namespace { 108 : Upstream::HostConstSharedPtr chooseRandomHost(const Upstream::HostSetImpl& host_set, 109 0 : Random::RandomGenerator& random) { 110 0 : auto hosts = host_set.healthyHosts(); 111 0 : if (hosts.empty()) { 112 0 : hosts = host_set.degradedHosts(); 113 0 : } 114 : 115 0 : if (hosts.empty()) { 116 0 : hosts = host_set.hosts(); 117 0 : } 118 : 119 0 : if (!hosts.empty()) { 120 0 : return hosts[random.random() % hosts.size()]; 121 0 : } else { 122 0 : return nullptr; 123 0 : } 124 0 : } 125 : } // namespace 126 : 127 : Upstream::HostConstSharedPtr RedisClusterLoadBalancerFactory::RedisClusterLoadBalancer::chooseHost( 128 0 : Envoy::Upstream::LoadBalancerContext* context) { 129 0 : if (!slot_array_) { 130 0 : return nullptr; 131 0 : } 132 0 : absl::optional<uint64_t> hash; 133 0 : if (context) { 134 0 : hash = context->computeHashKey(); 135 0 : } 136 : 137 0 : if (!hash) { 138 0 : return nullptr; 139 0 : } 140 : 141 0 : auto shard = shard_vector_->at( 142 0 : slot_array_->at(hash.value() % Envoy::Extensions::Clusters::Redis::MaxSlot)); 143 : 144 0 : auto redis_context = dynamic_cast<RedisLoadBalancerContext*>(context); 145 0 : if (redis_context && redis_context->isReadCommand()) { 146 0 : switch (redis_context->readPolicy()) { 147 0 : case NetworkFilters::Common::Redis::Client::ReadPolicy::Primary: 148 0 : return shard->primary(); 149 0 : case NetworkFilters::Common::Redis::Client::ReadPolicy::PreferPrimary: 150 0 : if (shard->primary()->coarseHealth() == Upstream::Host::Health::Healthy) { 151 0 : return shard->primary(); 152 0 : } else { 153 0 : return chooseRandomHost(shard->allHosts(), random_); 154 0 : } 155 0 : case NetworkFilters::Common::Redis::Client::ReadPolicy::Replica: 156 0 : return chooseRandomHost(shard->replicas(), random_); 157 0 : case NetworkFilters::Common::Redis::Client::ReadPolicy::PreferReplica: 158 0 : if (!shard->replicas().healthyHosts().empty()) { 159 0 : return chooseRandomHost(shard->replicas(), random_); 160 0 : } else { 161 0 : return chooseRandomHost(shard->allHosts(), random_); 162 0 : } 163 0 : case NetworkFilters::Common::Redis::Client::ReadPolicy::Any: 164 0 : return chooseRandomHost(shard->allHosts(), random_); 165 0 : } 166 0 : } 167 0 : return shard->primary(); 168 0 : } 169 : 170 : bool RedisLoadBalancerContextImpl::isReadRequest( 171 0 : const NetworkFilters::Common::Redis::RespValue& request) { 172 0 : const NetworkFilters::Common::Redis::RespValue* command = nullptr; 173 0 : if (request.type() == NetworkFilters::Common::Redis::RespType::Array) { 174 0 : command = &(request.asArray()[0]); 175 0 : } else if (request.type() == NetworkFilters::Common::Redis::RespType::CompositeArray) { 176 0 : command = request.asCompositeArray().command(); 177 0 : } 178 0 : if (!command) { 179 0 : return false; 180 0 : } 181 0 : if (command->type() != NetworkFilters::Common::Redis::RespType::SimpleString && 182 0 : command->type() != NetworkFilters::Common::Redis::RespType::BulkString) { 183 0 : return false; 184 0 : } 185 0 : std::string to_lower_string = absl::AsciiStrToLower(command->asString()); 186 0 : return NetworkFilters::Common::Redis::SupportedCommands::isReadCommand(to_lower_string); 187 0 : } 188 : 189 : RedisLoadBalancerContextImpl::RedisLoadBalancerContextImpl( 190 : const std::string& key, bool enabled_hashtagging, bool is_redis_cluster, 191 : const NetworkFilters::Common::Redis::RespValue& request, 192 : NetworkFilters::Common::Redis::Client::ReadPolicy read_policy) 193 : : hash_key_(is_redis_cluster ? Crc16::crc16(hashtag(key, true)) 194 : : MurmurHash::murmurHash2(hashtag(key, enabled_hashtagging))), 195 0 : is_read_(isReadRequest(request)), read_policy_(read_policy) {} 196 : 197 : // Inspired by the redis-cluster hashtagging algorithm 198 : // https://redis.io/topics/cluster-spec#keys-hash-tags 199 0 : absl::string_view RedisLoadBalancerContextImpl::hashtag(absl::string_view v, bool enabled) { 200 0 : if (!enabled) { 201 0 : return v; 202 0 : } 203 : 204 0 : auto start = v.find('{'); 205 0 : if (start == std::string::npos) { 206 0 : return v; 207 0 : } 208 : 209 0 : auto end = v.find('}', start); 210 0 : if (end == std::string::npos || end == start + 1) { 211 0 : return v; 212 0 : } 213 : 214 0 : return v.substr(start + 1, end - start - 1); 215 0 : } 216 : } // namespace Redis 217 : } // namespace Clusters 218 : } // namespace Extensions 219 : } // namespace Envoy