1
#include "source/extensions/clusters/redis/redis_cluster.h"
2

            
3
#include <cstdint>
4
#include <memory>
5

            
6
#include "envoy/config/cluster/v3/cluster.pb.h"
7
#include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.h"
8
#include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.validate.h"
9
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
10
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h"
11

            
12
namespace Envoy {
13
namespace Extensions {
14
namespace Clusters {
15
namespace Redis {
16

            
17
absl::StatusOr<std::unique_ptr<RedisCluster::RedisHost>> RedisCluster::RedisHost::create(
    Upstream::ClusterInfoConstSharedPtr cluster, const std::string& hostname,
    Network::Address::InstanceConstSharedPtr address, RedisCluster& parent, bool primary) {
  // The RedisHost constructor reports errors through an out-parameter instead of throwing, so the
  // host is built first and discarded (by the owning unique_ptr) if construction failed.
  absl::Status status = absl::OkStatus();
  std::unique_ptr<RedisCluster::RedisHost> host(
      new RedisCluster::RedisHost(cluster, hostname, address, parent, primary, status));
  RETURN_IF_NOT_OK(status);
  return host;
}
26

            
27
absl::StatusOr<std::unique_ptr<RedisCluster>> RedisCluster::create(
    const envoy::config::cluster::v3::Cluster& cluster,
    const envoy::extensions::clusters::redis::v3::RedisClusterConfig& redis_cluster,
    Upstream::ClusterFactoryContext& context,
    NetworkFilters::Common::Redis::Client::ClientFactory& client_factory,
    Network::DnsResolverSharedPtr dns_resolver, ClusterSlotUpdateCallBackSharedPtr factory) {
  // Construction errors come back through `status` rather than an exception. On failure the
  // partially built cluster is released when `redis_cluster_ptr` goes out of scope, and only the
  // error is propagated.
  absl::Status status = absl::OkStatus();
  auto redis_cluster_ptr = absl::WrapUnique(new RedisCluster(
      cluster, redis_cluster, context, client_factory, dns_resolver, factory, status));
  RETURN_IF_NOT_OK(status);
  return redis_cluster_ptr;
}
39

            
40
// Constructs the Redis cluster. Refresh settings are read from `redis_cluster`, and the literal
// defaults in the initializer list below apply when a field is unset: cluster_refresh_rate
// 5000 ms, cluster_refresh_timeout 3000 ms, redirect_refresh_interval 5000 ms and
// redirect_refresh_threshold 5.
//
// `creation_status` is handed to the BaseDynamicClusterImpl base constructor.
// RedisCluster::create() checks it after construction.
// NOTE(review): the constructor body runs even if the base constructor reported an error in
// `creation_status`. Confirm that is harmless.
RedisCluster::RedisCluster(
    const envoy::config::cluster::v3::Cluster& cluster,
    const envoy::extensions::clusters::redis::v3::RedisClusterConfig& redis_cluster,
    Upstream::ClusterFactoryContext& context,
    NetworkFilters::Common::Redis::Client::ClientFactory& redis_client_factory,
    Network::DnsResolverSharedPtr dns_resolver, ClusterSlotUpdateCallBackSharedPtr lb_factory,
    absl::Status& creation_status)
    : Upstream::BaseDynamicClusterImpl(cluster, context, creation_status),
      cluster_manager_(context.serverFactoryContext().clusterManager()),
      cluster_refresh_rate_(std::chrono::milliseconds(
          PROTOBUF_GET_MS_OR_DEFAULT(redis_cluster, cluster_refresh_rate, 5000))),
      cluster_refresh_timeout_(std::chrono::milliseconds(
          PROTOBUF_GET_MS_OR_DEFAULT(redis_cluster, cluster_refresh_timeout, 3000))),
      redirect_refresh_interval_(std::chrono::milliseconds(
          PROTOBUF_GET_MS_OR_DEFAULT(redis_cluster, redirect_refresh_interval, 5000))),
      redirect_refresh_threshold_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(redis_cluster, redirect_refresh_threshold, 5)),
      failure_refresh_threshold_(redis_cluster.failure_refresh_threshold()),
      host_degraded_refresh_threshold_(redis_cluster.host_degraded_refresh_threshold()),
      dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
      dns_resolver_(std::move(dns_resolver)),
      dns_lookup_family_(Upstream::getDnsLookupFamilyFromCluster(cluster)),
      load_assignment_(cluster.load_assignment()),
      local_info_(context.serverFactoryContext().localInfo()),
      random_(context.serverFactoryContext().api().randomGenerator()),
      // The session is held by shared_ptr so that the refresh-manager callback registered in
      // the body can observe it through a weak_ptr.
      redis_discovery_session_(
          std::make_shared<RedisDiscoverySession>(*this, redis_client_factory)),
      lb_factory_(std::move(lb_factory)),
      auth_username_(NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authUsername(
          info(), context.serverFactoryContext().api())),
      auth_password_(NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authPassword(
          info(), context.serverFactoryContext().api())),
      // The refresh manager is obtained through the server's singleton manager.
      cluster_name_(cluster.name()), refresh_manager_(Common::Redis::getClusterRefreshManager(
                                         context.serverFactoryContext().singletonManager(),
                                         context.serverFactoryContext().mainThreadDispatcher(),
                                         context.serverFactoryContext().clusterManager(),
                                         context.serverFactoryContext().api().timeSource())),
      registration_handle_(nullptr) {
  // Create one DNS discovery target per configured endpoint (hostname + port). No resolution
  // starts here; startPreInit() kicks it off.
  const auto& locality_lb_endpoints = load_assignment_.endpoints();
  for (const auto& locality_lb_endpoint : locality_lb_endpoints) {
    for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
      const auto& host = lb_endpoint.endpoint().address();
      dns_discovery_resolve_targets_.emplace_back(new DnsDiscoveryResolveTarget(
          *this, host.socket_address().address(), host.socket_address().port_value()));
    }
  }

