Line data Source code
1 : #include "redis_cluster.h"
2 :
3 : #include <cstdint>
4 : #include <memory>
5 :
6 : #include "envoy/config/cluster/v3/cluster.pb.h"
7 : #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.h"
8 : #include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.validate.h"
9 : #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
10 : #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h"
11 :
12 : namespace Envoy {
13 : namespace Extensions {
14 : namespace Clusters {
15 : namespace Redis {
16 :
17 : RedisCluster::RedisCluster(
18 : const envoy::config::cluster::v3::Cluster& cluster,
19 : const envoy::extensions::clusters::redis::v3::RedisClusterConfig& redis_cluster,
20 : Upstream::ClusterFactoryContext& context,
21 : NetworkFilters::Common::Redis::Client::ClientFactory& redis_client_factory,
22 : Network::DnsResolverSharedPtr dns_resolver, ClusterSlotUpdateCallBackSharedPtr lb_factory)
23 : : Upstream::BaseDynamicClusterImpl(cluster, context),
24 : cluster_manager_(context.clusterManager()),
25 : cluster_refresh_rate_(std::chrono::milliseconds(
26 : PROTOBUF_GET_MS_OR_DEFAULT(redis_cluster, cluster_refresh_rate, 5000))),
27 : cluster_refresh_timeout_(std::chrono::milliseconds(
28 : PROTOBUF_GET_MS_OR_DEFAULT(redis_cluster, cluster_refresh_timeout, 3000))),
29 : redirect_refresh_interval_(std::chrono::milliseconds(
30 : PROTOBUF_GET_MS_OR_DEFAULT(redis_cluster, redirect_refresh_interval, 5000))),
31 : redirect_refresh_threshold_(
32 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(redis_cluster, redirect_refresh_threshold, 5)),
33 : failure_refresh_threshold_(redis_cluster.failure_refresh_threshold()),
34 : host_degraded_refresh_threshold_(redis_cluster.host_degraded_refresh_threshold()),
35 : dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
36 : dns_resolver_(std::move(dns_resolver)),
37 : dns_lookup_family_(Upstream::getDnsLookupFamilyFromCluster(cluster)),
38 : load_assignment_(cluster.load_assignment()),
39 : local_info_(context.serverFactoryContext().localInfo()),
40 : random_(context.serverFactoryContext().api().randomGenerator()),
41 : redis_discovery_session_(*this, redis_client_factory), lb_factory_(std::move(lb_factory)),
42 : auth_username_(NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authUsername(
43 : info(), context.serverFactoryContext().api())),
44 : auth_password_(NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authPassword(
45 : info(), context.serverFactoryContext().api())),
46 : cluster_name_(cluster.name()),
47 : refresh_manager_(Common::Redis::getClusterRefreshManager(
48 : context.serverFactoryContext().singletonManager(),
49 : context.serverFactoryContext().mainThreadDispatcher(), context.clusterManager(),
50 : context.serverFactoryContext().api().timeSource())),
51 : registration_handle_(refresh_manager_->registerCluster(
52 : cluster_name_, redirect_refresh_interval_, redirect_refresh_threshold_,
53 0 : failure_refresh_threshold_, host_degraded_refresh_threshold_, [&]() {
54 0 : redis_discovery_session_.resolve_timer_->enableTimer(std::chrono::milliseconds(0));
55 0 : })) {
56 0 : const auto& locality_lb_endpoints = load_assignment_.endpoints();
57 0 : for (const auto& locality_lb_endpoint : locality_lb_endpoints) {
58 0 : for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
59 0 : const auto& host = lb_endpoint.endpoint().address();
60 0 : dns_discovery_resolve_targets_.emplace_back(new DnsDiscoveryResolveTarget(
61 0 : *this, host.socket_address().address(), host.socket_address().port_value()));
62 0 : }
63 0 : }
64 0 : }
65 :
66 0 : void RedisCluster::startPreInit() {
67 0 : for (const DnsDiscoveryResolveTargetPtr& target : dns_discovery_resolve_targets_) {
68 0 : target->startResolveDns();
69 0 : }
70 0 : if (!wait_for_warm_on_init_) {
71 0 : onPreInitComplete();
72 0 : }
73 0 : }
74 :
75 : void RedisCluster::updateAllHosts(const Upstream::HostVector& hosts_added,
76 : const Upstream::HostVector& hosts_removed,
77 0 : uint32_t current_priority) {
78 0 : Upstream::PriorityStateManager priority_state_manager(*this, local_info_, nullptr);
79 :
80 0 : auto locality_lb_endpoint = localityLbEndpoint();
81 0 : priority_state_manager.initializePriorityFor(locality_lb_endpoint);
82 0 : for (const Upstream::HostSharedPtr& host : hosts_) {
83 0 : if (locality_lb_endpoint.priority() == current_priority) {
84 0 : priority_state_manager.registerHostForPriority(host, locality_lb_endpoint);
85 0 : }
86 0 : }
87 :
88 0 : priority_state_manager.updateClusterPrioritySet(
89 0 : current_priority, std::move(priority_state_manager.priorityState()[current_priority].first),
90 0 : hosts_added, hosts_removed, absl::nullopt, absl::nullopt, absl::nullopt);
91 0 : }
92 :
93 0 : void RedisCluster::onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots) {
94 0 : Upstream::HostVector new_hosts;
95 0 : absl::flat_hash_set<std::string> all_new_hosts;
96 :
97 0 : for (const ClusterSlot& slot : *slots) {
98 0 : if (all_new_hosts.count(slot.primary()->asString()) == 0) {
99 0 : new_hosts.emplace_back(new RedisHost(info(), "", slot.primary(), *this, true, time_source_));
100 0 : all_new_hosts.emplace(slot.primary()->asString());
101 0 : }
102 0 : for (auto const& replica : slot.replicas()) {
103 0 : if (all_new_hosts.count(replica.first) == 0) {
104 0 : new_hosts.emplace_back(
105 0 : new RedisHost(info(), "", replica.second, *this, false, time_source_));
106 0 : all_new_hosts.emplace(replica.first);
107 0 : }
108 0 : }
109 0 : }
110 :
111 : // Get the map of all the latest existing hosts, which is used to filter out the existing
112 : // hosts in the process of updating cluster memberships.
113 0 : Upstream::HostMapConstSharedPtr all_hosts = priority_set_.crossPriorityHostMap();
114 0 : ASSERT(all_hosts != nullptr);
115 :
116 0 : Upstream::HostVector hosts_added;
117 0 : Upstream::HostVector hosts_removed;
118 0 : const bool host_updated = updateDynamicHostList(new_hosts, hosts_, hosts_added, hosts_removed,
119 0 : *all_hosts, all_new_hosts);
120 :
121 : // Create a map containing all the latest hosts to determine whether the slots are updated.
122 0 : Upstream::HostMap updated_hosts = *all_hosts;
123 0 : for (const auto& host : hosts_removed) {
124 0 : updated_hosts.erase(host->address()->asString());
125 0 : }
126 0 : for (const auto& host : hosts_added) {
127 0 : updated_hosts[host->address()->asString()] = host;
128 0 : }
129 :
130 0 : const bool slot_updated =
131 0 : lb_factory_ ? lb_factory_->onClusterSlotUpdate(std::move(slots), updated_hosts) : false;
132 :
133 : // If slot is updated, call updateAllHosts regardless of if there's new hosts to force
134 : // update of the thread local load balancers.
135 0 : if (host_updated || slot_updated) {
136 0 : ASSERT(std::all_of(hosts_.begin(), hosts_.end(), [&](const auto& host) {
137 0 : return host->priority() == localityLbEndpoint().priority();
138 0 : }));
139 0 : updateAllHosts(hosts_added, hosts_removed, localityLbEndpoint().priority());
140 0 : } else {
141 0 : info_->configUpdateStats().update_no_rebuild_.inc();
142 0 : }
143 :
144 : // TODO(hyang): If there is an initialize callback, fire it now. Note that if the
145 : // cluster refers to multiple DNS names, this will return initialized after a single
146 : // DNS resolution completes. This is not perfect but is easier to code and it is unclear
147 : // if the extra complexity is needed so will start with this.
148 0 : onPreInitComplete();
149 0 : }
150 :
151 0 : void RedisCluster::reloadHealthyHostsHelper(const Upstream::HostSharedPtr& host) {
152 0 : if (lb_factory_) {
153 0 : lb_factory_->onHostHealthUpdate();
154 0 : }
155 0 : if (host && (host->coarseHealth() == Upstream::Host::Health::Degraded ||
156 0 : host->coarseHealth() == Upstream::Host::Health::Unhealthy)) {
157 0 : refresh_manager_->onHostDegraded(cluster_name_);
158 0 : }
159 0 : ClusterImplBase::reloadHealthyHostsHelper(host);
160 0 : }
161 :
162 : // DnsDiscoveryResolveTarget
163 : RedisCluster::DnsDiscoveryResolveTarget::DnsDiscoveryResolveTarget(RedisCluster& parent,
164 : const std::string& dns_address,
165 : const uint32_t port)
166 0 : : parent_(parent), dns_address_(dns_address), port_(port) {}
167 :
168 0 : RedisCluster::DnsDiscoveryResolveTarget::~DnsDiscoveryResolveTarget() {
169 0 : if (active_query_) {
170 0 : active_query_->cancel(Network::ActiveDnsQuery::CancelReason::QueryAbandoned);
171 0 : }
172 : // Disable timer for mock tests.
173 0 : if (resolve_timer_) {
174 0 : resolve_timer_->disableTimer();
175 0 : }
176 0 : }
177 :
178 0 : void RedisCluster::DnsDiscoveryResolveTarget::startResolveDns() {
179 0 : ENVOY_LOG(trace, "starting async DNS resolution for {}", dns_address_);
180 :
181 0 : active_query_ = parent_.dns_resolver_->resolve(
182 0 : dns_address_, parent_.dns_lookup_family_,
183 0 : [this](Network::DnsResolver::ResolutionStatus status,
184 0 : std::list<Network::DnsResponse>&& response) -> void {
185 0 : active_query_ = nullptr;
186 0 : ENVOY_LOG(trace, "async DNS resolution complete for {}", dns_address_);
187 0 : if (status == Network::DnsResolver::ResolutionStatus::Failure || response.empty()) {
188 0 : if (status == Network::DnsResolver::ResolutionStatus::Failure) {
189 0 : parent_.info_->configUpdateStats().update_failure_.inc();
190 0 : } else {
191 0 : parent_.info_->configUpdateStats().update_empty_.inc();
192 0 : }
193 :
194 0 : if (!resolve_timer_) {
195 0 : resolve_timer_ =
196 0 : parent_.dispatcher_.createTimer([this]() -> void { startResolveDns(); });
197 0 : }
198 : // if the initial dns resolved to empty, we'll skip the redis discovery phase and
199 : // treat it as an empty cluster.
200 0 : parent_.onPreInitComplete();
201 0 : resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
202 0 : } else {
203 : // Once the DNS resolve the initial set of addresses, call startResolveRedis on
204 : // the RedisDiscoverySession. The RedisDiscoverySession will using the "cluster
205 : // slots" command for service discovery and slot allocation. All subsequent
206 : // discoveries are handled by RedisDiscoverySession and will not use DNS
207 : // resolution again.
208 0 : parent_.redis_discovery_session_.registerDiscoveryAddress(std::move(response), port_);
209 0 : parent_.redis_discovery_session_.startResolveRedis();
210 0 : }
211 0 : });
212 0 : }
213 :
214 : // RedisCluster
215 : RedisCluster::RedisDiscoverySession::RedisDiscoverySession(
216 : Envoy::Extensions::Clusters::Redis::RedisCluster& parent,
217 : NetworkFilters::Common::Redis::Client::ClientFactory& client_factory)
218 : : parent_(parent), dispatcher_(parent.dispatcher_),
219 0 : resolve_timer_(parent.dispatcher_.createTimer([this]() -> void { startResolveRedis(); })),
220 : client_factory_(client_factory), buffer_timeout_(0),
221 : redis_command_stats_(
222 : NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats(
223 0 : parent_.info()->statsScope().symbolTable())) {}
224 :
225 : // Convert the cluster slot IP/Port response to an address, return null if the response
226 : // does not match the expected type.
227 : Network::Address::InstanceConstSharedPtr
228 : RedisCluster::RedisDiscoverySession::RedisDiscoverySession::ipAddressFromClusterEntry(
229 0 : const std::vector<NetworkFilters::Common::Redis::RespValue>& array) {
230 0 : return Network::Utility::parseInternetAddressNoThrow(array[0].asString(), array[1].asInteger(),
231 0 : false);
232 0 : }
233 :
234 0 : RedisCluster::RedisDiscoverySession::~RedisDiscoverySession() {
235 0 : if (current_request_) {
236 0 : current_request_->cancel();
237 0 : current_request_ = nullptr;
238 0 : }
239 : // Disable timer for mock tests.
240 0 : if (resolve_timer_) {
241 0 : resolve_timer_->disableTimer();
242 0 : }
243 :
244 0 : while (!client_map_.empty()) {
245 0 : client_map_.begin()->second->client_->close();
246 0 : }
247 0 : }
248 :
249 0 : void RedisCluster::RedisDiscoveryClient::onEvent(Network::ConnectionEvent event) {
250 0 : if (event == Network::ConnectionEvent::RemoteClose ||
251 0 : event == Network::ConnectionEvent::LocalClose) {
252 0 : auto client_to_delete = parent_.client_map_.find(host_);
253 0 : ASSERT(client_to_delete != parent_.client_map_.end());
254 0 : parent_.dispatcher_.deferredDelete(std::move(client_to_delete->second->client_));
255 0 : parent_.client_map_.erase(client_to_delete);
256 0 : }
257 0 : }
258 :
259 : void RedisCluster::RedisDiscoverySession::registerDiscoveryAddress(
260 0 : std::list<Envoy::Network::DnsResponse>&& response, const uint32_t port) {
261 : // Since the address from DNS does not have port, we need to make a new address that has
262 : // port in it.
263 0 : for (const Network::DnsResponse& res : response) {
264 0 : const auto& addrinfo = res.addrInfo();
265 0 : ASSERT(addrinfo.address_ != nullptr);
266 0 : discovery_address_list_.push_back(
267 0 : Network::Utility::getAddressWithPort(*(addrinfo.address_), port));
268 0 : }
269 0 : }
270 :
271 0 : void RedisCluster::RedisDiscoverySession::startResolveRedis() {
272 0 : parent_.info_->configUpdateStats().update_attempt_.inc();
273 : // If a resolution is currently in progress, skip it.
274 0 : if (current_request_) {
275 0 : ENVOY_LOG(debug, "redis cluster slot request is already in progress for '{}'",
276 0 : parent_.info_->name());
277 0 : return;
278 0 : }
279 :
280 : // If hosts is empty, we haven't received a successful result from the CLUSTER SLOTS call
281 : // yet. So, pick a random discovery address from dns and make a request.
282 0 : Upstream::HostSharedPtr host;
283 0 : if (parent_.hosts_.empty()) {
284 0 : const int rand_idx = parent_.random_.random() % discovery_address_list_.size();
285 0 : auto it = std::next(discovery_address_list_.begin(), rand_idx);
286 0 : host = Upstream::HostSharedPtr{
287 0 : new RedisHost(parent_.info(), "", *it, parent_, true, parent_.timeSource())};
288 0 : } else {
289 0 : const int rand_idx = parent_.random_.random() % parent_.hosts_.size();
290 0 : host = parent_.hosts_[rand_idx];
291 0 : }
292 :
293 0 : current_host_address_ = host->address()->asString();
294 0 : RedisDiscoveryClientPtr& client = client_map_[current_host_address_];
295 0 : if (!client) {
296 0 : client = std::make_unique<RedisDiscoveryClient>(*this);
297 0 : client->host_ = current_host_address_;
298 0 : client->client_ = client_factory_.create(host, dispatcher_, *this, redis_command_stats_,
299 0 : parent_.info()->statsScope(), parent_.auth_username_,
300 0 : parent_.auth_password_, false);
301 0 : client->client_->addConnectionCallbacks(*client);
302 0 : }
303 0 : ENVOY_LOG(debug, "executing redis cluster slot request for '{}'", parent_.info_->name());
304 0 : current_request_ = client->client_->makeRequest(ClusterSlotsRequest::instance_, *this);
305 0 : }
306 :
307 : void RedisCluster::RedisDiscoverySession::updateDnsStats(
308 0 : Network::DnsResolver::ResolutionStatus status, bool empty_response) {
309 0 : if (status == Network::DnsResolver::ResolutionStatus::Failure) {
310 0 : parent_.info_->configUpdateStats().update_failure_.inc();
311 0 : } else if (empty_response) {
312 0 : parent_.info_->configUpdateStats().update_empty_.inc();
313 0 : }
314 0 : }
315 :
316 : /**
317 : * Resolve the primary cluster entry hostname in each slot.
318 : * If the primary is successfully resolved, we proceed to resolve replicas.
319 : * We use the count of hostnames that require resolution to decide when the resolution process is
320 : * completed, and then call the post-resolution hooks.
321 : *
322 : * If resolving any one of the primary replicas fails, we stop the resolution process and reset
323 : * the timers to retry the resolution. Failure to resolve a replica, on the other hand does not
324 : * stop the process. If we replica resolution fails, we simply log a warning, and move to resolving
325 : * the rest.
326 : *
327 : * @param slots the list of slots which may need DNS resolution
328 : * @param address_resolution_required_cnt the number of hostnames that need DNS resolution
329 : */
330 : void RedisCluster::RedisDiscoverySession::resolveClusterHostnames(
331 : ClusterSlotsSharedPtr&& slots,
332 0 : std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt) {
333 0 : for (uint64_t slot_idx = 0; slot_idx < slots->size(); slot_idx++) {
334 0 : auto& slot = (*slots)[slot_idx];
335 0 : if (slot.primary() == nullptr) {
336 0 : ENVOY_LOG(debug,
337 0 : "starting async DNS resolution for primary slot address {} at index location {}",
338 0 : slot.primary_hostname_, slot_idx);
339 0 : parent_.dns_resolver_->resolve(
340 0 : slot.primary_hostname_, parent_.dns_lookup_family_,
341 0 : [this, slot_idx, slots,
342 0 : hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status,
343 0 : std::list<Network::DnsResponse>&& response) -> void {
344 0 : auto& slot = (*slots)[slot_idx];
345 0 : ENVOY_LOG(
346 0 : debug,
347 0 : "async DNS resolution complete for primary slot address {} at index location {}",
348 0 : slot.primary_hostname_, slot_idx);
349 0 : updateDnsStats(status, response.empty());
350 : // If DNS resolution for a primary fails, we stop resolution for remaining, and reset
351 : // the timer.
352 0 : if (status != Network::DnsResolver::ResolutionStatus::Success) {
353 0 : ENVOY_LOG(error, "Unable to resolve cluster slot primary hostname {}",
354 0 : slot.primary_hostname_);
355 0 : resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
356 0 : return;
357 0 : }
358 : // Primary slot address resolved
359 0 : slot.setPrimary(Network::Utility::getAddressWithPort(
360 0 : *response.front().addrInfo().address_, slot.primary_port_));
361 0 : (*hostname_resolution_required_cnt)--;
362 : // Continue on to resolve replicas
363 0 : resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt);
364 0 : });
365 0 : } else {
366 0 : resolveReplicas(slots, slot_idx, hostname_resolution_required_cnt);
367 0 : }
368 0 : }
369 0 : }
370 :
371 : /**
372 : * Resolve the replicas in a cluster entry. If there are no replicas, simply return.
373 : * If all the hostnames have been resolved, call post-resolution methods.
374 : * Failure to resolve a replica does not stop the overall resolution process. We log a
375 : * warning, and move to the next one.
376 : *
377 : * @param slots the list of slots which may need DNS resolution
378 : * @param index the specific index into `slots` whose replicas need to be resolved
379 : * @param address_resolution_required_cnt the number of address that need to be resolved
380 : */
381 : void RedisCluster::RedisDiscoverySession::resolveReplicas(
382 : ClusterSlotsSharedPtr slots, std::size_t index,
383 0 : std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt) {
384 0 : auto& slot = (*slots)[index];
385 0 : if (slot.replicas_to_resolve_.empty()) {
386 0 : if (*hostname_resolution_required_cnt == 0) {
387 0 : finishClusterHostnameResolution(slots);
388 0 : }
389 0 : return;
390 0 : }
391 :
392 0 : for (uint64_t replica_idx = 0; replica_idx < slot.replicas_to_resolve_.size(); replica_idx++) {
393 0 : auto replica = slot.replicas_to_resolve_[replica_idx];
394 0 : ENVOY_LOG(debug, "starting async DNS resolution for replica address {}", replica.first);
395 0 : parent_.dns_resolver_->resolve(
396 0 : replica.first, parent_.dns_lookup_family_,
397 0 : [this, index, slots, replica_idx,
398 0 : hostname_resolution_required_cnt](Network::DnsResolver::ResolutionStatus status,
399 0 : std::list<Network::DnsResponse>&& response) -> void {
400 0 : auto& slot = (*slots)[index];
401 0 : auto& replica = slot.replicas_to_resolve_[replica_idx];
402 0 : ENVOY_LOG(debug, "async DNS resolution complete for replica address {}", replica.first);
403 0 : updateDnsStats(status, response.empty());
404 : // If DNS resolution fails here, we move on to resolve other replicas in the list.
405 : // We log a warn message.
406 0 : if (status != Network::DnsResolver::ResolutionStatus::Success) {
407 0 : ENVOY_LOG(warn, "Unable to resolve cluster replica address {}", replica.first);
408 0 : } else {
409 : // Replica resolved
410 0 : slot.addReplica(Network::Utility::getAddressWithPort(
411 0 : *response.front().addrInfo().address_, replica.second));
412 0 : }
413 0 : (*hostname_resolution_required_cnt)--;
414 : // finish resolution if all the addresses have been resolved.
415 0 : if (*hostname_resolution_required_cnt <= 0) {
416 0 : finishClusterHostnameResolution(slots);
417 0 : }
418 0 : });
419 0 : }
420 0 : }
421 :
422 : void RedisCluster::RedisDiscoverySession::finishClusterHostnameResolution(
423 0 : ClusterSlotsSharedPtr slots) {
424 0 : parent_.onClusterSlotUpdate(std::move(slots));
425 0 : resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
426 0 : }
427 :
428 : void RedisCluster::RedisDiscoverySession::onResponse(
429 0 : NetworkFilters::Common::Redis::RespValuePtr&& value) {
430 0 : ENVOY_LOG(debug, "redis cluster slot request for '{}' succeeded", parent_.info_->name());
431 0 : current_request_ = nullptr;
432 :
433 0 : const uint32_t SlotRangeStart = 0;
434 0 : const uint32_t SlotRangeEnd = 1;
435 0 : const uint32_t SlotPrimary = 2;
436 0 : const uint32_t SlotReplicaStart = 3;
437 :
438 : // Do nothing if the cluster is empty.
439 0 : if (value->type() != NetworkFilters::Common::Redis::RespType::Array || value->asArray().empty()) {
440 0 : onUnexpectedResponse(value);
441 0 : return;
442 0 : }
443 :
444 0 : auto cluster_slots = std::make_shared<std::vector<ClusterSlot>>();
445 :
446 : // https://redis.io/commands/cluster-slots
447 : // CLUSTER SLOTS represents nested array of redis instances, like this:
448 : //
449 : // 1) 1) (integer) 0 <-- start slot range
450 : // 2) (integer) 5460 <-- end slot range
451 : //
452 : // 3) 1) "127.0.0.1" <-- primary slot IP ADDR(HOSTNAME)
453 : // 2) (integer) 30001 <-- primary slot PORT
454 : // 3) "09dbe9720cda62f7865eabc5fd8857c5d2678366"
455 : //
456 : // 4) 1) "127.0.0.2" <-- replica slot IP ADDR(HOSTNAME)
457 : // 2) (integer) 30004 <-- replica slot PORT
458 : // 3) "821d8ca00d7ccf931ed3ffc7e3db0599d2271abf"
459 : //
460 : // Loop through the cluster slot response and error checks for each field.
461 0 : auto hostname_resolution_required_cnt = std::make_shared<std::uint64_t>(0);
462 0 : for (const NetworkFilters::Common::Redis::RespValue& part : value->asArray()) {
463 0 : if (part.type() != NetworkFilters::Common::Redis::RespType::Array) {
464 0 : onUnexpectedResponse(value);
465 0 : return;
466 0 : }
467 :
468 : // Row 1-2: Slot ranges
469 0 : const std::vector<NetworkFilters::Common::Redis::RespValue>& slot_range = part.asArray();
470 0 : if (slot_range.size() < 3 ||
471 0 : slot_range[SlotRangeStart].type() !=
472 0 : NetworkFilters::Common::Redis::RespType::Integer || // Start slot range is an
473 : // integer.
474 0 : slot_range[SlotRangeEnd].type() !=
475 0 : NetworkFilters::Common::Redis::RespType::Integer) { // End slot range is an
476 : // integer.
477 0 : onUnexpectedResponse(value);
478 0 : return;
479 0 : }
480 :
481 : // Row 3: Primary slot address
482 0 : if (!validateCluster(slot_range[SlotPrimary])) {
483 0 : onUnexpectedResponse(value);
484 0 : return;
485 0 : }
486 : // Try to parse primary slot address as IP address
487 : // It may fail in case the address is a hostname. If this is the case - we'll come back later
488 : // and try to resolve hostnames asynchronously. For example, AWS ElastiCache returns hostname
489 : // instead of IP address.
490 0 : ClusterSlot slot(slot_range[SlotRangeStart].asInteger(), slot_range[SlotRangeEnd].asInteger(),
491 0 : ipAddressFromClusterEntry(slot_range[SlotPrimary].asArray()));
492 0 : if (slot.primary() == nullptr) {
493 : // Primary address is potentially a hostname, save it for async DNS resolution.
494 0 : const auto& array = slot_range[SlotPrimary].asArray();
495 0 : slot.primary_hostname_ = array[0].asString();
496 0 : slot.primary_port_ = array[1].asInteger();
497 0 : (*hostname_resolution_required_cnt)++;
498 0 : }
499 :
500 : // Row 4-N: Replica(s) addresses
501 0 : for (auto replica = std::next(slot_range.begin(), SlotReplicaStart);
502 0 : replica != slot_range.end(); ++replica) {
503 0 : if (!validateCluster(*replica)) {
504 0 : onUnexpectedResponse(value);
505 0 : return;
506 0 : }
507 0 : auto replica_address = ipAddressFromClusterEntry(replica->asArray());
508 0 : if (replica_address) {
509 0 : slot.addReplica(std::move(replica_address));
510 0 : } else {
511 : // Replica address is potentially a hostname, save it for async DNS resolution.
512 0 : const auto& array = replica->asArray();
513 0 : slot.addReplicaToResolve(array[0].asString(), array[1].asInteger());
514 0 : (*hostname_resolution_required_cnt)++;
515 0 : }
516 0 : }
517 0 : cluster_slots->push_back(std::move(slot));
518 0 : }
519 :
520 0 : if (*hostname_resolution_required_cnt > 0) {
521 : // DNS resolution is required, defer finalizing the slot update until resolution is complete.
522 0 : resolveClusterHostnames(std::move(cluster_slots), hostname_resolution_required_cnt);
523 0 : } else {
524 : // All slots addresses were represented by IP/Port pairs.
525 0 : parent_.onClusterSlotUpdate(std::move(cluster_slots));
526 0 : resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
527 0 : }
528 0 : }
529 :
530 : // Ensure that Slot Cluster response has valid format
531 : bool RedisCluster::RedisDiscoverySession::validateCluster(
532 0 : const NetworkFilters::Common::Redis::RespValue& value) {
533 : // Verify data types
534 0 : if (value.type() != NetworkFilters::Common::Redis::RespType::Array) {
535 0 : return false;
536 0 : }
537 0 : const auto& array = value.asArray();
538 0 : if (array.size() < 2 || array[0].type() != NetworkFilters::Common::Redis::RespType::BulkString ||
539 0 : array[1].type() != NetworkFilters::Common::Redis::RespType::Integer) {
540 0 : return false;
541 0 : }
542 : // Verify IP/Host address
543 0 : if (array[0].asString().empty()) {
544 0 : return false;
545 0 : }
546 : // Verify port
547 0 : if (array[1].asInteger() > 0xffff) {
548 0 : return false;
549 0 : }
550 :
551 0 : return true;
552 0 : }
553 :
554 : void RedisCluster::RedisDiscoverySession::onUnexpectedResponse(
555 0 : const NetworkFilters::Common::Redis::RespValuePtr& value) {
556 0 : ENVOY_LOG(warn, "Unexpected response to cluster slot command: {}", value->toString());
557 0 : this->parent_.info_->configUpdateStats().update_failure_.inc();
558 0 : resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
559 0 : }
560 :
561 0 : void RedisCluster::RedisDiscoverySession::onFailure() {
562 0 : ENVOY_LOG(debug, "redis cluster slot request for '{}' failed", parent_.info_->name());
563 0 : current_request_ = nullptr;
564 0 : if (!current_host_address_.empty()) {
565 0 : auto client_to_delete = client_map_.find(current_host_address_);
566 0 : client_to_delete->second->client_->close();
567 0 : }
568 0 : parent_.info()->configUpdateStats().update_failure_.inc();
569 0 : resolve_timer_->enableTimer(parent_.cluster_refresh_rate_);
570 0 : }
571 :
// Definition of the static request object that startResolveRedis() passes to makeRequest() for
// every cluster slot discovery call.
RedisCluster::ClusterSlotsRequest RedisCluster::ClusterSlotsRequest::instance_;
573 :
574 : absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
575 : RedisClusterFactory::createClusterWithConfig(
576 : const envoy::config::cluster::v3::Cluster& cluster,
577 : const envoy::extensions::clusters::redis::v3::RedisClusterConfig& proto_config,
578 0 : Upstream::ClusterFactoryContext& context) {
579 0 : if (!cluster.has_cluster_type() || cluster.cluster_type().name() != "envoy.clusters.redis") {
580 0 : return absl::InvalidArgumentError("Redis cluster can only created with redis cluster type.");
581 0 : }
582 : // TODO(hyang): This is needed to migrate existing cluster, disallow using other lb_policy
583 : // in the future
584 0 : if (cluster.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
585 0 : return std::make_pair(std::make_shared<RedisCluster>(
586 0 : cluster, proto_config, context,
587 0 : NetworkFilters::Common::Redis::Client::ClientFactoryImpl::instance_,
588 0 : selectDnsResolver(cluster, context), nullptr),
589 0 : nullptr);
590 0 : }
591 0 : auto lb_factory = std::make_shared<RedisClusterLoadBalancerFactory>(
592 0 : context.serverFactoryContext().api().randomGenerator());
593 0 : return std::make_pair(std::make_shared<RedisCluster>(
594 0 : cluster, proto_config, context,
595 0 : NetworkFilters::Common::Redis::Client::ClientFactoryImpl::instance_,
596 0 : selectDnsResolver(cluster, context), lb_factory),
597 0 : std::make_unique<RedisClusterThreadAwareLoadBalancer>(lb_factory));
598 0 : }
599 :
// Registers RedisClusterFactory as an Upstream::ClusterFactory.
REGISTER_FACTORY(RedisClusterFactory, Upstream::ClusterFactory);
601 :
602 : } // namespace Redis
603 : } // namespace Clusters
604 : } // namespace Extensions
605 : } // namespace Envoy
|