Line data Source code
1 : #pragma once 2 : 3 : #include <array> 4 : #include <atomic> 5 : #include <chrono> 6 : #include <cstdint> 7 : #include <functional> 8 : #include <list> 9 : #include <memory> 10 : #include <string> 11 : #include <tuple> 12 : #include <utility> 13 : #include <vector> 14 : 15 : #include "envoy/api/api.h" 16 : #include "envoy/common/random_generator.h" 17 : #include "envoy/config/cluster/v3/cluster.pb.h" 18 : #include "envoy/config/endpoint/v3/endpoint.pb.h" 19 : #include "envoy/config/endpoint/v3/endpoint_components.pb.h" 20 : #include "envoy/config/typed_metadata.h" 21 : #include "envoy/event/dispatcher.h" 22 : #include "envoy/event/timer.h" 23 : #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.h" 24 : #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.validate.h" 25 : #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h" 26 : #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h" 27 : #include "envoy/http/codec.h" 28 : #include "envoy/local_info/local_info.h" 29 : #include "envoy/network/dns.h" 30 : #include "envoy/runtime/runtime.h" 31 : #include "envoy/secret/secret_manager.h" 32 : #include "envoy/server/transport_socket_config.h" 33 : #include "envoy/singleton/manager.h" 34 : #include "envoy/ssl/context_manager.h" 35 : #include "envoy/stats/scope.h" 36 : #include "envoy/thread_local/thread_local.h" 37 : #include "envoy/upstream/cluster_manager.h" 38 : #include "envoy/upstream/health_checker.h" 39 : #include "envoy/upstream/load_balancer.h" 40 : #include "envoy/upstream/locality.h" 41 : #include "envoy/upstream/upstream.h" 42 : 43 : #include "source/common/common/callback_impl.h" 44 : #include "source/common/common/enum_to_int.h" 45 : #include "source/common/common/logger.h" 46 : #include "source/common/config/datasource.h" 47 : #include "source/common/config/metadata.h" 48 : #include "source/common/config/well_known_names.h" 49 : #include "source/common/network/address_impl.h" 50 : #include "source/common/network/utility.h" 51 : #include "source/common/stats/isolated_store_impl.h" 52 : #include "source/common/upstream/cluster_factory_impl.h" 53 : #include "source/common/upstream/load_balancer_impl.h" 54 : #include "source/common/upstream/outlier_detection_impl.h" 55 : #include "source/common/upstream/resource_manager_impl.h" 56 : #include "source/common/upstream/upstream_impl.h" 57 : #include "source/extensions/clusters/redis/redis_cluster_lb.h" 58 : #include "source/extensions/common/redis/cluster_refresh_manager_impl.h" 59 : #include "source/extensions/filters/network/common/redis/client.h" 60 : #include "source/extensions/filters/network/common/redis/client_impl.h" 61 : #include "source/extensions/filters/network/common/redis/codec.h" 62 : #include "source/extensions/filters/network/common/redis/utility.h" 63 : #include "source/extensions/filters/network/redis_proxy/config.h" 64 : #include "source/server/transport_socket_config_impl.h" 65 : 66 : namespace Envoy { 67 : namespace Extensions { 68 : namespace Clusters { 69 : namespace Redis { 70 : 71 : /* 72 : * This class implements support for the topology part of `Redis Cluster 73 : * <https://redis.io/topics/cluster-spec>`_. Specifically, it allows Envoy to maintain an internal 74 : * representation of the topology of a Redis Cluster, and how often the topology should be 75 : * refreshed. 76 : * 77 : * The target Redis Cluster is obtained from the yaml config file as usual, and we choose a random 78 : * discovery address from DNS if there are no existing hosts (our startup condition). Otherwise, we 79 : * choose a random host from our known set of hosts. Then, against this host we make a topology 80 : * request. 81 : * 82 : * Topology requests are handled by RedisDiscoverySession, which handles the initialization of 83 : * the `CLUSTER SLOTS command <https://redis.io/commands/cluster-slots>`_, and the responses and 84 : * failure cases. 85 : * 86 : * Once the topology is fetched from Redis, the cluster will update the 87 : * RedisClusterLoadBalancerFactory, which will be used by the redis proxy filter for load balancing 88 : * purpose. 89 : */ 90 : 91 : class RedisCluster : public Upstream::BaseDynamicClusterImpl { 92 : public: 93 : RedisCluster(const envoy::config::cluster::v3::Cluster& cluster, 94 : const envoy::extensions::clusters::redis::v3::RedisClusterConfig& redis_cluster, 95 : Upstream::ClusterFactoryContext& context, 96 : NetworkFilters::Common::Redis::Client::ClientFactory& client_factory, 97 : Network::DnsResolverSharedPtr dns_resolver, 98 : ClusterSlotUpdateCallBackSharedPtr factory); 99 : 100 : struct ClusterSlotsRequest : public Extensions::NetworkFilters::Common::Redis::RespValue { 101 : public: 102 2 : ClusterSlotsRequest() { 103 2 : type(Extensions::NetworkFilters::Common::Redis::RespType::Array); 104 2 : std::vector<NetworkFilters::Common::Redis::RespValue> values(2); 105 2 : values[0].type(NetworkFilters::Common::Redis::RespType::BulkString); 106 2 : values[0].asString() = "CLUSTER"; 107 2 : values[1].type(NetworkFilters::Common::Redis::RespType::BulkString); 108 2 : values[1].asString() = "SLOTS"; 109 2 : asArray().swap(values); 110 2 : } 111 : 112 : static ClusterSlotsRequest instance_; 113 : }; 114 : 115 0 : InitializePhase initializePhase() const override { return InitializePhase::Primary; } 116 : 117 0 : TimeSource& timeSource() const { return time_source_; } 118 : 119 : private: 120 : friend class RedisClusterTest; 121 : 122 : void startPreInit() override; 123 : 124 : void updateAllHosts(const Upstream::HostVector& hosts_added, 125 : const Upstream::HostVector& hosts_removed, uint32_t priority); 126 : 127 : void onClusterSlotUpdate(ClusterSlotsSharedPtr&&); 128 : 129 : void reloadHealthyHostsHelper(const Upstream::HostSharedPtr& host) override; 130 : 131 0 : const envoy::config::endpoint::v3::LocalityLbEndpoints& localityLbEndpoint() const { 132 : // Always use the first endpoint. 133 0 : return load_assignment_.endpoints()[0]; 134 0 : } 135 : 136 0 : const envoy::config::endpoint::v3::LbEndpoint& lbEndpoint() const { 137 : // Always use the first endpoint. 138 0 : return localityLbEndpoint().lb_endpoints()[0]; 139 0 : } 140 : 141 : // A redis node in the Redis cluster. 142 : class RedisHost : public Upstream::HostImpl { 143 : public: 144 : RedisHost(Upstream::ClusterInfoConstSharedPtr cluster, const std::string& hostname, 145 : Network::Address::InstanceConstSharedPtr address, RedisCluster& parent, bool primary, 146 : TimeSource& time_source) 147 : : Upstream::HostImpl( 148 : cluster, hostname, address, 149 : // TODO(zyfjeff): Created through metadata shared pool 150 : std::make_shared<envoy::config::core::v3::Metadata>(parent.lbEndpoint().metadata()), 151 : parent.lbEndpoint().load_balancing_weight().value(), 152 : parent.localityLbEndpoint().locality(), 153 : parent.lbEndpoint().endpoint().health_check_config(), 154 : parent.localityLbEndpoint().priority(), parent.lbEndpoint().health_status(), 155 : time_source), 156 0 : primary_(primary) {} 157 : 158 0 : bool isPrimary() const { return primary_; } 159 : 160 : private: 161 : const bool primary_; 162 : }; 163 : 164 : // Resolves the discovery endpoint. 165 : struct DnsDiscoveryResolveTarget { 166 : DnsDiscoveryResolveTarget(RedisCluster& parent, const std::string& dns_address, 167 : const uint32_t port); 168 : 169 : ~DnsDiscoveryResolveTarget(); 170 : 171 : void startResolveDns(); 172 : 173 : RedisCluster& parent_; 174 : Network::ActiveDnsQuery* active_query_{}; 175 : const std::string dns_address_; 176 : const uint32_t port_; 177 : Event::TimerPtr resolve_timer_; 178 : }; 179 : 180 : using DnsDiscoveryResolveTargetPtr = std::unique_ptr<DnsDiscoveryResolveTarget>; 181 : 182 : struct RedisDiscoverySession; 183 : 184 : struct RedisDiscoveryClient : public Network::ConnectionCallbacks { 185 0 : RedisDiscoveryClient(RedisDiscoverySession& parent) : parent_(parent) {} 186 : 187 : // Network::ConnectionCallbacks 188 : void onEvent(Network::ConnectionEvent event) override; 189 0 : void onAboveWriteBufferHighWatermark() override {} 190 0 : void onBelowWriteBufferLowWatermark() override {} 191 : 192 : RedisDiscoverySession& parent_; 193 : std::string host_; 194 : Extensions::NetworkFilters::Common::Redis::Client::ClientPtr client_; 195 : }; 196 : 197 : using RedisDiscoveryClientPtr = std::unique_ptr<RedisDiscoveryClient>; 198 : 199 : struct RedisDiscoverySession 200 : : public Extensions::NetworkFilters::Common::Redis::Client::Config, 201 : public Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks { 202 : RedisDiscoverySession(RedisCluster& parent, 203 : NetworkFilters::Common::Redis::Client::ClientFactory& client_factory); 204 : 205 : ~RedisDiscoverySession() override; 206 : 207 : void registerDiscoveryAddress(std::list<Network::DnsResponse>&& response, const uint32_t port); 208 : 209 : // Start discovery against a random host from existing hosts 210 : void startResolveRedis(); 211 : 212 : // Extensions::NetworkFilters::Common::Redis::Client::Config 213 0 : bool disableOutlierEvents() const override { return true; } 214 0 : std::chrono::milliseconds opTimeout() const override { 215 : // Allow the main Health Check infra to control timeout. 216 0 : return parent_.cluster_refresh_timeout_; 217 0 : } 218 0 : bool enableHashtagging() const override { return false; } 219 0 : bool enableRedirection() const override { return false; } 220 0 : uint32_t maxBufferSizeBeforeFlush() const override { return 0; } 221 0 : std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; } 222 0 : uint32_t maxUpstreamUnknownConnections() const override { return 0; } 223 0 : bool enableCommandStats() const override { return true; } 224 0 : bool connectionRateLimitEnabled() const override { return false; } 225 0 : uint32_t connectionRateLimitPerSec() const override { return 0; } 226 : // For any readPolicy other than Primary, the RedisClientFactory will send a READONLY command 227 : // when establishing a new connection. Since we're only using this for making the "cluster 228 : // slots" commands, the READONLY command is not relevant in this context. We're setting it to 229 : // Primary to avoid the additional READONLY command. 230 0 : Extensions::NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const override { 231 0 : return Extensions::NetworkFilters::Common::Redis::Client::ReadPolicy::Primary; 232 0 : } 233 : 234 : // Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks 235 : void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override; 236 : void onFailure() override; 237 : // Note: Below callback isn't used in topology updates 238 : void onRedirection(NetworkFilters::Common::Redis::RespValuePtr&&, const std::string&, 239 0 : bool) override {} 240 : void onUnexpectedResponse(const NetworkFilters::Common::Redis::RespValuePtr&); 241 : 242 : Network::Address::InstanceConstSharedPtr 243 : ipAddressFromClusterEntry(const std::vector<NetworkFilters::Common::Redis::RespValue>& array); 244 : bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value); 245 : void resolveClusterHostnames(ClusterSlotsSharedPtr&& slots, 246 : std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt); 247 : void resolveReplicas(ClusterSlotsSharedPtr slots, std::size_t index, 248 : std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt); 249 : void finishClusterHostnameResolution(ClusterSlotsSharedPtr slots); 250 : void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response); 251 : 252 : RedisCluster& parent_; 253 : Event::Dispatcher& dispatcher_; 254 : std::string current_host_address_; 255 : Extensions::NetworkFilters::Common::Redis::Client::PoolRequest* current_request_{}; 256 : absl::node_hash_map<std::string, RedisDiscoveryClientPtr> client_map_; 257 : 258 : std::list<Network::Address::InstanceConstSharedPtr> discovery_address_list_; 259 : 260 : Event::TimerPtr resolve_timer_; 261 : NetworkFilters::Common::Redis::Client::ClientFactory& client_factory_; 262 : const std::chrono::milliseconds buffer_timeout_; 263 : NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_; 264 : }; 265 : 266 : Upstream::ClusterManager& cluster_manager_; 267 : const std::chrono::milliseconds cluster_refresh_rate_; 268 : const std::chrono::milliseconds cluster_refresh_timeout_; 269 : const std::chrono::milliseconds redirect_refresh_interval_; 270 : const uint32_t redirect_refresh_threshold_; 271 : const uint32_t failure_refresh_threshold_; 272 : const uint32_t host_degraded_refresh_threshold_; 273 : std::list<DnsDiscoveryResolveTargetPtr> dns_discovery_resolve_targets_; 274 : Event::Dispatcher& dispatcher_; 275 : Network::DnsResolverSharedPtr dns_resolver_; 276 : Network::DnsLookupFamily dns_lookup_family_; 277 : const envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment_; 278 : const LocalInfo::LocalInfo& local_info_; 279 : Random::RandomGenerator& random_; 280 : RedisDiscoverySession redis_discovery_session_; 281 : const ClusterSlotUpdateCallBackSharedPtr lb_factory_; 282 : 283 : Upstream::HostVector hosts_; 284 : 285 : const std::string auth_username_; 286 : const std::string auth_password_; 287 : const std::string cluster_name_; 288 : const Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager_; 289 : const Common::Redis::ClusterRefreshManager::HandlePtr registration_handle_; 290 : }; 291 : 292 : class RedisClusterFactory : public Upstream::ConfigurableClusterFactoryBase< 293 : envoy::extensions::clusters::redis::v3::RedisClusterConfig> { 294 : public: 295 2 : RedisClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.redis") {} 296 : 297 : private: 298 : friend class RedisClusterTest; 299 : 300 : absl::StatusOr< 301 : std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>> 302 : createClusterWithConfig( 303 : const envoy::config::cluster::v3::Cluster& cluster, 304 : const envoy::extensions::clusters::redis::v3::RedisClusterConfig& proto_config, 305 : Upstream::ClusterFactoryContext& context) override; 306 : }; 307 : } // namespace Redis 308 : } // namespace Clusters 309 : } // namespace Extensions 310 : } // namespace Envoy