LCOV - code coverage report
Current view: top level - source/extensions/clusters/redis - redis_cluster.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 376 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 29 0.0 %

          Line data    Source code
       1             : #include "redis_cluster.h"
       2             : 
       3             : #include <cstdint>
       4             : #include <memory>
       5             : 
       6             : #include "envoy/config/cluster/v3/cluster.pb.h"
       7             : #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.h"
       8             : #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.validate.h"
       9             : #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
      10             : #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h"
      11             : 
      12             : namespace Envoy {
      13             : namespace Extensions {
      14             : namespace Clusters {
      15             : namespace Redis {
      16             : 
      17             : RedisCluster::RedisCluster(
      18             :     const envoy::config::cluster::v3::Cluster& cluster,
      19             :     const envoy::extensions::clusters::redis::v3::RedisClusterConfig& redis_cluster,
      20             :     Upstream::ClusterFactoryContext& context,
      21             :     NetworkFilters::Common::Redis::Client::ClientFactory& redis_client_factory,
      22             :     Network::DnsResolverSharedPtr dns_resolver, ClusterSlotUpdateCallBackSharedPtr lb_factory)
      23             :     : Upstream::BaseDynamicClusterImpl(cluster, context),
      24             :       cluster_manager_(context.clusterManager()),
      25             :       cluster_refresh_rate_(std::chrono::milliseconds(
      26             :           PROTOBUF_GET_MS_OR_DEFAULT(redis_cluster, cluster_refresh_rate, 5000))),
      27             :       cluster_refresh_timeout_(std::chrono::milliseconds(
      28             :           PROTOBUF_GET_MS_OR_DEFAULT(redis_cluster, cluster_refresh_timeout, 3000))),
      29             :       redirect_refresh_interval_(std::chrono::milliseconds(
      30             :           PROTOBUF_GET_MS_OR_DEFAULT(redis_cluster, redirect_refresh_interval, 5000))),
      31             :       redirect_refresh_threshold_(
      32             :           PROTOBUF_GET_WRAPPED_OR_DEFAULT(redis_cluster, redirect_refresh_threshold, 5)),
      33             :       failure_refresh_threshold_(redis_cluster.failure_refresh_threshold()),
      34             :       host_degraded_refresh_threshold_(redis_cluster.host_degraded_refresh_threshold()),
      35             :       dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
      36             :       dns_resolver_(std::move(dns_resolver)),
      37             :       dns_lookup_family_(Upstream::getDnsLookupFamilyFromCluster(cluster)),
      38             :       load_assignment_(cluster.load_assignment()),
      39             :       local_info_(context.serverFactoryContext().localInfo()),
      40             :       random_(context.serverFactoryContext().api().randomGenerator()),
      41             :       redis_discovery_session_(*this, redis_client_factory), lb_factory_(std::move(lb_factory)),
      42             :       auth_username_(NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authUsername(
      43             :           info(), context.serverFactoryContext().api())),
      44             :       auth_password_(NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authPassword(
      45             :           info(), context.serverFactoryContext().api())),
      46             :       cluster_name_(cluster.name()),
      47             :       refresh_manager_(Common::Redis::getClusterRefreshManager(
      48             :           context.serverFactoryContext().singletonManager(),
      49             :           context.serverFactoryContext().mainThreadDispatcher(), context.clusterManager(),
      50             :           context.serverFactoryContext().api().timeSource())),
      51             :       registration_handle_(refresh_manager_->registerCluster(
      52             :           cluster_name_, redirect_refresh_interval_, redirect_refresh_threshold_,
      53           0 :           failure_refresh_threshold_, host_degraded_refresh_threshold_, [&]() {
      54           0 :             redis_discovery_session_.resolve_timer_->enableTimer(std::chrono::milliseconds(0));
      55           0 :           })) {
      56           0 :   const auto& locality_lb_endpoints = load_assignment_.endpoints();
      57           0 :   for (const auto& locality_lb_endpoint : locality_lb_endpoints) {
      58           0 :     for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
      59           0 :       const auto& host = lb_endpoint.endpoint().address();
      60           0 :       dns_discovery_resolve_targets_.emplace_back(new DnsDiscoveryResolveTarget(
      61           0 :           *this, host.socket_address().address(), host.socket_address().port_value()));
      62           0 :     }
      63           0 :   }
      64           0 : }
      65             : 
      66           0 : void RedisCluster::startPreInit() {
      67           0 :   for (const DnsDiscoveryResolveTargetPtr& target : dns_discovery_resolve_targets_) {
      68           0 :     target->startResolveDns();
      69           0 :   }
      70           0 :   if (!wait_for_warm_on_init_) {
      71           0 :     onPreInitComplete();
      72           0 :   }
      73           0 : }
      74             : 
      75             : void RedisCluster::updateAllHosts(const Upstream::HostVector& hosts_added,
      76             :                                   const Upstream::HostVector& hosts_removed,
      77           0 :                                   uint32_t current_priority) {
      78           0 :   Upstream::PriorityStateManager priority_state_manager(*this, local_info_, nullptr);
      79             : 
      80           0 :   auto locality_lb_endpoint = localityLbEndpoint();
      81           0 :   priority_state_manager.initializePriorityFor(locality_lb_endpoint);
      82           0 :   for (const Upstream::HostSharedPtr& host : hosts_) {
      83           0 :     if (locality_lb_endpoint.priority() == current_priority) {
      84           0 :       priority_state_manager.registerHostForPriority(host, locality_lb_endpoint);
      85           0 :     }
      86           0 :   }
      87             : 
      88           0 :   priority_state_manager.updateClusterPrioritySet(
      89           0 :       current_priority, std::move(priority_state_manager.priorityState()[current_priority].first),
      90           0 :       hosts_added, hosts_removed, absl::nullopt, absl::nullopt, absl::nullopt);
      91           0 : }
      92             : 
      93           0 : void RedisCluster::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots) {
      94           0 :   Upstream::HostVector new_hosts;
      95           0 :   absl::flat_hash_set<std::string> all_new_hosts;
      96             : 
      97           0 :   for (const ClusterSlot& slot : *slots) {
      98           0 :     if (all_new_hosts.count(slot.primary()->asString()) == 0) {
      99           0 :       new_hosts.emplace_back(new RedisHost(info(), "", slot.primary(), *this, true, time_source_));
     100           0 :       all_new_hosts.emplace(slot.primary()->asString());
     101           0 :     }
     102           0 :     for (auto const& replica : slot.replicas()) {
     103           0 :       if (all_new_hosts.count(replica.first) == 0) {
     104           0 :         new_hosts.emplace_back(
     105           0 :             new RedisHost(info(), "", replica.second, *this, false, time_source_));
     106           0 :         all_new_hosts.emplace(replica.first);
     107           0 :       }
     108           0 :     }
     109           0 :   }
     110             : 
     111             :   // Get the map of all the latest existing hosts, which is used to filter out the existing
     112             :   // hosts in the process of updating cluster memberships.
     113           0 :   Upstream::HostMapConstSharedPtr all_hosts = priority_set_.crossPriorityHostMap();
     114           0 :   ASSERT(all_hosts != nullptr);
     115             : 
     116           0 :   Upstream::HostVector hosts_added;
     117           0 :   Upstream::HostVector hosts_removed;
     118           0 :   const bool host_updated = updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed,
     119           0 :                                                   *all_hosts, all_new_hosts);
     120             : 
     121             :   // Create a map containing all the latest hosts to determine whether the slots are updated.
     122           0 :   Upstream::HostMap updated_hosts = *all_hosts;
     123           0 :   for (const auto& host : hosts_removed) {
     124           0 :     updated_hosts.erase(host->address()->asString());
     125           0 :   }
     126           0 :   for (const auto& host : hosts_added) {
     127           0 :     updated_hosts[host->address()->asString()] = host;
     128           0 :   }
     129             : 
     130           0 :   const bool slot_updated =
     131           0 :       lb_factory_ ? lb_factory_->onClusterSlotUpdate(std::move(slots), updated_hosts) : false;
     132             : 
     133             :   // If the slots are updated, call updateAllHosts regardless of whether there are new hosts,
     134             :   // to force an update of the thread local load balancers.
     135           0 :   if (host_updated || slot_updated) {
     136           0 :     ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) {
     137           0 :       return host->priority() == localityLbEndpoint().priority();
     138           0 :     }));
     139           0 :     updateAllHosts(hosts_added, hosts_removed, localityLbEndpoint().priority());
     140           0 :   } else {
     141           0 :     info_->configUpdateStats().update_no_rebuild_.inc();
     142           0 :   }
     143             : 
     144             :   // TODO(hyang): If there is an initialize callback, fire it now. Note that if the
     145             :   // cluster refers to multiple DNS names, this will return initialized after a single
     146             :   // DNS resolution completes. This is not perfect but is easier to code and it is unclear
     147             :   // if the extra complexity is needed so will start with this.
     148           0 :   onPreInitComplete();
     149           0 : }
     150             : 
     151           0 : void RedisCluster::reloadHealthyHostsHelper(const Upstream::HostSharedPtr& host) {
     152           0 :   if (lb_factory_) {
     153           0 :     lb_factory_->onHostHealthUpdate();
     154           0 :   }
     155           0 :   if (host && (host->coarseHealth() == Upstream::Host::Health::Degraded ||
     156           0 :                host->coarseHealth() == Upstream::Host::Health::Unhealthy)) {
     157           0 :     refresh_manager_->onHostDegraded(cluster_name_);
     158           0 :   }
     159           0 :   ClusterImplBase::reloadHealthyHostsHelper(host);
     160           0 : }
     161             : 
     162             : // DnsDiscoveryResolveTarget
     163             : RedisCluster::DnsDiscoveryResolveTarget::DnsDiscoveryResolveTarget(RedisCluster& parent,
     164             :                                                                    const std::string& dns_address,
     165             :                                                                    const uint32_t port)
     166           0 :     : parent_(parent), dns_address_(dns_address), port_(port) {}
     167             : 
     168           0 : RedisCluster::DnsDiscoveryResolveTarget::~DnsDiscoveryResolveTarget() {
     169           0 :   if (active_query_) {
     170           0 :     active_query_->cancel(Network::ActiveDnsQuery::CancelReason::QueryAbandoned);
     171           0 :   }
     172             :   // Disable timer for mock tests.
     173           0 :   if (resolve_timer_) {
     174           0 :     resolve_timer_->disableTimer();
     175           0 :   }
     176           0 : }
     177             : 
     178           0 : void RedisCluster::DnsDiscoveryResolveTarget::startResolveDns() {
     179           0 :   ENVOY_LOG(trace, "starting async DNS resolution for {}", dns_address_);
     180             : 
     181           0 :   active_query_ = parent_.dns_resolver_->resolve(
     182           0 :       dns_address_, parent_.dns_lookup_family_,
     183           0 :       [this](Network::DnsResolver::ResolutionStatus status,
     184           0 :              std::list<Network::DnsResponse>&& response) -> void {
     185           0 :         active_query_ = nullptr;
     186           0 :         ENVOY_LOG(trace, "async DNS resolution complete for {}", dns_address_);
     187           0 :         if (status == Network::DnsResolver::ResolutionStatus::Failure || response.empty()) {
     188           0 :           if (status == Network::DnsResolver::ResolutionStatus::Failure) {
     189           0 :             parent_.info_->configUpdateStats().update_failure_.inc();
     190           0 :           } else {
     191           0 :             parent_.info_->configUpdateStats().update_empty_.inc();
     192           0 :           }
     193             : 
     194           0 :           if (!resolve_timer_) {
     195           0 :             resolve_timer_ =
     196           0 :                 parent_.dispatcher_.createTimer([this]() -> void { startResolveDns(); });
     197           0 :           }
     198             :           // if the initial dns resolved to empty, we'll skip the redis discovery phase and
     199             :           // treat it as an empty cluster.
     200           0 :           parent_.onPreInitComplete();
     201           0 :           resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
     202           0 :         } else {
     203             :           // Once DNS resolves the initial set of addresses, call startResolveRedis on
     204             :           // the RedisDiscoverySession. The RedisDiscoverySession will use the "cluster
     205             :           // slots" command for service discovery and slot allocation. All subsequent
     206             :           // discoveries are handled by RedisDiscoverySession and will not use DNS
     207             :           // resolution again.
     208           0 :           parent_.redis_discovery_session_.registerDiscoveryAddress(std::move(response), port_);
     209           0 :           parent_.redis_discovery_session_.startResolveRedis();
     210           0 :         }
     211           0 :       });
     212           0 : }
     213             : 
     214             : // RedisDiscoverySession
     215             : RedisCluster::RedisDiscoverySession::RedisDiscoverySession(
     216             :     Envoy::Extensions::Clusters::Redis::RedisCluster& parent,
     217             :     NetworkFilters::Common::Redis::Client::ClientFactory& client_factory)
     218             :     : parent_(parent), dispatcher_(parent.dispatcher_),
     219           0 :       resolve_timer_(parent.dispatcher_.createTimer([this]() -> void { startResolveRedis(); })),
     220             :       client_factory_(client_factory), buffer_timeout_(0),
     221             :       redis_command_stats_(
     222             :           NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats(
     223           0 :               parent_.info()->statsScope().symbolTable())) {}
     224             : 
     225             : // Convert the cluster slot IP/Port response to an address, return null if the response
     226             : // does not match the expected type.
     227             : Network::Address::InstanceConstSharedPtr
     228             : RedisCluster::RedisDiscoverySession::RedisDiscoverySession::ipAddressFromClusterEntry(
     229           0 :     const std::vector<NetworkFilters::Common::Redis::RespValue>& array) {
     230           0 :   return Network::Utility::parseInternetAddressNoThrow(array[0].asString(), array[1].asInteger(),
     231           0 :                                                        false);
     232           0 : }
     233             : 
     234           0 : RedisCluster::RedisDiscoverySession::~RedisDiscoverySession() {
     235           0 :   if (current_request_) {
     236           0 :     current_request_->cancel();
     237           0 :     current_request_ = nullptr;
     238           0 :   }
     239             :   // Disable timer for mock tests.
     240           0 :   if (resolve_timer_) {
     241           0 :     resolve_timer_->disableTimer();
     242           0 :   }
     243             : 
     244           0 :   while (!client_map_.empty()) {
     245           0 :     client_map_.begin()->second->client_->close();
     246           0 :   }
     247           0 : }
     248             : 
     249           0 : void RedisCluster::RedisDiscoveryClient::onEvent(Network::ConnectionEvent event) {
     250           0 :   if (event == Network::ConnectionEvent::RemoteClose ||
     251           0 :       event == Network::ConnectionEvent::LocalClose) {
     252           0 :     auto client_to_delete = parent_.client_map_.find(host_);
     253           0 :     ASSERT(client_to_delete != parent_.client_map_.end());
     254           0 :     parent_.dispatcher_.deferredDelete(std::move(client_to_delete->second->client_));
     255           0 :     parent_.client_map_.erase(client_to_delete);
     256           0 :   }
     257           0 : }
     258             : 
     259             : void RedisCluster::RedisDiscoverySession::registerDiscoveryAddress(
     260           0 :     std::list<Envoy::Network::DnsResponse>&& response, const uint32_t port) {
     261             :   // Since the address from DNS does not have port, we need to make a new address that has
     262             :   // port in it.
     263           0 :   for (const Network::DnsResponse& res : response) {
     264           0 :     const auto& addrinfo = res.addrInfo();
     265           0 :     ASSERT(addrinfo.address_ != nullptr);
     266           0 :     discovery_address_list_.push_back(
     267           0 :         Network::Utility::getAddressWithPort(*(addrinfo.address_), port));
     268           0 :   }
     269           0 : }
     270             : 
     271           0 : void RedisCluster::RedisDiscoverySession::startResolveRedis() {
     272           0 :   parent_.info_->configUpdateStats().update_attempt_.inc();
     273             :   // If a resolution is currently in progress, skip it.
     274           0 :   if (current_request_) {
     275           0 :     ENVOY_LOG(debug, "redis cluster slot request is already in progress for '{}'",
     276           0 :               parent_.info_->name());
     277           0 :     return;
     278           0 :   }
     279             : 
     280             :   // If hosts is empty, we haven't received a successful result from the CLUSTER SLOTS call
     281             :   // yet. So, pick a random discovery address from dns and make a request.
     282           0 :   Upstream::HostSharedPtr host;
     283           0 :   if (parent_.hosts_.empty()) {
     284           0 :     const int rand_idx = parent_.random_.random() % discovery_address_list_.size();
     285           0 :     auto it = std::next(discovery_address_list_.begin(), rand_idx);
     286           0 :     host = Upstream::HostSharedPtr{
     287           0 :         new RedisHost(parent_.info(), "", *it, parent_, true, parent_.timeSource())};
     288           0 :   } else {
     289           0 :     const int rand_idx = parent_.random_.random() % parent_.hosts_.size();
     290           0 :     host = parent_.hosts_[rand_idx];
     291           0 :   }
     292             : 
     293           0 :   current_host_address_ = host->address()->asString();
     294           0 :   RedisDiscoveryClientPtr& client = client_map_[current_host_address_];
     295           0 :   if (!client) {
     296           0 :     client = std::make_unique<RedisDiscoveryClient>(*this);
     297           0 :     client->host_ = current_host_address_;
     298           0 :     client->client_ = client_factory_.create(host, dispatcher_, *this, redis_command_stats_,
     299           0 :                                              parent_.info()->statsScope(), parent_.auth_username_,
     300           0 :                                              parent_.auth_password_, false);
     301           0 :     client->client_->addConnectionCallbacks(*client);
     302           0 :   }
     303           0 :   ENVOY_LOG(debug, "executing redis cluster slot request for '{}'", parent_.info_->name());
     304           0 :   current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this);
     305           0 : }
     306             : 
     307             : void RedisCluster::RedisDiscoverySession::updateDnsStats(
     308           0 :     Network::DnsResolver::ResolutionStatus status, bool empty_response) {
     309           0 :   if (status == Network::DnsResolver::ResolutionStatus::Failure) {
     310           0 :     parent_.info_->configUpdateStats().update_failure_.inc();
     311           0 :   } else if (empty_response) {
     312           0 :     parent_.info_->configUpdateStats().update_empty_.inc();
     313           0 :   }
     314           0 : }
     315             : 
     316             : /**
     317             :  * Resolve the primary cluster entry hostname in each slot.
     318             :  * If the primary is successfully resolved, we proceed to resolve replicas.
     319             :  * We use the count of hostnames that require resolution to decide when the resolution process is
     320             :  * completed, and then call the post-resolution hooks.
     321             :  *
     322             :  * If resolving any one of the primaries fails, we stop the resolution process and reset
     323             :  * the timers to retry the resolution. Failure to resolve a replica, on the other hand, does not
     324             :  * stop the process. If replica resolution fails, we simply log a warning, and move to resolving
     325             :  * the rest.
     326             :  *
     327             :  * @param slots the list of slots which may need DNS resolution
     328             :  * @param hostname_resolution_required_cnt the number of hostnames that need DNS resolution
     329             :  */
     330             : void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(
     331             :     ClusterSlotsSharedPtr&& slots,
     332           0 :     std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt) {
     333           0 :   for (uint64_t slot_idx = 0; slot_idx < slots->size(); slot_idx++) {
     334           0 :     auto& slot = (*slots)[slot_idx];
     335           0 :     if (slot.primary() == nullptr) {
     336           0 :       ENVOY_LOG(debug,
     337           0 :                 "starting async DNS resolution for primary slot address {} at index location {}",
     338           0 :                 slot.primary_hostname_, slot_idx);
     339           0 :       parent_.dns_resolver_->resolve(
     340           0 :           slot.primary_hostname_, parent_.dns_lookup_family_,
     341           0 :           [this, slot_idx, slots,
     342           0 :            hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status,
     343           0 :                                              std::list<Network::DnsResponse>&& response) -> void {
     344           0 :             auto& slot = (*slots)[slot_idx];
     345           0 :             ENVOY_LOG(
     346           0 :                 debug,
     347           0 :                 "async DNS resolution complete for primary slot address {} at index location {}",
     348           0 :                 slot.primary_hostname_, slot_idx);
     349           0 :             updateDnsStats(status, response.empty());
     350             :             // If DNS resolution for a primary fails, we stop resolution for remaining, and reset
     351             :             // the timer.
     352           0 :             if (status != Network::DnsResolver::ResolutionStatus::Success) {
     353           0 :               ENVOY_LOG(error, "Unable to resolve cluster slot primary hostname {}",
     354           0 :                         slot.primary_hostname_);
     355           0 :               resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
     356           0 :               return;
     357           0 :             }
     358             :             // Primary slot address resolved
     359           0 :             slot.setPrimary(Network::Utility::getAddressWithPort(
     360           0 :                 *response.front().addrInfo().address_, slot.primary_port_));
     361           0 :             (*hostname_resolution_required_cnt)--;
     362             :             // Continue on to resolve replicas
     363           0 :             resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt);
     364           0 :           });
     365           0 :     } else {
     366           0 :       resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt);
     367           0 :     }
     368           0 :   }
     369           0 : }
     370             : 
     371             : /**
     372             :  * Resolve the replicas in a cluster entry. If there are no replicas, simply return.
     373             :  * If all the hostnames have been resolved, call post-resolution methods.
     374             :  * Failure to resolve a replica does not stop the overall resolution process. We log a
     375             :  * warning, and move to the next one.
     376             :  *
     377             :  * @param slots the list of slots which may need DNS resolution
     378             :  * @param index the specific index into `slots` whose replicas need to be resolved
     379             :  * @param hostname_resolution_required_cnt the number of hostnames that need to be resolved
     380             :  */
     381             : void RedisCluster::RedisDiscoverySession::resolveReplicas(
     382             :     ClusterSlotsSharedPtr slots, std::size_t index,
     383           0 :     std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt) {
     384           0 :   auto& slot = (*slots)[index];
     385           0 :   if (slot.replicas_to_resolve_.empty()) {
     386           0 :     if (*hostname_resolution_required_cnt == 0) {
     387           0 :       finishClusterHostnameResolution(slots);
     388           0 :     }
     389           0 :     return;
     390           0 :   }
     391             : 
     392           0 :   for (uint64_t replica_idx = 0; replica_idx < slot.replicas_to_resolve_.size(); replica_idx++) {
     393           0 :     auto replica = slot.replicas_to_resolve_[replica_idx];
     394           0 :     ENVOY_LOG(debug, "starting async DNS resolution for replica address {}", replica.first);
     395           0 :     parent_.dns_resolver_->resolve(
     396           0 :         replica.first, parent_.dns_lookup_family_,
     397           0 :         [this, index, slots, replica_idx,
     398           0 :          hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status,
     399           0 :                                            std::list<Network::DnsResponse>&& response) -> void {
     400           0 :           auto& slot = (*slots)[index];
     401           0 :           auto& replica = slot.replicas_to_resolve_[replica_idx];
     402           0 :           ENVOY_LOG(debug, "async DNS resolution complete for replica address {}", replica.first);
     403           0 :           updateDnsStats(status, response.empty());
     404             :           // If DNS resolution fails here, we move on to resolve other replicas in the list.
     405             :           // We log a warn message.
     406           0 :           if (status != Network::DnsResolver::ResolutionStatus::Success) {
     407           0 :             ENVOY_LOG(warn, "Unable to resolve cluster replica address {}", replica.first);
     408           0 :           } else {
     409             :             // Replica resolved
     410           0 :             slot.addReplica(Network::Utility::getAddressWithPort(
     411           0 :                 *response.front().addrInfo().address_, replica.second));
     412           0 :           }
     413           0 :           (*hostname_resolution_required_cnt)--;
     414             :           // finish resolution if all the addresses have been resolved.
     415           0 :           if (*hostname_resolution_required_cnt <= 0) {
     416           0 :             finishClusterHostnameResolution(slots);
     417           0 :           }
     418           0 :         });
     419           0 :   }
     420           0 : }
     421             : 
     422             : void RedisCluster::RedisDiscoverySession::finishClusterHostnameResolution(
     423           0 :     ClusterSlotsSharedPtr slots) {
     424           0 :   parent_.onClusterSlotUpdate(std::move(slots));
     425           0 :   resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
     426           0 : }
     427             : // Parses a CLUSTER SLOTS reply into ClusterSlot entries; malformed replies are rejected.
     428             : void RedisCluster::RedisDiscoverySession::onResponse(
     429           0 :     NetworkFilters::Common::Redis::RespValuePtr&& value) {
     430           0 :   ENVOY_LOG(debug, "redis cluster slot request for '{}' succeeded", parent_.info_->name());
     431           0 :   current_request_ = nullptr;
     432             :   // Indices of the fields inside each slot entry of the reply (see the layout below).
     433           0 :   const uint32_t SlotRangeStart = 0;
     434           0 :   const uint32_t SlotRangeEnd = 1;
     435           0 :   const uint32_t SlotPrimary = 2;
     436           0 :   const uint32_t SlotReplicaStart = 3;
     437             : 
     438             :   // A non-array or empty reply is treated as unexpected; no slot update is attempted.
     439           0 :   if (value->type() != NetworkFilters::Common::Redis::RespType::Array || value->asArray().empty()) {
     440           0 :     onUnexpectedResponse(value);
     441           0 :     return;
     442           0 :   }
     443             : 
     444           0 :   auto cluster_slots = std::make_shared<std::vector<ClusterSlot>>();
     445             : 
     446             :   // https://redis.io/commands/cluster-slots
     447             :   // CLUSTER SLOTS returns a nested array describing redis instances, like this:
     448             :   //
     449             :   // 1) 1) (integer) 0                                      <-- start slot range
     450             :   //    2) (integer) 5460                                   <-- end slot range
     451             :   //
     452             :   //    3) 1) "127.0.0.1"                                   <-- primary slot IP ADDR(HOSTNAME)
     453             :   //       2) (integer) 30001                               <-- primary slot PORT
     454             :   //       3) "09dbe9720cda62f7865eabc5fd8857c5d2678366"
     455             :   //
     456             :   //    4) 1) "127.0.0.2"                                   <-- replica slot IP ADDR(HOSTNAME)
     457             :   //       2) (integer) 30004                               <-- replica slot PORT
     458             :   //       3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf"
     459             :   //
     460             :   // Walk every slot entry, error-checking each field and counting addresses that need DNS.
     461           0 :   auto hostname_resolution_required_cnt = std::make_shared<std::uint64_t>(0);
     462           0 :   for (const NetworkFilters::Common::Redis::RespValue& part : value->asArray()) {
     463           0 :     if (part.type() != NetworkFilters::Common::Redis::RespType::Array) {
     464           0 :       onUnexpectedResponse(value);
     465           0 :       return;
     466           0 :     }
     467             : 
     468             :     // Row 1-2: Slot range bounds. An entry needs both bounds plus a primary (>= 3 rows).
     469           0 :     const std::vector<NetworkFilters::Common::Redis::RespValue>& slot_range = part.asArray();
     470           0 :     if (slot_range.size() < 3 ||
     471           0 :         slot_range[SlotRangeStart].type() !=
     472           0 :             NetworkFilters::Common::Redis::RespType::Integer || // Start slot range is an
     473             :                                                                 // integer.
     474           0 :         slot_range[SlotRangeEnd].type() !=
     475           0 :             NetworkFilters::Common::Redis::RespType::Integer) { // End slot range is an
     476             :                                                                 // integer.
     477           0 :       onUnexpectedResponse(value);
     478           0 :       return;
     479           0 :     }
     480             : 
     481             :     // Row 3: Primary slot address
     482           0 :     if (!validateCluster(slot_range[SlotPrimary])) {
     483           0 :       onUnexpectedResponse(value);
     484           0 :       return;
     485           0 :     }
     486             :     // Try to parse the primary slot address as an IP address.
     487             :     // It may fail in case the address is a hostname. If this is the case - we'll come back later
     488             :     // and try to resolve hostnames asynchronously. For example, AWS ElastiCache returns hostname
     489             :     // instead of IP address.
     490           0 :     ClusterSlot slot(slot_range[SlotRangeStart].asInteger(), slot_range[SlotRangeEnd].asInteger(),
     491           0 :                      ipAddressFromClusterEntry(slot_range[SlotPrimary].asArray()));
     492           0 :     if (slot.primary() == nullptr) {
     493             :       // Primary address is potentially a hostname, save it for async DNS resolution.
     494           0 :       const auto& array = slot_range[SlotPrimary].asArray();
     495           0 :       slot.primary_hostname_ = array[0].asString();
     496           0 :       slot.primary_port_ = array[1].asInteger();
     497           0 :       (*hostname_resolution_required_cnt)++;
     498           0 :     }
     499             : 
     500             :     // Row 4-N: Replica(s) addresses (size() >= 3 was checked, so the start is in range).
     501           0 :     for (auto replica = std::next(slot_range.begin(), SlotReplicaStart);
     502           0 :          replica != slot_range.end(); ++replica) {
     503           0 :       if (!validateCluster(*replica)) {
     504           0 :         onUnexpectedResponse(value);
     505           0 :         return;
     506           0 :       }
     507           0 :       auto replica_address = ipAddressFromClusterEntry(replica->asArray());
     508           0 :       if (replica_address) {
     509           0 :         slot.addReplica(std::move(replica_address));
     510           0 :       } else {
     511             :         // Replica address is potentially a hostname, save it for async DNS resolution.
     512           0 :         const auto& array = replica->asArray();
     513           0 :         slot.addReplicaToResolve(array[0].asString(), array[1].asInteger());
     514           0 :         (*hostname_resolution_required_cnt)++;
     515           0 :       }
     516           0 :     }
     517           0 :     cluster_slots->push_back(std::move(slot));
     518           0 :   }
     519             : 
     520           0 :   if (*hostname_resolution_required_cnt > 0) {
     521             :     // DNS resolution is required, defer finalizing the slot update until resolution is complete.
     522           0 :     resolveClusterHostnames(std::move(cluster_slots), hostname_resolution_required_cnt);
     523           0 :   } else {
     524             :     // All slot addresses were IP/port pairs: apply the update and re-arm the refresh timer.
     525           0 :     parent_.onClusterSlotUpdate(std::move(cluster_slots));
     526           0 :     resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
     527           0 :   }
     528           0 : }
     529             : 
     530             : // Returns true when a node entry is [non-empty bulk-string host, integer port <= 0xffff, ...].
     531             : bool RedisCluster::RedisDiscoverySession::validateCluster(
     532           0 :     const NetworkFilters::Common::Redis::RespValue& value) {
     533             :   // Verify data types: an array that starts with a bulk string followed by an integer.
     534           0 :   if (value.type() != NetworkFilters::Common::Redis::RespType::Array) {
     535           0 :     return false;
     536           0 :   }
     537           0 :   const auto& array = value.asArray();
     538           0 :   if (array.size() < 2 || array[0].type() != NetworkFilters::Common::Redis::RespType::BulkString ||
     539           0 :       array[1].type() != NetworkFilters::Common::Redis::RespType::Integer) {
     540           0 :     return false;
     541           0 :   }
     542             :   // Verify IP/Host address is not empty
     543           0 :   if (array[0].asString().empty()) {
     544           0 :     return false;
     545           0 :   }
     546             :   // Verify port (upper bound only). NOTE(review): negatives pass if asInteger() is signed.
     547           0 :   if (array[1].asInteger() > 0xffff) {
     548           0 :     return false;
     549           0 :   }
     550             : 
     551           0 :   return true;
     552           0 : }
     553             : // Logs the reply, counts an update failure and re-arms resolve_timer_ at the refresh rate.
     554             : void RedisCluster::RedisDiscoverySession::onUnexpectedResponse(
     555           0 :     const NetworkFilters::Common::Redis::RespValuePtr& value) {
     556           0 :   ENVOY_LOG(warn, "Unexpected response to cluster slot command: {}", value->toString());
     557           0 :   this->parent_.info_->configUpdateStats().update_failure_.inc();
     558           0 :   resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
     559           0 : }
     560             : // Failed slot request: closes the current host's client, counts a failure, re-arms the timer.
     561           0 : void RedisCluster::RedisDiscoverySession::onFailure() {
     562           0 :   ENVOY_LOG(debug, "redis cluster slot request for '{}' failed", parent_.info_->name());
     563           0 :   current_request_ = nullptr;
     564           0 :   if (!current_host_address_.empty()) {
     565           0 :     auto client_to_delete = client_map_.find(current_host_address_);
     566           0 :     client_to_delete->second->client_->close(); // NOTE(review): no end() check on find().
     567           0 :   }
     568           0 :   parent_.info()->configUpdateStats().update_failure_.inc();
     569           0 :   resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
     570           0 : }
     571             : // Out-of-class definition of the static ClusterSlotsRequest::instance_ member.
     572             : RedisCluster::ClusterSlotsRequest RedisCluster::ClusterSlotsRequest::instance_;
     573             : // Builds a RedisCluster plus, for lb_policy CLUSTER_PROVIDED only, its thread-aware LB.
     574             : absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
     575             : RedisClusterFactory::createClusterWithConfig(
     576             :     const envoy::config::cluster::v3::Cluster& cluster,
     577             :     const envoy::extensions::clusters::redis::v3::RedisClusterConfig& proto_config,
     578           0 :     Upstream::ClusterFactoryContext& context) {
     579           0 :   if (!cluster.has_cluster_type() || cluster.cluster_type().name() != "envoy.clusters.redis") {
     580           0 :     return absl::InvalidArgumentError("Redis cluster can only created with redis cluster type.");
     581           0 :   } // NOTE(review): grammar slip in the error text above (can only *be* created).
     582             :   // TODO(hyang): This is needed to migrate existing cluster, disallow using other lb_policy
     583             :   // in the future
     584           0 :   if (cluster.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
     585           0 :     return std::make_pair(std::make_shared<RedisCluster>(
     586           0 :                               cluster, proto_config, context,
     587           0 :                               NetworkFilters::Common::Redis::Client::ClientFactoryImpl::instance_,
     588           0 :                               selectDnsResolver(cluster, context), nullptr),
     589           0 :                           nullptr); // No slot-update callback and no thread-aware LB here.
     590           0 :   }
     591           0 :   auto lb_factory = std::make_shared<RedisClusterLoadBalancerFactory>(
     592           0 :       context.serverFactoryContext().api().randomGenerator());
     593           0 :   return std::make_pair(std::make_shared<RedisCluster>(
     594           0 :                             cluster, proto_config, context,
     595           0 :                             NetworkFilters::Common::Redis::Client::ClientFactoryImpl::instance_,
     596           0 :                             selectDnsResolver(cluster, context), lb_factory),
     597           0 :                         std::make_unique<RedisClusterThreadAwareLoadBalancer>(lb_factory));
     598           0 : }
     599             : // Registers RedisClusterFactory under the Upstream::ClusterFactory category.
     600             : REGISTER_FACTORY(RedisClusterFactory, Upstream::ClusterFactory);
     601             : 
     602             : } // namespace Redis
     603             : } // namespace Clusters
     604             : } // namespace Extensions
     605             : } // namespace Envoy

Generated by: LCOV version 1.15