  // Register this cluster with the refresh manager. When the manager invokes the callback
  // (presumably once one of the redirect / failure / degraded-host thresholds passed here is
  // crossed), the callback arms the discovery session's resolve timer with a 0 ms delay to
  // force a CLUSTER SLOTS refresh.
  // The callback captures only a weak_ptr, so it becomes a no-op once the session has been
  // released (the destructor resets redis_discovery_session_).
  std::weak_ptr<RedisDiscoverySession> weak_session = redis_discovery_session_;
  registration_handle_ = refresh_manager_->registerCluster(
      cluster_name_, redirect_refresh_interval_, redirect_refresh_threshold_,
      failure_refresh_threshold_, host_degraded_refresh_threshold_, [weak_session]() {
        // Proceed only if the session is still alive and its timer exists.
        auto session = weak_session.lock();
        if (session && session->resolve_timer_) {
          session->resolve_timer_->enableTimer(std::chrono::milliseconds(0));
        }
      });
}
99

            
100
31
RedisCluster::~RedisCluster() {
  // Teardown order matters:
  //  1. Raise the flag first. Timer callbacks check it and bail out instead of starting new work.
  //  2. Drop the discovery session while every other member is still intact. The refresh-manager
  //     callback only holds a weak_ptr to it, so that callback can no longer reach the session.
  //  3. Destroy the DNS targets. Their destructors cancel in-flight queries and disarm retry
  //     timers, so no DNS callback can touch this cluster afterwards.
  is_destroying_.store(true);
  redis_discovery_session_ = nullptr;
  dns_discovery_resolve_targets_.clear();
}
113

            
114
29
void RedisCluster::startPreInit() {
115
30
  for (const DnsDiscoveryResolveTargetPtr& target : dns_discovery_resolve_targets_) {
116
30
    target->startResolveDns();
117
30
  }
118
29
  if (!wait_for_warm_on_init_) {
119
1
    onPreInitComplete();
120
1
  }
121
29
}
122

            
123
void RedisCluster::updateAllHosts(const Upstream::HostVector& hosts_added,
                                  const Upstream::HostVector& hosts_removed,
                                  uint32_t current_priority) {
  // Rebuild the host list for `current_priority` from hosts_. Then push it, together with the
  // added/removed deltas, into the cluster's priority set.
  Upstream::PriorityStateManager state_manager(*this, local_info_, nullptr);

  auto endpoint = localityLbEndpoint();
  state_manager.initializePriorityFor(endpoint);

  // Every host is registered against the same locality endpoint, so the priority comparison does
  // not depend on the individual host and can be evaluated once for the whole list.
  const bool endpoint_matches_priority = endpoint.priority() == current_priority;
  if (endpoint_matches_priority) {
    for (const Upstream::HostSharedPtr& redis_host : hosts_) {
      state_manager.registerHostForPriority(redis_host, endpoint);
    }
  }

  state_manager.updateClusterPrioritySet(
      current_priority, std::move(state_manager.priorityState()[current_priority].first),
      hosts_added, hosts_removed, absl::nullopt, absl::nullopt, absl::nullopt);
}
140

            
141
40
// Applies a freshly parsed CLUSTER SLOTS result:
//  - builds a RedisHost for every distinct primary and replica address,
//  - diffs those hosts against the current host list,
//  - forwards the slot map to the load-balancer factory (when one is configured),
//  - rebuilds the priority set when either the hosts or the slot assignment changed.
// The function always finishes by calling onPreInitComplete().
//
// Host creation goes through THROW_OR_RETURN_VALUE, so a failing RedisHost::create() presumably
// surfaces here as an exception rather than a status.
void RedisCluster::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots) {
  Upstream::HostVector new_hosts;
  // Addresses already turned into hosts during this update, so an address that appears in
  // several slots yields a single host.
  absl::flat_hash_set<std::string> all_new_hosts;

  for (const ClusterSlot& slot : *slots) {
    // Primaries are keyed by their address string.
    if (all_new_hosts.count(slot.primary()->asString()) == 0) {
      new_hosts.emplace_back(THROW_OR_RETURN_VALUE(
          RedisHost::create(info(), "", slot.primary(), *this, true), std::unique_ptr<RedisHost>));
      all_new_hosts.emplace(slot.primary()->asString());
    }
    // Replicas are keyed by replica.first. This presumably uses the same address-string form as
    // primaries; confirm against ClusterSlot.
    for (auto const& replica : slot.replicas()) {
      if (all_new_hosts.count(replica.first) == 0) {
        new_hosts.emplace_back(
            THROW_OR_RETURN_VALUE(RedisHost::create(info(), "", replica.second, *this, false),
                                  std::unique_ptr<RedisHost>));
        all_new_hosts.emplace(replica.first);
      }
    }
  }

  // Get the map of all the latest existing hosts, which is used to filter out the existing
  // hosts in the process of updating cluster memberships.
  Upstream::HostMapConstSharedPtr all_hosts = priority_set_.crossPriorityHostMap();
  ASSERT(all_hosts != nullptr);

  // updateDynamicHostList updates hosts_ in place and reports the delta through the two vectors.
  Upstream::HostVector hosts_added;
  Upstream::HostVector hosts_removed;
  const bool host_updated = updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed,
                                                  *all_hosts, all_new_hosts);

