LCOV - code coverage report
Current view: top level - source/extensions/clusters/redis - redis_cluster.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 10 39 25.6 %
Date: 2024-01-05 06:35:25 Functions: 2 23 8.7 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <array>
       4             : #include <atomic>
       5             : #include <chrono>
       6             : #include <cstdint>
       7             : #include <functional>
       8             : #include <list>
       9             : #include <memory>
      10             : #include <string>
      11             : #include <tuple>
      12             : #include <utility>
      13             : #include <vector>
      14             : 
      15             : #include "envoy/api/api.h"
      16             : #include "envoy/common/random_generator.h"
      17             : #include "envoy/config/cluster/v3/cluster.pb.h"
      18             : #include "envoy/config/endpoint/v3/endpoint.pb.h"
      19             : #include "envoy/config/endpoint/v3/endpoint_components.pb.h"
      20             : #include "envoy/config/typed_metadata.h"
      21             : #include "envoy/event/dispatcher.h"
      22             : #include "envoy/event/timer.h"
      23             : #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.h"
      24             : #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.validate.h"
      25             : #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
      26             : #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h"
      27             : #include "envoy/http/codec.h"
      28             : #include "envoy/local_info/local_info.h"
      29             : #include "envoy/network/dns.h"
      30             : #include "envoy/runtime/runtime.h"
      31             : #include "envoy/secret/secret_manager.h"
      32             : #include "envoy/server/transport_socket_config.h"
      33             : #include "envoy/singleton/manager.h"
      34             : #include "envoy/ssl/context_manager.h"
      35             : #include "envoy/stats/scope.h"
      36             : #include "envoy/thread_local/thread_local.h"
      37             : #include "envoy/upstream/cluster_manager.h"
      38             : #include "envoy/upstream/health_checker.h"
      39             : #include "envoy/upstream/load_balancer.h"
      40             : #include "envoy/upstream/locality.h"
      41             : #include "envoy/upstream/upstream.h"
      42             : 
      43             : #include "source/common/common/callback_impl.h"
      44             : #include "source/common/common/enum_to_int.h"
      45             : #include "source/common/common/logger.h"
      46             : #include "source/common/config/datasource.h"
      47             : #include "source/common/config/metadata.h"
      48             : #include "source/common/config/well_known_names.h"
      49             : #include "source/common/network/address_impl.h"
      50             : #include "source/common/network/utility.h"
      51             : #include "source/common/stats/isolated_store_impl.h"
      52             : #include "source/common/upstream/cluster_factory_impl.h"
      53             : #include "source/common/upstream/load_balancer_impl.h"
      54             : #include "source/common/upstream/outlier_detection_impl.h"
      55             : #include "source/common/upstream/resource_manager_impl.h"
      56             : #include "source/common/upstream/upstream_impl.h"
      57             : #include "source/extensions/clusters/redis/redis_cluster_lb.h"
      58             : #include "source/extensions/common/redis/cluster_refresh_manager_impl.h"
      59             : #include "source/extensions/filters/network/common/redis/client.h"
      60             : #include "source/extensions/filters/network/common/redis/client_impl.h"
      61             : #include "source/extensions/filters/network/common/redis/codec.h"
      62             : #include "source/extensions/filters/network/common/redis/utility.h"
      63             : #include "source/extensions/filters/network/redis_proxy/config.h"
      64             : #include "source/server/transport_socket_config_impl.h"
      65             : 
      66             : namespace Envoy {
      67             : namespace Extensions {
      68             : namespace Clusters {
      69             : namespace Redis {
      70             : 
      71             : /*
      72             :  * This class implements support for the topology part of `Redis Cluster
      73             :  * <https://redis.io/topics/cluster-spec>`_. Specifically, it allows Envoy to maintain an internal
      74             :  * representation of the topology of a Redis Cluster, and how often the topology should be
      75             :  * refreshed.
      76             :  *
      77             :  * The target Redis Cluster is obtained from the yaml config file as usual, and we choose a random
      78             :  * discovery address from DNS if there are no existing hosts (our startup condition). Otherwise, we
      79             :  * choose a random host from our known set of hosts. Then, against this host we make a topology
      80             :  * request.
      81             :  *
      82             :  * Topology requests are handled by RedisDiscoverySession, which handles the initialization of
      83             :  * the `CLUSTER SLOTS command <https://redis.io/commands/cluster-slots>`_, and the responses and
      84             :  * failure cases.
      85             :  *
      86             :  * Once the topology is fetched from Redis, the cluster will update the
      87             :  * RedisClusterLoadBalancerFactory, which will be used by the redis proxy filter for load balancing
      88             :  * purpose.
      89             :  */
      90             : 
      91             : class RedisCluster : public Upstream::BaseDynamicClusterImpl {
      92             : public:
      93             :   RedisCluster(const envoy::config::cluster::v3::Cluster& cluster,
      94             :                const envoy::extensions::clusters::redis::v3::RedisClusterConfig& redis_cluster,
      95             :                Upstream::ClusterFactoryContext& context,
      96             :                NetworkFilters::Common::Redis::Client::ClientFactory& client_factory,
      97             :                Network::DnsResolverSharedPtr dns_resolver,
      98             :                ClusterSlotUpdateCallBackSharedPtr factory);
      99             : 
     100             :   struct ClusterSlotsRequest : public Extensions::NetworkFilters::Common::Redis::RespValue {
     101             :   public:
     102           2 :     ClusterSlotsRequest() {
     103           2 :       type(Extensions::NetworkFilters::Common::Redis::RespType::Array);
     104           2 :       std::vector<NetworkFilters::Common::Redis::RespValue> values(2);
     105           2 :       values[0].type(NetworkFilters::Common::Redis::RespType::BulkString);
     106           2 :       values[0].asString() = "CLUSTER";
     107           2 :       values[1].type(NetworkFilters::Common::Redis::RespType::BulkString);
     108           2 :       values[1].asString() = "SLOTS";
     109           2 :       asArray().swap(values);
     110           2 :     }
     111             : 
     112             :     static ClusterSlotsRequest instance_;
     113             :   };
     114             : 
     115           0 :   InitializePhase initializePhase() const override { return InitializePhase::Primary; }
     116             : 
     117           0 :   TimeSource& timeSource() const { return time_source_; }
     118             : 
     119             : private:
     120             :   friend class RedisClusterTest;
     121             : 
     122             :   void startPreInit() override;
     123             : 
     124             :   void updateAllHosts(const Upstream::HostVector& hosts_added,
     125             :                       const Upstream::HostVector& hosts_removed, uint32_t priority);
     126             : 
     127             :   void onClusterSlotUpdate(ClusterSlotsSharedPtr&&);
     128             : 
     129             :   void reloadHealthyHostsHelper(const Upstream::HostSharedPtr& host) override;
     130             : 
     131           0 :   const envoy::config::endpoint::v3::LocalityLbEndpoints& localityLbEndpoint() const {
     132             :     // Always use the first endpoint.
     133           0 :     return load_assignment_.endpoints()[0];
     134           0 :   }
     135             : 
     136           0 :   const envoy::config::endpoint::v3::LbEndpoint& lbEndpoint() const {
     137             :     // Always use the first endpoint.
     138           0 :     return localityLbEndpoint().lb_endpoints()[0];
     139           0 :   }
     140             : 
     141             :   // A redis node in the Redis cluster.
     142             :   class RedisHost : public Upstream::HostImpl {
     143             :   public:
     144             :     RedisHost(Upstream::ClusterInfoConstSharedPtr cluster, const std::string& hostname,
     145             :               Network::Address::InstanceConstSharedPtr address, RedisCluster& parent, bool primary,
     146             :               TimeSource& time_source)
     147             :         : Upstream::HostImpl(
     148             :               cluster, hostname, address,
     149             :               // TODO(zyfjeff): Created through metadata shared pool
     150             :               std::make_shared<envoy::config::core::v3::Metadata>(parent.lbEndpoint().metadata()),
     151             :               parent.lbEndpoint().load_balancing_weight().value(),
     152             :               parent.localityLbEndpoint().locality(),
     153             :               parent.lbEndpoint().endpoint().health_check_config(),
     154             :               parent.localityLbEndpoint().priority(), parent.lbEndpoint().health_status(),
     155             :               time_source),
     156           0 :           primary_(primary) {}
     157             : 
     158           0 :     bool isPrimary() const { return primary_; }
     159             : 
     160             :   private:
     161             :     const bool primary_;
     162             :   };
     163             : 
     164             :   // Resolves the discovery endpoint.
     165             :   struct DnsDiscoveryResolveTarget {
     166             :     DnsDiscoveryResolveTarget(RedisCluster& parent, const std::string& dns_address,
     167             :                               const uint32_t port);
     168             : 
     169             :     ~DnsDiscoveryResolveTarget();
     170             : 
     171             :     void startResolveDns();
     172             : 
     173             :     RedisCluster& parent_;
     174             :     Network::ActiveDnsQuery* active_query_{};
     175             :     const std::string dns_address_;
     176             :     const uint32_t port_;
     177             :     Event::TimerPtr resolve_timer_;
     178             :   };
     179             : 
     180             :   using DnsDiscoveryResolveTargetPtr = std::unique_ptr<DnsDiscoveryResolveTarget>;
     181             : 
     182             :   struct RedisDiscoverySession;
     183             : 
     184             :   struct RedisDiscoveryClient : public Network::ConnectionCallbacks {
     185           0 :     RedisDiscoveryClient(RedisDiscoverySession& parent) : parent_(parent) {}
     186             : 
     187             :     // Network::ConnectionCallbacks
     188             :     void onEvent(Network::ConnectionEvent event) override;
     189           0 :     void onAboveWriteBufferHighWatermark() override {}
     190           0 :     void onBelowWriteBufferLowWatermark() override {}
     191             : 
     192             :     RedisDiscoverySession& parent_;
     193             :     std::string host_;
     194             :     Extensions::NetworkFilters::Common::Redis::Client::ClientPtr client_;
     195             :   };
     196             : 
     197             :   using RedisDiscoveryClientPtr = std::unique_ptr<RedisDiscoveryClient>;
     198             : 
     199             :   struct RedisDiscoverySession
     200             :       : public Extensions::NetworkFilters::Common::Redis::Client::Config,
     201             :         public Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks {
     202             :     RedisDiscoverySession(RedisCluster& parent,
     203             :                           NetworkFilters::Common::Redis::Client::ClientFactory& client_factory);
     204             : 
     205             :     ~RedisDiscoverySession() override;
     206             : 
     207             :     void registerDiscoveryAddress(std::list<Network::DnsResponse>&& response, const uint32_t port);
     208             : 
     209             :     // Start discovery against a random host from existing hosts
     210             :     void startResolveRedis();
     211             : 
     212             :     // Extensions::NetworkFilters::Common::Redis::Client::Config
     213           0 :     bool disableOutlierEvents() const override { return true; }
     214           0 :     std::chrono::milliseconds opTimeout() const override {
     215             :       // Allow the main Health Check infra to control timeout.
     216           0 :       return parent_.cluster_refresh_timeout_;
     217           0 :     }
     218           0 :     bool enableHashtagging() const override { return false; }
     219           0 :     bool enableRedirection() const override { return false; }
     220           0 :     uint32_t maxBufferSizeBeforeFlush() const override { return 0; }
     221           0 :     std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; }
     222           0 :     uint32_t maxUpstreamUnknownConnections() const override { return 0; }
     223           0 :     bool enableCommandStats() const override { return true; }
     224           0 :     bool connectionRateLimitEnabled() const override { return false; }
     225           0 :     uint32_t connectionRateLimitPerSec() const override { return 0; }
     226             :     // For any readPolicy other than Primary, the RedisClientFactory will send a READONLY command
     227             :     // when establishing a new connection. Since we're only using this for making the "cluster
     228             :     // slots" commands, the READONLY command is not relevant in this context. We're setting it to
     229             :     // Primary to avoid the additional READONLY command.
     230           0 :     Extensions::NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const override {
     231           0 :       return Extensions::NetworkFilters::Common::Redis::Client::ReadPolicy::Primary;
     232           0 :     }
     233             : 
     234             :     // Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks
     235             :     void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override;
     236             :     void onFailure() override;
     237             :     // Note: Below callback isn't used in topology updates
     238             :     void onRedirection(NetworkFilters::Common::Redis::RespValuePtr&&, const std::string&,
     239           0 :                        bool) override {}
     240             :     void onUnexpectedResponse(const NetworkFilters::Common::Redis::RespValuePtr&);
     241             : 
     242             :     Network::Address::InstanceConstSharedPtr
     243             :     ipAddressFromClusterEntry(const std::vector<NetworkFilters::Common::Redis::RespValue>& array);
     244             :     bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value);
     245             :     void resolveClusterHostnames(ClusterSlotsSharedPtr&& slots,
     246             :                                  std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt);
     247             :     void resolveReplicas(ClusterSlotsSharedPtr slots, std::size_t index,
     248             :                          std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt);
     249             :     void finishClusterHostnameResolution(ClusterSlotsSharedPtr slots);
     250             :     void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response);
     251             : 
     252             :     RedisCluster& parent_;
     253             :     Event::Dispatcher& dispatcher_;
     254             :     std::string current_host_address_;
     255             :     Extensions::NetworkFilters::Common::Redis::Client::PoolRequest* current_request_{};
     256             :     absl::node_hash_map<std::string, RedisDiscoveryClientPtr> client_map_;
     257             : 
     258             :     std::list<Network::Address::InstanceConstSharedPtr> discovery_address_list_;
     259             : 
     260             :     Event::TimerPtr resolve_timer_;
     261             :     NetworkFilters::Common::Redis::Client::ClientFactory& client_factory_;
     262             :     const std::chrono::milliseconds buffer_timeout_;
     263             :     NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_;
     264             :   };
     265             : 
     266             :   Upstream::ClusterManager& cluster_manager_;
     267             :   const std::chrono::milliseconds cluster_refresh_rate_;
     268             :   const std::chrono::milliseconds cluster_refresh_timeout_;
     269             :   const std::chrono::milliseconds redirect_refresh_interval_;
     270             :   const uint32_t redirect_refresh_threshold_;
     271             :   const uint32_t failure_refresh_threshold_;
     272             :   const uint32_t host_degraded_refresh_threshold_;
     273             :   std::list<DnsDiscoveryResolveTargetPtr> dns_discovery_resolve_targets_;
     274             :   Event::Dispatcher& dispatcher_;
     275             :   Network::DnsResolverSharedPtr dns_resolver_;
     276             :   Network::DnsLookupFamily dns_lookup_family_;
     277             :   const envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment_;
     278             :   const LocalInfo::LocalInfo& local_info_;
     279             :   Random::RandomGenerator& random_;
     280             :   RedisDiscoverySession redis_discovery_session_;
     281             :   const ClusterSlotUpdateCallBackSharedPtr lb_factory_;
     282             : 
     283             :   Upstream::HostVector hosts_;
     284             : 
     285             :   const std::string auth_username_;
     286             :   const std::string auth_password_;
     287             :   const std::string cluster_name_;
     288             :   const Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager_;
     289             :   const Common::Redis::ClusterRefreshManager::HandlePtr registration_handle_;
     290             : };
     291             : 
     292             : class RedisClusterFactory : public Upstream::ConfigurableClusterFactoryBase<
     293             :                                 envoy::extensions::clusters::redis::v3::RedisClusterConfig> {
     294             : public:
     295           2 :   RedisClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.redis") {}
     296             : 
     297             : private:
     298             :   friend class RedisClusterTest;
     299             : 
     300             :   absl::StatusOr<
     301             :       std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
     302             :   createClusterWithConfig(
     303             :       const envoy::config::cluster::v3::Cluster& cluster,
     304             :       const envoy::extensions::clusters::redis::v3::RedisClusterConfig& proto_config,
     305             :       Upstream::ClusterFactoryContext& context) override;
     306             : };
     307             : } // namespace Redis
     308             : } // namespace Clusters
     309             : } // namespace Extensions
     310             : } // namespace Envoy

Generated by: LCOV version 1.15