1
#pragma once
2

            
3
#include <array>
4
#include <atomic>
5
#include <chrono>
6
#include <cstdint>
7
#include <functional>
8
#include <list>
9
#include <memory>
10
#include <string>
11
#include <tuple>
12
#include <utility>
13
#include <vector>
14

            
15
#include "envoy/api/api.h"
16
#include "envoy/common/random_generator.h"
17
#include "envoy/config/cluster/v3/cluster.pb.h"
18
#include "envoy/config/endpoint/v3/endpoint.pb.h"
19
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
20
#include "envoy/config/typed_metadata.h"
21
#include "envoy/event/dispatcher.h"
22
#include "envoy/event/timer.h"
23
#include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.h"
24
#include "envoy/extensions/clusters/redis/v3/redis_cluster.pb.validate.h"
25
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
26
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h"
27
#include "envoy/http/codec.h"
28
#include "envoy/local_info/local_info.h"
29
#include "envoy/network/dns.h"
30
#include "envoy/runtime/runtime.h"
31
#include "envoy/secret/secret_manager.h"
32
#include "envoy/server/transport_socket_config.h"
33
#include "envoy/singleton/manager.h"
34
#include "envoy/ssl/context_manager.h"
35
#include "envoy/stats/scope.h"
36
#include "envoy/thread_local/thread_local.h"
37
#include "envoy/upstream/cluster_manager.h"
38
#include "envoy/upstream/health_checker.h"
39
#include "envoy/upstream/load_balancer.h"
40
#include "envoy/upstream/locality.h"
41
#include "envoy/upstream/upstream.h"
42

            
43
#include "source/common/common/callback_impl.h"
44
#include "source/common/common/enum_to_int.h"
45
#include "source/common/common/logger.h"
46
#include "source/common/config/datasource.h"
47
#include "source/common/config/metadata.h"
48
#include "source/common/config/well_known_names.h"
49
#include "source/common/network/address_impl.h"
50
#include "source/common/network/utility.h"
51
#include "source/common/stats/isolated_store_impl.h"
52
#include "source/common/upstream/cluster_factory_impl.h"
53
#include "source/common/upstream/load_balancer_context_base.h"
54
#include "source/common/upstream/outlier_detection_impl.h"
55
#include "source/common/upstream/resource_manager_impl.h"
56
#include "source/common/upstream/upstream_impl.h"
57
#include "source/extensions/clusters/redis/redis_cluster_lb.h"
58
#include "source/extensions/common/redis/cluster_refresh_manager_impl.h"
59
#include "source/extensions/filters/network/common/redis/client.h"
60
#include "source/extensions/filters/network/common/redis/client_impl.h"
61
#include "source/extensions/filters/network/common/redis/codec.h"
62
#include "source/extensions/filters/network/common/redis/utility.h"
63
#include "source/extensions/filters/network/redis_proxy/config.h"
64
#include "source/server/transport_socket_config_impl.h"
65

            
66
namespace Envoy {
67
namespace Extensions {
68
namespace Clusters {
69
namespace Redis {
70

            
71
/*
 * This class implements support for the topology part of `Redis Cluster
 * <https://redis.io/topics/cluster-spec>`_. Specifically, it allows Envoy to maintain an internal
 * representation of the topology of a Redis Cluster, and controls how often that topology is
 * refreshed.
 *
 * The target Redis Cluster is obtained from the yaml config file as usual, and we choose a random
 * discovery address from DNS if there are no existing hosts (our startup condition). Otherwise, we
 * choose a random host from our known set of hosts. Then, against this host we make a topology
 * request.
 *
 * Topology requests are handled by RedisDiscoverySession, which handles the initialization of
 * the `CLUSTER SLOTS command <https://redis.io/commands/cluster-slots>`_, and the responses and
 * failure cases.
 *
 * Once the topology is fetched from Redis, the cluster will update the
 * RedisClusterLoadBalancerFactory, which will be used by the redis proxy filter for load
 * balancing purposes.
 */
90

            
91
class RedisCluster : public Upstream::BaseDynamicClusterImpl {
92
public:
93
  ~RedisCluster();
94
  static absl::StatusOr<std::unique_ptr<RedisCluster>>
95
  create(const envoy::config::cluster::v3::Cluster& cluster,
96
         const envoy::extensions::clusters::redis::v3::RedisClusterConfig& redis_cluster,
97
         Upstream::ClusterFactoryContext& context,
98
         NetworkFilters::Common::Redis::Client::ClientFactory& client_factory,
99
         Network::DnsResolverSharedPtr dns_resolver, ClusterSlotUpdateCallBackSharedPtr factory);
100

            
101
  struct ClusterSlotsRequest : public Extensions::NetworkFilters::Common::Redis::RespValue {
102
  public:
103
7
    ClusterSlotsRequest() {
104
7
      type(Extensions::NetworkFilters::Common::Redis::RespType::Array);
105
7
      std::vector<NetworkFilters::Common::Redis::RespValue> values(2);
106
7
      values[0].type(NetworkFilters::Common::Redis::RespType::BulkString);
107
7
      values[0].asString() = "CLUSTER";
108
7
      values[1].type(NetworkFilters::Common::Redis::RespType::BulkString);
109
7
      values[1].asString() = "SLOTS";
110
7
      asArray().swap(values);
111
7
    }
112

            
113
    static ClusterSlotsRequest instance_;
114
  };
115

            
116
14
  // Reports InitializePhase::Primary as the initialization phase of this cluster.
  InitializePhase initializePhase() const override {
    return InitializePhase::Primary;
  }
117

            
118
  // NOTE: disabled accessor, kept for reference only (not compiled).
  // TimeSource& timeSource() const { return time_source_; }
119

            
120
protected:
121
  RedisCluster(const envoy::config::cluster::v3::Cluster& cluster,
122
               const envoy::extensions::clusters::redis::v3::RedisClusterConfig& redis_cluster,
123
               Upstream::ClusterFactoryContext& context,
124
               NetworkFilters::Common::Redis::Client::ClientFactory& client_factory,
125
               Network::DnsResolverSharedPtr dns_resolver,
126
               ClusterSlotUpdateCallBackSharedPtr factory, absl::Status& creation_status);
127

            
128
private:
129
  friend class RedisClusterFactory;
130
  friend class RedisClusterTest;
131

            
132
  void startPreInit() override;
133

            
134
  void updateAllHosts(const Upstream::HostVector& hosts_added,
135
                      const Upstream::HostVector& hosts_removed, uint32_t priority);
136

            
137
  void onClusterSlotUpdate(ClusterSlotsSharedPtr&&);
138

            
139
  void reloadHealthyHostsHelper(const Upstream::HostSharedPtr& host) override;
140

            
141
854
  // Returns the first LocalityLbEndpoints entry of `load_assignment_`. Only index 0 is ever
  // consulted, and no emptiness check is performed here.
  const envoy::config::endpoint::v3::LocalityLbEndpoints& localityLbEndpoint() const {
    return load_assignment_.endpoints(0);
  }
145

            
146
448
  // Returns the first LbEndpoint inside the locality chosen by localityLbEndpoint(). Only index 0
  // is ever consulted, and no emptiness check is performed here.
  const envoy::config::endpoint::v3::LbEndpoint& lbEndpoint() const {
    const auto& first_locality = localityLbEndpoint();
    return first_locality.lb_endpoints(0);
  }
150

            
151
  // A redis node in the Redis cluster.
152
  class RedisHost : public Upstream::HostImpl {
153
  public:
154
    static absl::StatusOr<std::unique_ptr<RedisHost>>
155
    create(Upstream::ClusterInfoConstSharedPtr cluster, const std::string& hostname,
156
           Network::Address::InstanceConstSharedPtr address, RedisCluster& parent, bool primary);
157

            
158
  protected:
159
    RedisHost(Upstream::ClusterInfoConstSharedPtr cluster, const std::string& hostname,
160
              Network::Address::InstanceConstSharedPtr address, RedisCluster& parent, bool primary,
161
              absl::Status& creation_status)
162
112
        : Upstream::HostImpl(
163
112
              creation_status, cluster, hostname, address,
164
              // TODO(zyfjeff): Created through metadata shared pool
165
112
              std::make_shared<envoy::config::core::v3::Metadata>(parent.lbEndpoint().metadata()),
166
112
              std::make_shared<envoy::config::core::v3::Metadata>(
167
112
                  parent.localityLbEndpoint().metadata()),
168
112
              parent.lbEndpoint().load_balancing_weight().value(),
169
              // TODO(adisuissa): Convert to use a shared pool of localities.
170
112
              std::make_shared<const envoy::config::core::v3::Locality>(
171
112
                  parent.localityLbEndpoint().locality()),
172
112
              parent.lbEndpoint().endpoint().health_check_config(),
173
112
              parent.localityLbEndpoint().priority(), parent.lbEndpoint().health_status()),
174
112
          primary_(primary) {}
175

            
176
    bool isPrimary() const { return primary_; }
177

            
178
  private:
179
    const bool primary_;
180
  };
181

            
182
  // Resolves the discovery endpoint.
183
  struct DnsDiscoveryResolveTarget {
184
    DnsDiscoveryResolveTarget(RedisCluster& parent, const std::string& dns_address,
185
                              const uint32_t port);
186

            
187
    ~DnsDiscoveryResolveTarget();
188

            
189
    void startResolveDns();
190

            
191
    RedisCluster& parent_;
192
    Network::ActiveDnsQuery* active_query_{};
193
    const std::string dns_address_;
194
    const uint32_t port_;
195
    Event::TimerPtr resolve_timer_;
196
  };
197

            
198
  using DnsDiscoveryResolveTargetPtr = std::unique_ptr<DnsDiscoveryResolveTarget>;
199

            
200
  struct RedisDiscoverySession;
201

            
202
  struct RedisDiscoveryClient : public Network::ConnectionCallbacks {
203
34
    RedisDiscoveryClient(RedisDiscoverySession& parent) : parent_(parent) {}
204

            
205
    // Network::ConnectionCallbacks
206
    void onEvent(Network::ConnectionEvent event) override;
207
1
    void onAboveWriteBufferHighWatermark() override {}
208
1
    void onBelowWriteBufferLowWatermark() override {}
209

            
210
    RedisDiscoverySession& parent_;
211
    std::string host_;
212
    Extensions::NetworkFilters::Common::Redis::Client::ClientPtr client_;
213
  };
214

            
215
  using RedisDiscoveryClientPtr = std::unique_ptr<RedisDiscoveryClient>;
216

            
217
  struct RedisDiscoverySession
218
      : public Extensions::NetworkFilters::Common::Redis::Client::Config,
219
        public Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks,
220
        public std::enable_shared_from_this<RedisDiscoverySession> {
221
    RedisDiscoverySession(RedisCluster& parent,
222
                          NetworkFilters::Common::Redis::Client::ClientFactory& client_factory);
223

            
224
    ~RedisDiscoverySession() override;
225

            
226
    void registerDiscoveryAddress(std::list<Network::DnsResponse>&& response, const uint32_t port);
227

            
228
    // Start discovery against a random host from existing hosts
229
    void startResolveRedis();
230

            
231
    // Extensions::NetworkFilters::Common::Redis::Client::Config
232
10
    bool disableOutlierEvents() const override { return true; }
233
10
    std::chrono::milliseconds opTimeout() const override {
234
      // Allow the main Health Check infra to control timeout.
235
10
      return parent_.cluster_refresh_timeout_;
236
10
    }
237
1
    bool enableHashtagging() const override { return false; }
238
8
    bool enableRedirection() const override { return false; }
239
10
    uint32_t maxBufferSizeBeforeFlush() const override { return 0; }
240
1
    std::chrono::milliseconds bufferFlushTimeoutInMs() const override { return buffer_timeout_; }
241
1
    uint32_t maxUpstreamUnknownConnections() const override { return 0; }
242
28
    bool enableCommandStats() const override { return true; }
243
    bool connectionRateLimitEnabled() const override { return false; }
244
    uint32_t connectionRateLimitPerSec() const override { return 0; }
245
    // For any readPolicy other than Primary, the RedisClientFactory will send a READONLY command
246
    // when establishing a new connection. Since we're only using this for making the "cluster
247
    // slots" commands, the READONLY command is not relevant in this context. We're setting it to
248
    // Primary to avoid the additional READONLY command.
249
9
    Extensions::NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const override {
250
9
      return Extensions::NetworkFilters::Common::Redis::Client::ReadPolicy::Primary;
251
9
    }
252

            
253
    // Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks
254
    void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override;
255
    void onFailure() override;
256
    // Note: Below callback isn't used in topology updates
257
    void onRedirection(NetworkFilters::Common::Redis::RespValuePtr&&, const std::string&,
258
1
                       bool) override {}
259
    void onUnexpectedResponse(const NetworkFilters::Common::Redis::RespValuePtr&);
260

            
261
    Network::Address::InstanceConstSharedPtr
262
    ipAddressFromClusterEntry(const std::vector<NetworkFilters::Common::Redis::RespValue>& array);
263
    bool validateCluster(const NetworkFilters::Common::Redis::RespValue& value);
264
    void resolveClusterHostnames(ClusterSlotsSharedPtr&& slots,
265
                                 std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt);
266
    void resolveReplicas(ClusterSlotsSharedPtr slots, std::size_t index,
267
                         std::shared_ptr<std::uint64_t> hostname_resolution_required_cnt);
268
    void finishClusterHostnameResolution(ClusterSlotsSharedPtr slots);
269
    void updateDnsStats(Network::DnsResolver::ResolutionStatus status, bool empty_response);
270

            
271
    RedisCluster& parent_;
272
    Event::Dispatcher& dispatcher_;
273
    std::string current_host_address_;
274
    Extensions::NetworkFilters::Common::Redis::Client::PoolRequest* current_request_{};
275
    absl::node_hash_map<std::string, RedisDiscoveryClientPtr> client_map_;
276

            
277
    std::list<Network::Address::InstanceConstSharedPtr> discovery_address_list_;
278

            
279
    Event::TimerPtr resolve_timer_;
280
    NetworkFilters::Common::Redis::Client::ClientFactory& client_factory_;
281
    const std::chrono::milliseconds buffer_timeout_;
282
    NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_;
283
  };
284

            
285
  Upstream::ClusterManager& cluster_manager_;
286
  const std::chrono::milliseconds cluster_refresh_rate_;
287
  const std::chrono::milliseconds cluster_refresh_timeout_;
288
  const std::chrono::milliseconds redirect_refresh_interval_;
289
  const uint32_t redirect_refresh_threshold_;
290
  const uint32_t failure_refresh_threshold_;
291
  const uint32_t host_degraded_refresh_threshold_;
292
  std::list<DnsDiscoveryResolveTargetPtr> dns_discovery_resolve_targets_;
293
  Event::Dispatcher& dispatcher_;
294
  Network::DnsResolverSharedPtr dns_resolver_;
295
  Network::DnsLookupFamily dns_lookup_family_;
296
  const envoy::config::endpoint::v3::ClusterLoadAssignment load_assignment_;
297
  const LocalInfo::LocalInfo& local_info_;
298
  Random::RandomGenerator& random_;
299
  std::shared_ptr<RedisDiscoverySession> redis_discovery_session_;
300
  const ClusterSlotUpdateCallBackSharedPtr lb_factory_;
301

            
302
  Upstream::HostVector hosts_;
303

            
304
  const std::string auth_username_;
305
  const std::string auth_password_;
306
  const std::string cluster_name_;
307
  const Common::Redis::ClusterRefreshManagerSharedPtr refresh_manager_;
308
  Common::Redis::ClusterRefreshManager::HandlePtr registration_handle_;
309

            
310
  // Flag to prevent callbacks during destruction
311
  std::atomic<bool> is_destroying_{false};
312
};
313

            
314
// Cluster factory registered under the name "envoy.clusters.redis". Its typed configuration
// message is envoy::extensions::clusters::redis::v3::RedisClusterConfig.
class RedisClusterFactory : public Upstream::ConfigurableClusterFactoryBase<
                                envoy::extensions::clusters::redis::v3::RedisClusterConfig> {
public:
  RedisClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.redis") {}

private:
  friend class RedisClusterTest;

  // Creates the cluster from the generic cluster proto plus the typed Redis cluster config.
  // Returns the cluster paired with a thread-aware load balancer on success, or an error status.
  absl::StatusOr<
      std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
  createClusterWithConfig(
      const envoy::config::cluster::v3::Cluster& cluster,
      const envoy::extensions::clusters::redis::v3::RedisClusterConfig& proto_config,
      Upstream::ClusterFactoryContext& context) override;
};
329
} // namespace Redis
330
} // namespace Clusters
331
} // namespace Extensions
332
} // namespace Envoy