  // Create a map containing all the latest hosts to determine whether the slots are updated.
  Upstream::HostMap updated_hosts = *all_hosts;
  for (const auto& host : hosts_removed) {
    updated_hosts.erase(host->address()->asString());
  }
  for (const auto& host : hosts_added) {
    updated_hosts[host->address()->asString()] = host;
  }

  // Without a load-balancer factory, the slot map is never considered updated.
  const bool slot_updated =
      lb_factory_ ? lb_factory_->onClusterSlotUpdate(std::move(slots), updated_hosts) : false;

  // If slot is updated, call updateAllHosts regardless of if there's new hosts to force
  // update of the thread local load balancers.
  if (host_updated || slot_updated) {
    ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) {
      return host->priority() == localityLbEndpoint().priority();
    }));
    updateAllHosts(hosts_added, hosts_removed, localityLbEndpoint().priority());
  } else {
    info_->configUpdateStats().update_no_rebuild_.inc();
  }

  // TODO(hyang): If there is an initialize callback, fire it now. Note that if the
  // cluster refers to multiple DNS names, this will return initialized after a single
  // DNS resolution completes. This is not perfect but is easier to code and it is unclear
  // if the extra complexity is needed so will start with this.
  onPreInitComplete();
}
200

            
201
2
void RedisCluster::reloadHealthyHostsHelper(const Upstream::HostSharedPtr& host) {
  // Let the slot-aware load-balancer factory react to the health change first.
  if (lb_factory_ != nullptr) {
    lb_factory_->onHostHealthUpdate();
  }

  // A host that went degraded or unhealthy is reported to the refresh manager, which may
  // trigger a topology refresh for this cluster.
  if (host != nullptr) {
    const auto health = host->coarseHealth();
    if (health == Upstream::Host::Health::Degraded || health == Upstream::Host::Health::Unhealthy) {
      refresh_manager_->onHostDegraded(cluster_name_);
    }
  }

  ClusterImplBase::reloadHealthyHostsHelper(host);
}
211

            
212
// DnsDiscoveryResolveTarget
213
// Records the seed hostname and port for `parent`. Resolution is not started here; see
// startResolveDns().
RedisCluster::DnsDiscoveryResolveTarget::DnsDiscoveryResolveTarget(
    RedisCluster& parent, const std::string& dns_address, const uint32_t port)
    : parent_(parent), dns_address_(dns_address), port_(port) {}
