1
#pragma once
2

            
3
#include <array>
4
#include <string>
5
#include <vector>
6

            
7
#include "envoy/upstream/load_balancer.h"
8
#include "envoy/upstream/upstream.h"
9

            
10
#include "source/common/network/address_impl.h"
11
#include "source/common/upstream/load_balancer_context_base.h"
12
#include "source/common/upstream/upstream_impl.h"
13
#include "source/extensions/clusters/redis/crc16.h"
14
#include "source/extensions/filters/network/common/redis/client.h"
15
#include "source/extensions/filters/network/common/redis/codec.h"
16
#include "source/extensions/filters/network/common/redis/supported_commands.h"
17

            
18
#include "absl/container/btree_map.h"
19
#include "absl/synchronization/mutex.h"
20

            
21
namespace Envoy {
22
namespace Extensions {
23
namespace Clusters {
24
namespace Redis {
25

            
26
static const uint64_t MaxSlot = 16384;
27

            
28
using ReplicaToResolve = std::pair<std::string, uint16_t>;
29

            
30
class ClusterSlot {
31
public:
32
  ClusterSlot(int64_t start, int64_t end, Network::Address::InstanceConstSharedPtr primary)
33
95
      : start_(start), end_(end), primary_(std::move(primary)) {}
34

            
35
105
  int64_t start() const { return start_; }
36
278598
  int64_t end() const { return end_; }
37
266
  Network::Address::InstanceConstSharedPtr primary() const { return primary_; }
38
83
  const absl::btree_map<std::string, Network::Address::InstanceConstSharedPtr>& replicas() const {
39
83
    return replicas_;
40
83
  }
41

            
42
10
  void setPrimary(Network::Address::InstanceConstSharedPtr address) {
43
10
    primary_ = std::move(address);
44
10
  }
45
42
  void addReplica(Network::Address::InstanceConstSharedPtr replica_address) {
46
42
    replicas_.emplace(replica_address->asString(), std::move(replica_address));
47
42
  }
48
12
  void addReplicaToResolve(const std::string& host, uint16_t port) {
49
12
    replicas_to_resolve_.emplace_back(host, port);
50
12
  }
51

            
52
  bool operator==(const ClusterSlot& rhs) const;
53

            
54
  // In case of primary slot address is hostname and needs to be resolved
55
  std::string primary_hostname_;
56
  uint16_t primary_port_;
57
  std::vector<ReplicaToResolve> replicas_to_resolve_;
58

            
59
private:
60
  int64_t start_;
61
  int64_t end_;
62
  Network::Address::InstanceConstSharedPtr primary_;
63
  absl::btree_map<std::string, Network::Address::InstanceConstSharedPtr> replicas_;
64
};
65

            
66
using ClusterSlotsPtr = std::unique_ptr<std::vector<ClusterSlot>>;
67
using ClusterSlotsSharedPtr = std::shared_ptr<std::vector<ClusterSlot>>;
68

            
69
class RedisLoadBalancerContext {
70
public:
71
  virtual ~RedisLoadBalancerContext() = default;
72

            
73
  virtual bool isReadCommand() const PURE;
74
  virtual NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const PURE;
75
};
76

            
77
class RedisLoadBalancerContextImpl : public RedisLoadBalancerContext,
78
                                     public Upstream::LoadBalancerContextBase {
79
public:
80
  /**
81
   * The load balancer context for Redis requests. Note that is_redis_cluster implies using Redis
82
   * cluster which require us to always enable hashtagging.
83
   * @param key specify the key for the Redis request.
84
   * @param enabled_hashtagging specify whether to enable hashtagging, this will always be true if
85
   * is_redis_cluster is true.
86
   * @param is_redis_cluster specify whether this is a request for redis cluster, if true the key
87
   * will be hashed using crc16.
88
   * @param request specify the Redis request.
89
   * @param read_policy specify the read policy.
90
   */
91
  RedisLoadBalancerContextImpl(const std::string& key, bool enabled_hashtagging,
92
                               bool is_redis_cluster,
93
                               const NetworkFilters::Common::Redis::RespValue& request,
94
                               NetworkFilters::Common::Redis::Client::ReadPolicy read_policy =
95
                                   NetworkFilters::Common::Redis::Client::ReadPolicy::Primary);
96

            
97
  // Upstream::LoadBalancerContextBase
98
66
  absl::optional<uint64_t> computeHashKey() override { return hash_key_; }
99

            
100
22
  bool isReadCommand() const override { return is_read_; }
101

            
102
25
  NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const override {
103
25
    return read_policy_;
104
25
  }
105

            
106
private:
107
  absl::string_view hashtag(absl::string_view v, bool enabled);
108

            
109
  static bool isReadRequest(const NetworkFilters::Common::Redis::RespValue& request);
110

            
111
  const absl::optional<uint64_t> hash_key_;
112
  const bool is_read_;
113
  const NetworkFilters::Common::Redis::Client::ReadPolicy read_policy_;
114
};
115

            
116
class RedisSpecifyShardContextImpl : public RedisLoadBalancerContextImpl {
117
public:
118
  /**
119
   * The redis specify Shard load balancer context for Redis requests.
120
   * @param shard_index specify the shard index for the Redis request.
121
   * @param request specify the Redis request.
122
   * @param read_policy specify the read policy.
123
   */
124
  RedisSpecifyShardContextImpl(uint64_t shard_index,
125
                               const NetworkFilters::Common::Redis::RespValue& request,
126
                               NetworkFilters::Common::Redis::Client::ReadPolicy read_policy =
127
                                   NetworkFilters::Common::Redis::Client::ReadPolicy::Primary);
128

            
129
  // Upstream::LoadBalancerContextBase
130
10116
  absl::optional<uint64_t> computeHashKey() override { return shard_index_; }
131

            
132
private:
133
  const absl::optional<uint64_t> shard_index_;
134
};
135

            
136
class ClusterSlotUpdateCallBack {
137
public:
138
40
  virtual ~ClusterSlotUpdateCallBack() = default;
139

            
140
  /**
141
   * Callback when cluster slot is updated
142
   * @param slots provides the updated cluster slots.
143
   * @param all_hosts provides the updated hosts.
144
   * @return indicate if the cluster slot is updated or not.
145
   */
146
  virtual bool onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots,
147
                                   Upstream::HostMap& all_hosts) PURE;
148

            
149
  /**
150
   * Callback when a host's health status is updated
151
   */
152
  virtual void onHostHealthUpdate() PURE;
153
};
154

            
155
using ClusterSlotUpdateCallBackSharedPtr = std::shared_ptr<ClusterSlotUpdateCallBack>;
156

            
157
/**
158
 * This factory is created and returned by RedisCluster's factory() method, the create() method will
159
 * be called on each thread to create a thread local RedisClusterLoadBalancer.
160
 */
161
class RedisClusterLoadBalancerFactory : public ClusterSlotUpdateCallBack,
162
                                        public Upstream::LoadBalancerFactory {
163
public:
164
17
  RedisClusterLoadBalancerFactory(Random::RandomGenerator& random) : random_(random) {}
165

            
166
  // ClusterSlotUpdateCallBack
167
  bool onClusterSlotUpdate(ClusterSlotsSharedPtr&& slots, Upstream::HostMap& all_hosts) override;
168

            
169
  void onHostHealthUpdate() override;
170

            
171
  // Upstream::LoadBalancerFactory
172
  Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override;
173

            
174
private:
175
  class RedisShard {
176
  public:
177
    RedisShard(Upstream::HostConstSharedPtr primary, Upstream::HostVectorConstSharedPtr replicas,
178
               Upstream::HostVectorConstSharedPtr all_hosts, Random::RandomGenerator& random)
179
36
        : primary_(std::move(primary)) {
180
36
      replicas_.updateHosts(Upstream::HostSetImpl::partitionHosts(
181
36
                                std::move(replicas), Upstream::HostsPerLocalityImpl::empty()),
182
36
                            nullptr, {}, {}, random.random());
183
36
      all_hosts_.updateHosts(Upstream::HostSetImpl::partitionHosts(
184
36
                                 std::move(all_hosts), Upstream::HostsPerLocalityImpl::empty()),
185
36
                             nullptr, {}, {}, random.random());
186
36
    }
187
128
    const Upstream::HostConstSharedPtr primary() const { return primary_; }
188
78
    const Upstream::HostSetImpl& replicas() const { return replicas_; }
189
84
    const Upstream::HostSetImpl& allHosts() const { return all_hosts_; }
190

            
191
  private:
192
    const Upstream::HostConstSharedPtr primary_;
193
    Upstream::HostSetImpl replicas_{0, absl::nullopt, absl::nullopt};
194
    Upstream::HostSetImpl all_hosts_{0, absl::nullopt, absl::nullopt};
195
  };
196

            
197
  using RedisShardSharedPtr = std::shared_ptr<const RedisShard>;
198
  using ShardVectorSharedPtr = std::shared_ptr<std::vector<RedisShardSharedPtr>>;
199
  using SlotArray = std::array<uint64_t, MaxSlot>;
200
  using SlotArraySharedPtr = std::shared_ptr<const SlotArray>;
201

            
202
  /*
203
   * This class implements load balancing according to `Redis Cluster
204
   * <https://redis.io/topics/cluster-spec>`_. This load balancer is thread local and created
205
   * through the RedisClusterLoadBalancerFactory by the cluster manager.
206
   *
207
   * The topology is stored in slot_array_ and shard_vector_. According to the
208
   * `Redis Cluster Spec <https://redis.io/topics/cluster-spec#keys-distribution-model`_, the key
209
   * space is split into a fixed size 16384 slots. The current implementation uses a fixed size
210
   * std::array() of the index of the shard in the shard_vector_. This has a fixed cpu and memory
211
   * cost and provide a fast lookup constant time lookup similar to Maglev. This will be used by the
212
   * redis proxy filter for load balancing purpose.
213
   */
214
  class RedisClusterLoadBalancer : public Upstream::LoadBalancer {
215
  public:
216
    RedisClusterLoadBalancer(SlotArraySharedPtr slot_array, ShardVectorSharedPtr shard_vector,
217
                             Random::RandomGenerator& random)
218
59
        : slot_array_(std::move(slot_array)), shard_vector_(std::move(shard_vector)),
219
59
          random_(random) {}
220

            
221
    // Upstream::LoadBalancerBase
222
    Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext*) override;
223
    Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
224
      return nullptr;
225
    }
226
    // Pool selection not implemented.
227
    absl::optional<Upstream::SelectedPoolAndConnection>
228
    selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
229
                             const Upstream::Host& /*host*/,
230
                             std::vector<uint8_t>& /*hash_key*/) override {
231
      return absl::nullopt;
232
    }
233
    // Lifetime tracking not implemented.
234
    OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
235
      return {};
236
    }
237

            
238
  private:
239
    const SlotArraySharedPtr slot_array_;
240
    const ShardVectorSharedPtr shard_vector_;
241
    Random::RandomGenerator& random_;
242
  };
243

            
244
  absl::Mutex mutex_;
245
  SlotArraySharedPtr slot_array_ ABSL_GUARDED_BY(mutex_);
246
  ClusterSlotsSharedPtr current_cluster_slot_;
247
  ShardVectorSharedPtr shard_vector_;
248
  Random::RandomGenerator& random_;
249
};
250

            
251
class RedisClusterThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
252
public:
253
  RedisClusterThreadAwareLoadBalancer(Upstream::LoadBalancerFactorySharedPtr factory)
254
17
      : factory_(std::move(factory)) {}
255

            
256
  // Upstream::ThreadAwareLoadBalancer
257
38
  Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; }
258
17
  absl::Status initialize() override { return absl::OkStatus(); }
259

            
260
private:
261
  Upstream::LoadBalancerFactorySharedPtr factory_;
262
};
263

            
264
} // namespace Redis
265
} // namespace Clusters
266
} // namespace Extensions
267
} // namespace Envoy