1
#pragma once
2

            
3
#include <chrono>
4

            
5
#include "envoy/api/api.h"
6
#include "envoy/config/core/v3/health_check.pb.h"
7
#include "envoy/data/core/v3/health_check_event.pb.h"
8
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
9
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h"
10
#include "envoy/extensions/health_checkers/redis/v3/redis.pb.h"
11

            
12
#include "source/extensions/filters/network/common/redis/client_impl.h"
13
#include "source/extensions/filters/network/redis_proxy/config.h"
14
#include "source/extensions/filters/network/redis_proxy/conn_pool_impl.h"
15
#include "source/extensions/health_checkers/common/health_checker_base_impl.h"
16

            
17
namespace Envoy {
18
namespace Extensions {
19
namespace HealthCheckers {
20
namespace RedisHealthChecker {
21

            
22
/**
23
 * All redis health checker stats. @see stats_macros.h
24
 */
25
14
#define ALL_REDIS_HEALTH_CHECKER_STATS(COUNTER) COUNTER(exists_failure)
26

            
27
/**
28
 * Definition of all redis health checker stats. @see stats_macros.h
29
 */
30
struct RedisHealthCheckerStats {
31
  ALL_REDIS_HEALTH_CHECKER_STATS(GENERATE_COUNTER_STRUCT)
32
};
33

            
34
/**
35
 * Redis health checker implementation. Sends PING and expects PONG.
36
 */
37
class RedisHealthChecker : public Upstream::HealthCheckerImplBase {
38
public:
39
  RedisHealthChecker(
40
      const Upstream::Cluster& cluster, const envoy::config::core::v3::HealthCheck& config,
41
      const envoy::extensions::health_checkers::redis::v3::Redis& redis_config,
42
      Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
43
      Upstream::HealthCheckEventLoggerPtr&& event_logger, Api::Api& api,
44
      Extensions::NetworkFilters::Common::Redis::Client::ClientFactory& client_factory,
45
      const absl::optional<envoy::extensions::filters::network::redis_proxy::v3::AwsIam>
46
          aws_iam_config,
47
      const absl::optional<Extensions::NetworkFilters::Common::Redis::AwsIamAuthenticator::
48
                               AwsIamAuthenticatorSharedPtr>
49
          aws_iam_authenticator);
50

            
51
39
  static const NetworkFilters::Common::Redis::RespValue& pingHealthCheckRequest() {
52
39
    static HealthCheckRequest* request = new HealthCheckRequest();
53
39
    return request->request_;
54
39
  }
55

            
56
  static const NetworkFilters::Common::Redis::RespValue&
57
14
  existsHealthCheckRequest(const std::string& key) {
58
14
    static HealthCheckRequest* request = new HealthCheckRequest(key);
59
14
    return request->request_;
60
14
  }
61

            
62
protected:
63
11
  envoy::data::core::v3::HealthCheckerType healthCheckerType() const override {
64
11
    return envoy::data::core::v3::REDIS;
65
11
  }
66

            
67
private:
68
  friend class RedisHealthCheckerTest;
69
  RedisHealthCheckerStats generateRedisStats(Stats::Scope& scope);
70

            
71
  struct RedisConfig : public Extensions::NetworkFilters::Common::Redis::Client::Config {
72
10
    RedisConfig(std::chrono::milliseconds timeout) : parent_timeout_(timeout) {}
73

            
74
    // Extensions::NetworkFilters::Common::Redis::Client::Config
75
1
    bool disableOutlierEvents() const override { return true; }
76
1
    std::chrono::milliseconds opTimeout() const override {
77
      // Allow the main Health Check infra to control timeout.
78
1
      return parent_timeout_ * 2;
79
1
    }
80
1
    bool enableHashtagging() const override { return false; }
81
1
    bool enableRedirection() const override {
82
1
      return true;
83
1
    } // Redirection errors are treated as check successes.
84
    NetworkFilters::Common::Redis::Client::ReadPolicy readPolicy() const override {
85
      return NetworkFilters::Common::Redis::Client::ReadPolicy::Primary;
86
    }
87

            
88
    // Batching
89
2
    unsigned int maxBufferSizeBeforeFlush() const override {
90
2
      return 0;
91
2
    } // Forces an immediate flush
92
1
    std::chrono::milliseconds bufferFlushTimeoutInMs() const override {
93
1
      return std::chrono::milliseconds(1);
94
1
    }
95

            
96
1
    uint32_t maxUpstreamUnknownConnections() const override { return 0; }
97
5
    bool enableCommandStats() const override { return false; }
98
    bool connectionRateLimitEnabled() const override { return false; }
99
    uint32_t connectionRateLimitPerSec() const override { return 0; }
100

            
101
    const std::chrono::milliseconds parent_timeout_;
102
    absl::optional<envoy::extensions::filters::network::redis_proxy::v3::AwsIam> aws_iam_config_;
103
  };
104

            
105
  struct RedisActiveHealthCheckSession
106
      : public ActiveHealthCheckSession,
107
        public Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks,
108
        public Network::ConnectionCallbacks {
109
    RedisActiveHealthCheckSession(RedisHealthChecker& parent, const Upstream::HostSharedPtr& host);
110
    ~RedisActiveHealthCheckSession() override;
111

            
112
    // ActiveHealthCheckSession
113
    void onInterval() override;
114
    void onTimeout() override;
115
    void onDeferredDelete() final;
116

            
117
    // Extensions::NetworkFilters::Common::Redis::Client::ClientCallbacks
118
    void onResponse(NetworkFilters::Common::Redis::RespValuePtr&& value) override;
119
    void onFailure() override;
120
    void onRedirection(NetworkFilters::Common::Redis::RespValuePtr&&, const std::string&,
121
                       bool) override;
122

            
123
    // Network::ConnectionCallbacks
124
    void onEvent(Network::ConnectionEvent event) override;
125
7
    void onAboveWriteBufferHighWatermark() override {}
126
7
    void onBelowWriteBufferLowWatermark() override {}
127

            
128
    RedisHealthChecker& parent_;
129
    std::shared_ptr<RedisConfig> redis_config_{std::make_shared<RedisConfig>(parent_.timeout_)};
130
    Extensions::NetworkFilters::Common::Redis::Client::ClientPtr client_;
131
    Extensions::NetworkFilters::Common::Redis::Client::PoolRequest* current_request_{};
132
    Extensions::NetworkFilters::Common::Redis::RedisCommandStatsSharedPtr redis_command_stats_;
133
  };
134

            
135
  enum class Type { Ping, Exists };
136

            
137
  struct HealthCheckRequest {
138
    HealthCheckRequest(const std::string& key);
139
    HealthCheckRequest();
140

            
141
    NetworkFilters::Common::Redis::RespValue request_;
142
  };
143

            
144
  using RedisActiveHealthCheckSessionPtr = std::unique_ptr<RedisActiveHealthCheckSession>;
145

            
146
  // HealthCheckerImplBase
147
9
  ActiveHealthCheckSessionPtr makeSession(Upstream::HostSharedPtr host) override {
148
9
    return std::make_unique<RedisActiveHealthCheckSession>(*this, host);
149
9
  }
150

            
151
  Extensions::NetworkFilters::Common::Redis::Client::ClientFactory& client_factory_;
152
  Type type_;
153
  const std::string key_;
154
  RedisHealthCheckerStats redis_stats_;
155
  const std::string auth_username_;
156
  const std::string auth_password_;
157
  const absl::optional<
158
      Extensions::NetworkFilters::Common::Redis::AwsIamAuthenticator::AwsIamAuthenticatorSharedPtr>
159
      aws_iam_authenticator_;
160
  const absl::optional<envoy::extensions::filters::network::redis_proxy::v3::AwsIam>
161
      aws_iam_config_;
162

            
163
  const envoy::extensions::health_checkers::redis::v3::Redis& redis_config_;
164
};
165

            
166
} // namespace RedisHealthChecker
167
} // namespace HealthCheckers
168
} // namespace Extensions
169
} // namespace Envoy