217

            
218
33
RedisCluster::DnsDiscoveryResolveTarget::~DnsDiscoveryResolveTarget() {
  // Abandon any lookup still in flight, since its callback captures `this`.
  if (active_query_ != nullptr) {
    active_query_->cancel(Network::ActiveDnsQuery::CancelReason::QueryAbandoned);
  }

  // Explicitly disarm a pending retry timer. Per the original note, this is needed for mock-based
  // tests.
  if (resolve_timer_ == nullptr || !resolve_timer_->enabled()) {
    return;
  }
  resolve_timer_->disableTimer();
}
227

            
228
33
// Resolves this target's seed hostname asynchronously.
//  - On failure or an empty answer: bump the matching config-update stat, complete pre-init (so
//    the cluster can warm as empty), and schedule a retry after cluster_refresh_rate_.
//  - On success: hand the addresses to the discovery session and start a CLUSTER SLOTS query.
//    No retry timer is armed on this path, so the target is not re-resolved.
void RedisCluster::DnsDiscoveryResolveTarget::startResolveDns() {
  ENVOY_LOG(trace, "starting async DNS resolution for {}", dns_address_);

  active_query_ = parent_.dns_resolver_->resolve(
      dns_address_, parent_.dns_lookup_family_,
      [this](Network::DnsResolver::ResolutionStatus status, absl::string_view,
             std::list<Network::DnsResponse>&& response) -> void {
        // The query has completed, so there is nothing left for the destructor to cancel.
        active_query_ = nullptr;
        ENVOY_LOG(trace, "async DNS resolution complete for {}", dns_address_);
        if (status == Network::DnsResolver::ResolutionStatus::Failure || response.empty()) {
          if (status == Network::DnsResolver::ResolutionStatus::Failure) {
            parent_.info_->configUpdateStats().update_failure_.inc();
          } else {
            parent_.info_->configUpdateStats().update_empty_.inc();
          }

          // Lazily create the retry timer the first time resolution fails.
          if (!resolve_timer_) {
            resolve_timer_ = parent_.dispatcher_.createTimer([this]() -> void {
              // Check if the parent cluster is being destroyed
              if (parent_.is_destroying_.load()) {
                return;
              }
              startResolveDns();
            });
          }
          // If DNS failed or returned no addresses, skip the Redis discovery phase for now and
          // treat the cluster as empty. The retry timer below tries DNS again.
          parent_.onPreInitComplete();
          resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
        } else {
          // Once DNS resolves the initial set of addresses, call startResolveRedis on the
          // RedisDiscoverySession. The RedisDiscoverySession uses the "cluster slots" command
          // for service discovery and slot allocation. All subsequent discoveries are handled
          // by the RedisDiscoverySession and do not use DNS resolution of this seed again.
          parent_.redis_discovery_session_->registerDiscoveryAddress(std::move(response), port_);
          parent_.redis_discovery_session_->startResolveRedis();
        }
      });
}
268

            
269
// RedisDiscoverySession
270
// Creates the CLUSTER SLOTS discovery session for `parent`. The session reuses the parent's
// dispatcher. The resolve timer created here drives every subsequent CLUSTER SLOTS refresh: when
// it fires, it calls startResolveRedis() unless the parent cluster has begun destruction.
// NOTE(review): the timer callback reads parent_.is_destroying_. That assumes the parent is still
// alive whenever the timer fires; confirm this holds if something else keeps the session alive
// past the parent.
RedisCluster::RedisDiscoverySession::RedisDiscoverySession(
    Envoy::Extensions::Clusters::Redis::RedisCluster& parent,
    NetworkFilters::Common::Redis::Client::ClientFactory& client_factory)
    : parent_(parent), dispatcher_(parent.dispatcher_),
      resolve_timer_(parent.dispatcher_.createTimer([this]() -> void {
        // Check if the parent cluster is being destroyed
        if (parent_.is_destroying_.load()) {
          return;
        }
        startResolveRedis();
      })),
      client_factory_(client_factory), buffer_timeout_(0),
      // Command stats are registered in the parent cluster's stats scope.
      redis_command_stats_(
          NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats(
              parent_.info()->statsScope().symbolTable())) {}
