Line data Source code
1 : #pragma once 2 : 3 : #include <array> 4 : #include <string> 5 : #include <vector> 6 : 7 : #include "envoy/upstream/load_balancer.h" 8 : #include "envoy/upstream/upstream.h" 9 : 10 : #include "source/common/network/address_impl.h" 11 : #include "source/common/upstream/load_balancer_impl.h" 12 : #include "source/common/upstream/upstream_impl.h" 13 : #include "source/extensions/clusters/redis/crc16.h" 14 : #include "source/extensions/filters/network/common/redis/client.h" 15 : #include "source/extensions/filters/network/common/redis/codec.h" 16 : #include "source/extensions/filters/network/common/redis/supported_commands.h" 17 : 18 : #include "absl/container/btree_map.h" 19 : #include "absl/synchronization/mutex.h" 20 : 21 : namespace Envoy { 22 : namespace Extensions { 23 : namespace Clusters { 24 : namespace Redis { 25 : 26 : static const uint64_t MaxSlot = 16384; 27 : 28 : using ReplicaToResolve = std::pair<std::string, uint16_t>; 29 : 30 : class ClusterSlot { 31 : public: 32 : ClusterSlot(int64_t start, int64_t end, Network::Address::InstanceConstSharedPtr primary) 33 0 : : start_(start), end_(end), primary_(std::move(primary)) {} 34 : 35 0 : int64_t start() const { return start_; } 36 0 : int64_t end() const { return end_; } 37 0 : Network::Address::InstanceConstSharedPtr primary() const { return primary_; } 38 0 : const absl::btree_map<std::string, Network::Address::InstanceConstSharedPtr>& replicas() const { 39 0 : return replicas_; 40 0 : } 41 : 42 0 : void setPrimary(Network::Address::InstanceConstSharedPtr address) { 43 0 : primary_ = std::move(address); 44 0 : } 45 0 : void addReplica(Network::Address::InstanceConstSharedPtr replica_address) { 46 0 : replicas_.emplace(replica_address->asString(), std::move(replica_address)); 47 0 : } 48 0 : void addReplicaToResolve(const std::string& host, uint16_t port) { 49 0 : replicas_to_resolve_.emplace_back(host, port); 50 0 : } 51 : 52 : bool operator==(const ClusterSlot& rhs) const; 53 : 54 : // In case of primary slot address is hostname and needs to be resolved 55 : std::string primary_hostname_; 56 : uint16_t primary_port_; 57 : std::vector<ReplicaToResolve> replicas_to_resolve_; 58 : 59 : private: 60 : int64_t start_; 61 : int64_t end_; 62 : Network::Address::InstanceConstSharedPtr primary_; 63 : absl::btree_map<std::string, Network::Address::InstanceConstSharedPtr> replicas_; 64 : }; 65 : 66 : using ClusterSlotsPtr = std::unique_ptr<std::vector<ClusterSlot>>; 67 : using ClusterSlotsSharedPtr = std::shared_ptr<std::vector<ClusterSlot>>; 68 : 69 : class RedisLoadBalancerContext { 70 : public: 71 0 : virtual ~RedisLoadBalancerContext() = default; 72 : 73 : virtual bool isReadCommand() const PURE; 74 : virtual NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const PURE; 75 : }; 76 : 77 : class RedisLoadBalancerContextImpl : public RedisLoadBalancerContext, 78 : public Upstream::LoadBalancerContextBase { 79 : public: 80 : /** 81 : * The load balancer context for Redis requests. Note that is_redis_cluster implies using Redis 82 : * cluster which require us to always enable hashtagging. 83 : * @param key specify the key for the Redis request. 84 : * @param enabled_hashtagging specify whether to enable hashtagging, this will always be true if 85 : * is_redis_cluster is true. 86 : * @param is_redis_cluster specify whether this is a request for redis cluster, if true the key 87 : * will be hashed using crc16. 88 : * @param request specify the Redis request. 89 : * @param read_policy specify the read policy. 90 : */ 91 : RedisLoadBalancerContextImpl(const std::string& key, bool enabled_hashtagging, 92 : bool is_redis_cluster, 93 : const NetworkFilters::Common::Redis::RespValue& request, 94 : NetworkFilters::Common::Redis::Client::ReadPolicy read_policy = 95 : NetworkFilters::Common::Redis::Client::ReadPolicy::Primary); 96 : 97 : // Upstream::LoadBalancerContextBase 98 0 : absl::optional<uint64_t> computeHashKey() override { return hash_key_; } 99 : 100 0 : bool isReadCommand() const override { return is_read_; } 101 : 102 0 : NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const override { 103 0 : return read_policy_; 104 0 : } 105 : 106 : private: 107 : absl::string_view hashtag(absl::string_view v, bool enabled); 108 : 109 : static bool isReadRequest(const NetworkFilters::Common::Redis::RespValue& request); 110 : 111 : const absl::optional<uint64_t> hash_key_; 112 : const bool is_read_; 113 : const NetworkFilters::Common::Redis::Client::ReadPolicy read_policy_; 114 : }; 115 : 116 : class ClusterSlotUpdateCallBack { 117 : public: 118 0 : virtual ~ClusterSlotUpdateCallBack() = default; 119 : 120 : /** 121 : * Callback when cluster slot is updated 122 : * @param slots provides the updated cluster slots. 123 : * @param all_hosts provides the updated hosts. 124 : * @return indicate if the cluster slot is updated or not. 125 : */ 126 : virtual bool onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, 127 : Upstream::HostMap& all_hosts) PURE; 128 : 129 : /** 130 : * Callback when a host's health status is updated 131 : */ 132 : virtual void onHostHealthUpdate() PURE; 133 : }; 134 : 135 : using ClusterSlotUpdateCallBackSharedPtr = std::shared_ptr<ClusterSlotUpdateCallBack>; 136 : 137 : /** 138 : * This factory is created and returned by RedisCluster's factory() method, the create() method will 139 : * be called on each thread to create a thread local RedisClusterLoadBalancer. 140 : */ 141 : class RedisClusterLoadBalancerFactory : public ClusterSlotUpdateCallBack, 142 : public Upstream::LoadBalancerFactory { 143 : public: 144 0 : RedisClusterLoadBalancerFactory(Random::RandomGenerator& random) : random_(random) {} 145 : 146 : // ClusterSlotUpdateCallBack 147 : bool onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, Upstream::HostMap& all_hosts) override; 148 : 149 : void onHostHealthUpdate() override; 150 : 151 : // Upstream::LoadBalancerFactory 152 : Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override; 153 : 154 : private: 155 : class RedisShard { 156 : public: 157 : RedisShard(Upstream::HostConstSharedPtr primary, Upstream::HostVectorConstSharedPtr replicas, 158 : Upstream::HostVectorConstSharedPtr all_hosts) 159 0 : : primary_(std::move(primary)) { 160 0 : replicas_.updateHosts(Upstream::HostSetImpl::partitionHosts( 161 0 : std::move(replicas), Upstream::HostsPerLocalityImpl::empty()), 162 0 : nullptr, {}, {}); 163 0 : all_hosts_.updateHosts(Upstream::HostSetImpl::partitionHosts( 164 0 : std::move(all_hosts), Upstream::HostsPerLocalityImpl::empty()), 165 0 : nullptr, {}, {}); 166 0 : } 167 0 : const Upstream::HostConstSharedPtr primary() const { return primary_; } 168 0 : const Upstream::HostSetImpl& replicas() const { return replicas_; } 169 0 : const Upstream::HostSetImpl& allHosts() const { return all_hosts_; } 170 : 171 : private: 172 : const Upstream::HostConstSharedPtr primary_; 173 : Upstream::HostSetImpl replicas_{0, absl::nullopt, absl::nullopt}; 174 : Upstream::HostSetImpl all_hosts_{0, absl::nullopt, absl::nullopt}; 175 : }; 176 : 177 : using RedisShardSharedPtr = std::shared_ptr<const RedisShard>; 178 : using ShardVectorSharedPtr = std::shared_ptr<std::vector<RedisShardSharedPtr>>; 179 : using SlotArray = std::array<uint64_t, MaxSlot>; 180 : using SlotArraySharedPtr = std::shared_ptr<const SlotArray>; 181 : 182 : /* 183 : * This class implements load balancing according to `Redis Cluster 184 : * <https://redis.io/topics/cluster-spec>`_. This load balancer is thread local and created 185 : * through the RedisClusterLoadBalancerFactory by the cluster manager. 186 : * 187 : * The topology is stored in slot_array_ and shard_vector_. According to the 188 : * `Redis Cluster Spec <https://redis.io/topics/cluster-spec#keys-distribution-model`_, the key 189 : * space is split into a fixed size 16384 slots. The current implementation uses a fixed size 190 : * std::array() of the index of the shard in the shard_vector_. This has a fixed cpu and memory 191 : * cost and provide a fast lookup constant time lookup similar to Maglev. This will be used by the 192 : * redis proxy filter for load balancing purpose. 193 : */ 194 : class RedisClusterLoadBalancer : public Upstream::LoadBalancer { 195 : public: 196 : RedisClusterLoadBalancer(SlotArraySharedPtr slot_array, ShardVectorSharedPtr shard_vector, 197 : Random::RandomGenerator& random) 198 : : slot_array_(std::move(slot_array)), shard_vector_(std::move(shard_vector)), 199 0 : random_(random) {} 200 : 201 : // Upstream::LoadBalancerBase 202 : Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext*) override; 203 0 : Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { 204 0 : return nullptr; 205 0 : } 206 : // Pool selection not implemented. 207 : absl::optional<Upstream::SelectedPoolAndConnection> 208 : selectExistingConnection(Upstream::LoadBalancerContext* /*context*/, 209 : const Upstream::Host& /*host*/, 210 0 : std::vector<uint8_t>& /*hash_key*/) override { 211 0 : return absl::nullopt; 212 0 : } 213 : // Lifetime tracking not implemented. 214 0 : OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override { 215 0 : return {}; 216 0 : } 217 : 218 : private: 219 : const SlotArraySharedPtr slot_array_; 220 : const ShardVectorSharedPtr shard_vector_; 221 : Random::RandomGenerator& random_; 222 : }; 223 : 224 : absl::Mutex mutex_; 225 : SlotArraySharedPtr slot_array_ ABSL_GUARDED_BY(mutex_); 226 : ClusterSlotsSharedPtr current_cluster_slot_; 227 : ShardVectorSharedPtr shard_vector_; 228 : Random::RandomGenerator& random_; 229 : }; 230 : 231 : class RedisClusterThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer { 232 : public: 233 : RedisClusterThreadAwareLoadBalancer(Upstream::LoadBalancerFactorySharedPtr factory) 234 0 : : factory_(std::move(factory)) {} 235 : 236 : // Upstream::ThreadAwareLoadBalancer 237 0 : Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; } 238 0 : void initialize() override{}; 239 : 240 : private: 241 : Upstream::LoadBalancerFactorySharedPtr factory_; 242 : }; 243 : 244 : } // namespace Redis 245 : } // namespace Clusters 246 : } // namespace Extensions 247 : } // namespace Envoy