1
#include "source/extensions/health_checkers/redis/redis.h"
2

            
3
#include "envoy/config/core/v3/health_check.pb.h"
4
#include "envoy/data/core/v3/health_check_event.pb.h"
5
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h"
6
#include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h"
7
#include "envoy/extensions/health_checkers/redis/v3/redis.pb.h"
8

            
9
namespace Envoy {
10
namespace Extensions {
11
namespace HealthCheckers {
12
namespace RedisHealthChecker {
13

            
14
// Constructs the Redis health checker.
//
// The generic health-check settings in `config` are forwarded to HealthCheckerImplBase, while the
// Redis-specific pieces (key to probe, auth credentials resolved from the cluster info, optional
// AWS IAM settings, and the Redis health checker proto) are retained on this object for use by
// the per-host sessions.
RedisHealthChecker::RedisHealthChecker(
    const Upstream::Cluster& cluster, const envoy::config::core::v3::HealthCheck& config,
    const envoy::extensions::health_checkers::redis::v3::Redis& redis_config,
    Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
    Upstream::HealthCheckEventLoggerPtr&& event_logger, Api::Api& api,
    Extensions::NetworkFilters::Common::Redis::Client::ClientFactory& client_factory,
    const absl::optional<envoy::extensions::filters::network::redis_proxy::v3::AwsIam>
        aws_iam_config,
    const absl::optional<Extensions::NetworkFilters::Common::Redis::AwsIamAuthenticator::
                             AwsIamAuthenticatorSharedPtr>
        aws_iam_authenticator)
    : HealthCheckerImplBase(cluster, config, dispatcher, runtime, api.randomGenerator(),
                            std::move(event_logger)),
      client_factory_(client_factory), key_(redis_config.key()),
      redis_stats_(generateRedisStats(cluster.info()->statsScope())),
      auth_username_(
          NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authUsername(cluster.info(), api)),
      auth_password_(
          NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authPassword(cluster.info(), api)),
      aws_iam_authenticator_(aws_iam_authenticator), aws_iam_config_(aws_iam_config),
      redis_config_(redis_config) {
  // A configured key selects the EXISTS-based check; with no key the checker falls back to PING.
  type_ = key_.empty() ? Type::Ping : Type::Exists;
}
43

            
44
// Creates the health check session for a single host and prepares the Redis command stats that
// are later handed to the Redis client created in onInterval().
RedisHealthChecker::RedisActiveHealthCheckSession::RedisActiveHealthCheckSession(
    RedisHealthChecker& parent, const Upstream::HostSharedPtr& host)
    : ActiveHealthCheckSession(parent, host), parent_(parent) {
  // The command stats are built on the symbol table of the cluster's stats scope.
  const auto cluster_info = parent_.cluster_.info();
  redis_command_stats_ =
      Extensions::NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats(
          cluster_info->statsScope().symbolTable());
}
51

            
52
10
// By the time the session is destroyed, the in-flight request must already have been cleared and
// the client released (see onDeferredDelete() and onEvent()).
RedisHealthChecker::RedisActiveHealthCheckSession::~RedisActiveHealthCheckSession() {
  // No request may still be outstanding.
  ASSERT(current_request_ == nullptr);
  // The client must already have been handed off for deletion.
  ASSERT(client_ == nullptr);
}
56

            
57
14
// Builds the Redis health checker stats struct from `scope`, with every stat name placed under
// the "health_check.redis." prefix.
RedisHealthCheckerStats RedisHealthChecker::generateRedisStats(Stats::Scope& scope) {
  const std::string stat_prefix = "health_check.redis.";
  return {ALL_REDIS_HEALTH_CHECKER_STATS(POOL_COUNTER_PREFIX(scope, stat_prefix))};
}
61

            
62
10
void RedisHealthChecker::RedisActiveHealthCheckSession::onDeferredDelete() {
63
10
  if (current_request_) {
64
5
    current_request_->cancel();
65
5
    current_request_ = nullptr;
66
5
  }
67

            
68
10
  if (client_) {
69
9
    client_->close();
70
9
  }
71
10
}
72

            
73
16
// Connection event callback. On a local or remote close the client is handed to the dispatcher
// for deferred deletion; every other event is ignored.
void RedisHealthChecker::RedisActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) {
  const bool connection_closed = event == Network::ConnectionEvent::RemoteClose ||
                                 event == Network::ConnectionEvent::LocalClose;
  if (!connection_closed) {
    return;
  }

  // This should only happen after any active requests have been failed/cancelled.
  ASSERT(!current_request_);
  // Moving out of client_ leaves it empty, so the next interval creates a fresh client.
  parent_.dispatcher_.deferredDelete(std::move(client_));
}
81

            
82
27
void RedisHealthChecker::RedisActiveHealthCheckSession::onInterval() {
83
27
  if (!client_) {
84
16
    client_ = parent_.client_factory_.create(
85
16
        host_, parent_.dispatcher_, redis_config_, redis_command_stats_,
86
16
        parent_.cluster_.info()->statsScope(), parent_.auth_username_, parent_.auth_password_,
87
16
        false, parent_.aws_iam_config_, parent_.aws_iam_authenticator_);
88
16
    client_->addConnectionCallbacks(*this);
89
16
  }
90

            
91
27
  ASSERT(!current_request_);
92

            
93
27
  switch (parent_.type_) {
94
7
  case Type::Exists:
95
7
    current_request_ = client_->makeRequest(existsHealthCheckRequest(parent_.key_), *this);
96
7
    break;
97
20
  case Type::Ping:
98
20
    current_request_ = client_->makeRequest(pingHealthCheckRequest(), *this);
99
20
    break;
100
27
  }
101
27
}
102

            
103
void RedisHealthChecker::RedisActiveHealthCheckSession::onResponse(
104
15
    NetworkFilters::Common::Redis::RespValuePtr&& value) {
105
15
  current_request_ = nullptr;
106

            
107
15
  switch (parent_.type_) {
108
5
  case Type::Exists:
109
5
    if (value->type() == NetworkFilters::Common::Redis::RespType::Integer &&
110
5
        value->asInteger() == 0) {
111
2
      handleSuccess();
112
3
    } else {
113
3
      parent_.redis_stats_.exists_failure_.inc();
114
3
      handleFailure(envoy::data::core::v3::ACTIVE);
115
3
    }
116
5
    break;
117
10
  case Type::Ping:
118
10
    if (value->type() == NetworkFilters::Common::Redis::RespType::SimpleString &&
119
10
        value->asString() == "PONG") {
120
5
      handleSuccess();
121
5
    } else {
122
5
      handleFailure(envoy::data::core::v3::ACTIVE);
123
5
    }
124
10
    break;
125
15
  }
126

            
127
15
  if (!parent_.reuse_connection_) {
128
2
    client_->close();
129
2
  }
130
15
}
131

            
132
3
void RedisHealthChecker::RedisActiveHealthCheckSession::onFailure() {
133
3
  current_request_ = nullptr;
134
3
  handleFailure(envoy::data::core::v3::NETWORK);
135
3
}
136

            
137
void RedisHealthChecker::RedisActiveHealthCheckSession::onRedirection(
138
2
    NetworkFilters::Common::Redis::RespValuePtr&&, const std::string&, bool) {
139
  // Treat any redirection error response from a Redis server as success.
140
2
  current_request_ = nullptr;
141
2
  handleSuccess();
142
2
}
143

            
144
2
void RedisHealthChecker::RedisActiveHealthCheckSession::onTimeout() {
145
2
  current_request_->cancel();
146
2
  current_request_ = nullptr;
147
2
  client_->close();
148
2
}
149

            
150
1
// Builds the request for an EXISTS check: an array of two bulk strings, "EXISTS" followed by
// `key`.
RedisHealthChecker::HealthCheckRequest::HealthCheckRequest(const std::string& key) {
  using NetworkFilters::Common::Redis::RespType;
  using NetworkFilters::Common::Redis::RespValue;

  std::vector<RespValue> command_parts(2);

  RespValue& command_name = command_parts[0];
  command_name.type(RespType::BulkString);
  command_name.asString() = "EXISTS";

  RespValue& key_argument = command_parts[1];
  key_argument.type(RespType::BulkString);
  key_argument.asString() = key;

  // Switch the request to an array, then swap the prepared elements into it.
  request_.type(RespType::Array);
  request_.asArray().swap(command_parts);
}
159

            
160
1
// Builds the request for a PING check: an array holding the single bulk string "PING".
RedisHealthChecker::HealthCheckRequest::HealthCheckRequest() {
  using NetworkFilters::Common::Redis::RespType;
  using NetworkFilters::Common::Redis::RespValue;

  std::vector<RespValue> command_parts(1);

  RespValue& command_name = command_parts[0];
  command_name.type(RespType::BulkString);
  command_name.asString() = "PING";

  // Switch the request to an array, then swap the prepared element into it.
  request_.type(RespType::Array);
  request_.asArray().swap(command_parts);
}
167

            
168
} // namespace RedisHealthChecker
169
} // namespace HealthCheckers
170
} // namespace Extensions
171
} // namespace Envoy