285

            
286
// Convert the cluster slot IP/Port response to an address, return null if the response
287
// does not match the expected type.
288
Network::Address::InstanceConstSharedPtr
289
RedisCluster::RedisDiscoverySession::RedisDiscoverySession::ipAddressFromClusterEntry(
290
105
    const std::vector<NetworkFilters::Common::Redis::RespValue>& array) {
291
105
  return Network::Utility::parseInternetAddressNoThrow(array[0].asString(), array[1].asInteger(),
292
105
                                                       false);
293
105
}
294

            
295
32
RedisCluster::RedisDiscoverySession::~RedisDiscoverySession() {
  // Cancel any in-flight CLUSTER SLOTS request so its callbacks are not delivered to a dead
  // session.
  if (current_request_ != nullptr) {
    current_request_->cancel();
    current_request_ = nullptr;
  }

  // Disarm the refresh timer. Per the original note, this is needed for mock-based tests.
  if (resolve_timer_ != nullptr) {
    resolve_timer_->disableTimer();
  }

  // Closing a client is expected to raise a close event whose handler erases that client from
  // client_map_. The loop therefore keeps closing the first remaining entry until the map is
  // drained.
  while (!client_map_.empty()) {
    auto& first_entry = *client_map_.begin();
    first_entry.second->client_->close();
  }
}
309

            
310
42
void RedisCluster::RedisDiscoveryClient::onEvent(Network::ConnectionEvent event) {
  // Only close events are interesting. Other connection events leave the client in place.
  const bool connection_closed = event == Network::ConnectionEvent::RemoteClose ||
                                 event == Network::ConnectionEvent::LocalClose;
  if (!connection_closed) {
    return;
  }

  auto& clients = parent_.client_map_;
  auto entry = clients.find(host_);
  ASSERT(entry != clients.end());
  // The Redis client is handed to deferred deletion rather than destroyed inline, presumably
  // because it may still be on the call stack that raised this event.
  parent_.dispatcher_.deferredDelete(std::move(entry->second->client_));
  // The map entry owns this RedisDiscoveryClient, so nothing may touch `this` after the erase.
  clients.erase(entry);
}
319

            
320
void RedisCluster::RedisDiscoverySession::registerDiscoveryAddress(
321
28
    std::list<Envoy::Network::DnsResponse>&& response, const uint32_t port) {
322
  // Since the address from DNS does not have port, we need to make a new address that has
323
  // port in it.
324
48
  for (const Network::DnsResponse& res : response) {
325
48
    const auto& addrinfo = res.addrInfo();
326
48
    ASSERT(addrinfo.address_ != nullptr);
327
48
    discovery_address_list_.push_back(
328
48
        Network::Utility::getAddressWithPort(*(addrinfo.address_), port));
329
48
  }
330
28
}
331

            
332
2111
void RedisCluster::RedisDiscoverySession::startResolveRedis() {
333
2111
  parent_.info_->configUpdateStats().update_attempt_.inc();
334
  // If a resolution is currently in progress, skip it.
335
2111
  if (current_request_) {
336
1
    ENVOY_LOG(debug, "redis cluster slot request is already in progress for '{}'",
337
1
              parent_.info_->name());
338
1
    return;
339
1
  }
340

            
341
  // If hosts is empty, we haven't received a successful result from the CLUSTER SLOTS call
342
  // yet. So, pick a random discovery address from dns and make a request.
343
2110
  Upstream::HostSharedPtr host;
344
2110
  if (parent_.hosts_.empty()) {
345
29
    const int rand_idx = parent_.random_.random() % discovery_address_list_.size();
346
29
    auto it = std::next(discovery_address_list_.begin(), rand_idx);
347
29
    host = Upstream::HostSharedPtr{THROW_OR_RETURN_VALUE(
348
29
        RedisHost::create(parent_.info(), "", *it, parent_, true), std::unique_ptr<RedisHost>)};
349
2086
  } else {
350
2081
    const int rand_idx = parent_.random_.random() % parent_.hosts_.size();
351
2081
    host = parent_.hosts_[rand_idx];
352
2081
  }
353

            
354
2110
  current_host_address_ = host->address()->asString();
355
2110
  RedisDiscoveryClientPtr& client = client_map_[current_host_address_];
356
2110
  if (!client) {
357
33
    client = std::make_unique<RedisDiscoveryClient>(*this);
358
33
    client->host_ = current_host_address_;
359
    // absl::nullopt here disables AWS IAM authentication in redis client which is not supported by
360
    // redis cluster implementation
361
33
    client->client_ = client_factory_.create(
362
33
        host, dispatcher_, shared_from_this(), redis_command_stats_, parent_.info()->statsScope(),
363
33
        parent_.auth_username_, parent_.auth_password_, false, absl::nullopt, absl::nullopt);
364
33
    client->client_->addConnectionCallbacks(*client);
365
33
  }
366
2110
  ENVOY_LOG(debug, "executing redis cluster slot request for '{}'", parent_.info_->name());
367
2110
  current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this);
368
2110
}
369

            
370
void RedisCluster::RedisDiscoverySession::updateDnsStats(
371
22
    Network::DnsResolver::ResolutionStatus status, bool empty_response) {
372
22
  if (status == Network::DnsResolver::ResolutionStatus::Failure) {
373
3
    parent_.info_->configUpdateStats().update_failure_.inc();
374
19
  } else if (empty_response) {
375
3
    parent_.info_->configUpdateStats().update_empty_.inc();
376
3
  }
377
22
}
378

            
379
/**
 * Resolve the primary hostname of every slot whose primary could not be parsed as an IP address.
 * Once a primary resolves, resolution continues with that slot's replicas. Slots whose primary is
 * already an IP address go straight to replica resolution. The shared counter tracks how many
 * hostnames are still outstanding. The callback that brings it to zero calls the post-resolution
 * hook (finishClusterHostnameResolution()).
 *
 * If resolving any primary fails (error status or empty answer), that primary's decrement never
 * happens and its replicas are never resolved. The counter therefore cannot reach zero and this
 * round never completes; instead the resolve timer is re-armed so the whole discovery is retried.
 * Lookups already issued for other slots still run, but they cannot complete the round. Failure
 * to resolve a replica, on the other hand, does not stop the process: a warning is logged, the
 * counter still advances, and resolution moves on to the remaining replicas.
 *
 * NOTE(review): the DNS callbacks capture raw `this`, and the handle returned by resolve() is
 * discarded. Nothing here therefore cancels an in-flight lookup if this session is destroyed
 * first. Confirm the resolver/session lifetimes make that safe.
 *
 * @param slots the list of slots which may need DNS resolution; shared with every callback
 * @param hostname_resolution_required_cnt the number of hostnames (primaries and replicas) that
 *        still need DNS resolution; shared with every callback
 */
void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(
    ClusterSlotsSharedPtr&& slots,
    std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt) {
  for (uint64_t slot_idx = 0; slot_idx < slots->size(); slot_idx++) {
    auto& slot = (*slots)[slot_idx];
    if (slot.primary() == nullptr) {
      ENVOY_LOG(debug,
                "starting async DNS resolution for primary slot address {} at index location {}",
                slot.primary_hostname_, slot_idx);
      parent_.dns_resolver_->resolve(
          slot.primary_hostname_, parent_.dns_lookup_family_,
          [this, slot_idx, slots, hostname_resolution_required_cnt](
              Network::DnsResolver::ResolutionStatus status, absl::string_view,
              std::list<Network::DnsResponse>&& response) -> void {
            // Re-index into the shared slot vector; the callback holds its own `slots` copy.
            auto& slot = (*slots)[slot_idx];
            ENVOY_LOG(
                debug,
                "async DNS resolution complete for primary slot address {} at index location {}",
                slot.primary_hostname_, slot_idx);
            updateDnsStats(status, response.empty());
            // If DNS resolution for a primary fails, this round can no longer complete (the
            // counter is not decremented), so re-arm the timer to retry discovery.
            if (status != Network::DnsResolver::ResolutionStatus::Completed) {
              ENVOY_LOG(error, "Unable to resolve cluster slot primary hostname {}",
                        slot.primary_hostname_);
              resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
              return;
            }
            // A successful query can return an empty response.
            if (response.empty()) {
              ENVOY_LOG(error, "DNS resolution for primary slot address {} returned no results",
                        slot.primary_hostname_);
              resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
              return;
            }
            // Primary slot address resolved. Only the first answer is used, combined with the
            // port reported by CLUSTER SLOTS.
            slot.setPrimary(Network::Utility::getAddressWithPort(
                *response.front().addrInfo().address_, slot.primary_port_));
            (*hostname_resolution_required_cnt)--;
            // Continue on to resolve replicas
            resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt);
          });
    } else {
      // Primary is already an IP address; only this slot's replicas may need resolution.
      resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt);
    }
  }
}
440

            
441
/**
 * Resolve the hostname-based replicas (replicas_to_resolve_) of the slot at `index`. If the slot
 * has none, only check whether the whole round is already complete. If all the hostnames have
 * been resolved, call the post-resolution method (finishClusterHostnameResolution()).
 *
 * Failure to resolve a replica does not stop the overall resolution process. A warning is logged,
 * the replica is simply not added to the slot, and the shared counter is still decremented so
 * the round can complete.
 *
 * @param slots the list of slots which may need DNS resolution
 * @param index the specific index into `slots` whose replicas need to be resolved
 * @param hostname_resolution_required_cnt the number of hostnames that still need to be resolved
 */
void RedisCluster::RedisDiscoverySession::resolveReplicas(
    ClusterSlotsSharedPtr slots, std::size_t index,
    std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt) {
  auto& slot = (*slots)[index];
  if (slot.replicas_to_resolve_.empty()) {
    if (*hostname_resolution_required_cnt == 0) {
      finishClusterHostnameResolution(slots);
    }
    return;
  }

  for (uint64_t replica_idx = 0; replica_idx < slot.replicas_to_resolve_.size(); replica_idx++) {
    // Copy of the entry: `first` is the hostname to resolve, `second` the port to attach.
    auto replica = slot.replicas_to_resolve_[replica_idx];
    ENVOY_LOG(debug, "starting async DNS resolution for replica address {}", replica.first);
    parent_.dns_resolver_->resolve(
        replica.first, parent_.dns_lookup_family_,
        [this, index, slots, replica_idx, hostname_resolution_required_cnt](
            Network::DnsResolver::ResolutionStatus status, absl::string_view,
            std::list<Network::DnsResponse>&& response) -> void {
          auto& slot = (*slots)[index];
          auto& replica = slot.replicas_to_resolve_[replica_idx];
          ENVOY_LOG(debug, "async DNS resolution complete for replica address {}", replica.first);
          updateDnsStats(status, response.empty());
          // If DNS resolution fails here, we move on to resolve other replicas in the list.
          // We log a warn message.
          if (status != Network::DnsResolver::ResolutionStatus::Completed) {
            ENVOY_LOG(warn, "Unable to resolve cluster replica address {}", replica.first);
          } else if (response.empty()) {
            // A successful query can return an empty response.
            ENVOY_LOG(warn, "DNS resolution for cluster replica address {} returned no results",
                      replica.first);
          } else {
            // Replica resolved; only the first answer is used.
            slot.addReplica(Network::Utility::getAddressWithPort(
                *response.front().addrInfo().address_, replica.second));
          }
          // Count this replica as done whether or not it resolved.
          (*hostname_resolution_required_cnt)--;
          // Finish resolution once every outstanding hostname has been handled. The counter is
          // unsigned, so `<= 0` is equivalent to `== 0`.
          if (*hostname_resolution_required_cnt <= 0) {
            finishClusterHostnameResolution(slots);
          }
        });
  }
}
495

            
496
void RedisCluster::RedisDiscoverySession::finishClusterHostnameResolution(
497
10
    ClusterSlotsSharedPtr slots) {
498
10
  parent_.onClusterSlotUpdate(std::move(slots));
499
10
  resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
500
10
}
501

            
502
// Handles a successful reply to the CLUSTER SLOTS request and validates every entry.
//  - Slots whose addresses are IP literals are used directly.
//  - Hostname addresses are queued for asynchronous DNS resolution.
// If no resolution is needed, the slot map is applied immediately and the next refresh is
// scheduled. Otherwise resolveClusterHostnames() finishes the update. Any malformed part of the
// reply aborts the whole update via onUnexpectedResponse().
void RedisCluster::RedisDiscoverySession::onResponse(
    NetworkFilters::Common::Redis::RespValuePtr&& value) {
  ENVOY_LOG(debug, "redis cluster slot request for '{}' succeeded", parent_.info_->name());
  current_request_ = nullptr;

  // Field positions within one CLUSTER SLOTS entry.
  const uint32_t SlotRangeStart = 0;
  const uint32_t SlotRangeEnd = 1;
  const uint32_t SlotPrimary = 2;
  const uint32_t SlotReplicaStart = 3;

  // A non-array or empty reply carries no slots; report it as an unexpected response.
  if (value->type() != NetworkFilters::Common::Redis::RespType::Array || value->asArray().empty()) {
    onUnexpectedResponse(value);
    return;
  }

  auto cluster_slots = std::make_shared<std::vector<ClusterSlot>>();

  // https://redis.io/commands/cluster-slots
  // CLUSTER SLOTS represents nested array of redis instances, like this:
  //
  // 1) 1) (integer) 0                                      <-- start slot range
  //    2) (integer) 5460                                   <-- end slot range
  //
  //    3) 1) "127.0.0.1"                                   <-- primary slot IP ADDR(HOSTNAME)
  //       2) (integer) 30001                               <-- primary slot PORT
  //       3) "09dbe9720cda62f7865eabc5fd8857c5d2678366"
  //
  //    4) 1) "127.0.0.2"                                   <-- replica slot IP ADDR(HOSTNAME)
  //       2) (integer) 30004                               <-- replica slot PORT
  //       3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf"
  //
  // Loop through the cluster slot response and error checks for each field.
  // The counter is shared with the DNS callbacks when hostname resolution is needed.
  auto hostname_resolution_required_cnt = std::make_shared<std::uint64_t>(0);
  for (const NetworkFilters::Common::Redis::RespValue& part : value->asArray()) {
    if (part.type() != NetworkFilters::Common::Redis::RespType::Array) {
      onUnexpectedResponse(value);
      return;
    }

    // Row 1-2: Slot ranges. The size check also guarantees the primary entry at index 2 exists.
    const std::vector<NetworkFilters::Common::Redis::RespValue>& slot_range = part.asArray();
    if (slot_range.size() < 3 ||
        slot_range[SlotRangeStart].type() !=
            NetworkFilters::Common::Redis::RespType::Integer || // Start slot range is an
                                                                // integer.
        slot_range[SlotRangeEnd].type() !=
            NetworkFilters::Common::Redis::RespType::Integer) { // End slot range is an
                                                                // integer.
      onUnexpectedResponse(value);
      return;
    }

    // Row 3: Primary slot address
    if (!validateCluster(slot_range[SlotPrimary])) {
      onUnexpectedResponse(value);
      return;
    }
    // Try to parse primary slot address as IP address
    // It may fail in case the address is a hostname. If this is the case - we'll come back later
    // and try to resolve hostnames asynchronously. For example, AWS ElastiCache returns hostname
    // instead of IP address.
    ClusterSlot slot(slot_range[SlotRangeStart].asInteger(), slot_range[SlotRangeEnd].asInteger(),
                     ipAddressFromClusterEntry(slot_range[SlotPrimary].asArray()));
    if (slot.primary() == nullptr) {
      // Primary address is potentially a hostname, save it for async DNS resolution.
      const auto& array = slot_range[SlotPrimary].asArray();
      slot.primary_hostname_ = array[0].asString();
      slot.primary_port_ = array[1].asInteger();
      (*hostname_resolution_required_cnt)++;
    }

    // Row 4-N: Replica(s) addresses
    for (auto replica = std::next(slot_range.begin(), SlotReplicaStart);
         replica != slot_range.end(); ++replica) {
      if (!validateCluster(*replica)) {
        onUnexpectedResponse(value);
        return;
      }
      auto replica_address = ipAddressFromClusterEntry(replica->asArray());
      if (replica_address) {
        slot.addReplica(std::move(replica_address));
      } else {
        // Replica address is potentially a hostname, save it for async DNS resolution.
        const auto& array = replica->asArray();
        slot.addReplicaToResolve(array[0].asString(), array[1].asInteger());
        (*hostname_resolution_required_cnt)++;
      }
    }
    cluster_slots->push_back(std::move(slot));
  }

  if (*hostname_resolution_required_cnt > 0) {
    // DNS resolution is required, defer finalizing the slot update until resolution is complete.
    resolveClusterHostnames(std::move(cluster_slots), hostname_resolution_required_cnt);
  } else {
    // All slots addresses were represented by IP/Port pairs.
    parent_.onClusterSlotUpdate(std::move(cluster_slots));
    resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
  }
}
603

            
604
// Ensure that Slot Cluster response has valid format
605
bool RedisCluster::RedisDiscoverySession::validateCluster(
606
150
    const NetworkFilters::Common::Redis::RespValue& value) {
607
  // Verify data types
608
150
  if (value.type() != NetworkFilters::Common::Redis::RespType::Array) {
609
23
    return false;
610
23
  }
611
127
  const auto& array = value.asArray();
612
127
  if (array.size() < 2 || array[0].type() != NetworkFilters::Common::Redis::RespType::BulkString ||
613
127
      array[1].type() != NetworkFilters::Common::Redis::RespType::Integer) {
614
20
    return false;
615
20
  }
616
  // Verify IP/Host address
617
107
  if (array[0].asString().empty()) {
618
2
    return false;
619
2
  }
620
  // Verify port
621
105
  if (array[1].asInteger() > 0xffff) {
622
    return false;
623
  }
624

            
625
105
  return true;
626
105
}
627

            
628
void RedisCluster::RedisDiscoverySession::onUnexpectedResponse(
629
2062
    const NetworkFilters::Common::Redis::RespValuePtr& value) {
630
2062
  ENVOY_LOG(warn, "Unexpected response to cluster slot command: {}", value->toString());
631
2062
  this->parent_.info_->configUpdateStats().update_failure_.inc();
632
2062
  resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
633
2062
}
634

            
635
5
void RedisCluster::RedisDiscoverySession::onFailure() {
636
5
  ENVOY_LOG(debug, "redis cluster slot request for '{}' failed", parent_.info_->name());
637
5
  current_request_ = nullptr;
638
5
  if (!current_host_address_.empty()) {
639
5
    auto client_to_delete = client_map_.find(current_host_address_);
640
5
    client_to_delete->second->client_->close();
641
5
  }
642
5
  parent_.info()->configUpdateStats().update_failure_.inc();
643
5
  resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
644
5
}
645

            
646
// Out-of-line definition of the static ClusterSlotsRequest::instance_ member; value-initialized,
// which runs the class's default constructor.
RedisCluster::ClusterSlotsRequest RedisCluster::ClusterSlotsRequest::instance_{};
647

            
648
// Builds a RedisCluster from its cluster and typed extension config.
//
// @param cluster the generic cluster config; its cluster_type must be "envoy.clusters.redis".
// @param proto_config the Redis-specific cluster configuration.
// @param context factory context used for DNS resolver selection and randomness.
// @return the cluster plus its thread-aware load balancer. The load balancer is null when the
//         cluster's lb_policy is not CLUSTER_PROVIDED. On failure, an error status is returned.
absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
RedisClusterFactory::createClusterWithConfig(
    const envoy::config::cluster::v3::Cluster& cluster,
    const envoy::extensions::clusters::redis::v3::RedisClusterConfig& proto_config,
    Upstream::ClusterFactoryContext& context) {
  if (!cluster.has_cluster_type() || cluster.cluster_type().name() != "envoy.clusters.redis") {
    return absl::InvalidArgumentError("Redis cluster can only created with redis cluster type.");
  }
  // Report resolver selection failures through the returned status instead of throwing, matching
  // how every other error in this function is reported.
  absl::StatusOr<Network::DnsResolverSharedPtr> resolver_or_error =
      selectDnsResolver(cluster, context);
  RETURN_IF_NOT_OK(resolver_or_error.status());
  Network::DnsResolverSharedPtr resolver = std::move(*resolver_or_error);

  // TODO(hyang): This is needed to migrate existing cluster, disallow using other lb_policy
  // in the future
  if (cluster.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
    // Legacy path: no slot-update callback and no cluster-provided load balancer.
    absl::StatusOr<std::unique_ptr<RedisCluster>> legacy_cluster_or_error = RedisCluster::create(
        cluster, proto_config, context,
        NetworkFilters::Common::Redis::Client::ClientFactoryImpl::instance_, resolver, nullptr);
    RETURN_IF_NOT_OK(legacy_cluster_or_error.status());
    return std::make_pair(std::shared_ptr<RedisCluster>(std::move(*legacy_cluster_or_error)),
                          nullptr);
  }
  // The load balancer factory doubles as the cluster's slot-update callback, so the same instance
  // is handed to both the cluster and the thread-aware load balancer.
  auto lb_factory = std::make_shared<RedisClusterLoadBalancerFactory>(
      context.serverFactoryContext().api().randomGenerator());
  absl::StatusOr<std::unique_ptr<RedisCluster>> cluster_or_error = RedisCluster::create(
      cluster, proto_config, context,
      NetworkFilters::Common::Redis::Client::ClientFactoryImpl::instance_, resolver, lb_factory);
  RETURN_IF_NOT_OK(cluster_or_error.status());
  return std::make_pair(std::shared_ptr<RedisCluster>(std::move(*cluster_or_error)),
                        std::make_unique<RedisClusterThreadAwareLoadBalancer>(lb_factory));
}
680

            
681
// Registers RedisClusterFactory under the Upstream::ClusterFactory category via the
// REGISTER_FACTORY macro.
REGISTER_FACTORY(RedisClusterFactory, Upstream::ClusterFactory);
682

            
683
} // namespace Redis
684
} // namespace Clusters
685
} // namespace Extensions
686
} // namespace Envoy