Line data Source code
1 : #include "source/extensions/health_checkers/redis/redis.h" 2 : 3 : #include "envoy/config/core/v3/health_check.pb.h" 4 : #include "envoy/data/core/v3/health_check_event.pb.h" 5 : #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.h" 6 : #include "envoy/extensions/filters/network/redis_proxy/v3/redis_proxy.pb.validate.h" 7 : #include "envoy/extensions/health_checkers/redis/v3/redis.pb.h" 8 : 9 : namespace Envoy { 10 : namespace Extensions { 11 : namespace HealthCheckers { 12 : namespace RedisHealthChecker { 13 : 14 : RedisHealthChecker::RedisHealthChecker( 15 : const Upstream::Cluster& cluster, const envoy::config::core::v3::HealthCheck& config, 16 : const envoy::extensions::health_checkers::redis::v3::Redis& redis_config, 17 : Event::Dispatcher& dispatcher, Runtime::Loader& runtime, 18 : Upstream::HealthCheckEventLoggerPtr&& event_logger, Api::Api& api, 19 : Extensions::NetworkFilters::Common::Redis::Client::ClientFactory& client_factory) 20 : : HealthCheckerImplBase(cluster, config, dispatcher, runtime, api.randomGenerator(), 21 : std::move(event_logger)), 22 : client_factory_(client_factory), key_(redis_config.key()), 23 : redis_stats_(generateRedisStats(cluster.info()->statsScope())), 24 : auth_username_( 25 : NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authUsername(cluster.info(), api)), 26 : auth_password_(NetworkFilters::RedisProxy::ProtocolOptionsConfigImpl::authPassword( 27 0 : cluster.info(), api)) { 28 0 : if (!key_.empty()) { 29 0 : type_ = Type::Exists; 30 0 : } else { 31 0 : type_ = Type::Ping; 32 0 : } 33 0 : } 34 : 35 : RedisHealthChecker::RedisActiveHealthCheckSession::RedisActiveHealthCheckSession( 36 : RedisHealthChecker& parent, const Upstream::HostSharedPtr& host) 37 0 : : ActiveHealthCheckSession(parent, host), parent_(parent) { 38 0 : redis_command_stats_ = 39 0 : Extensions::NetworkFilters::Common::Redis::RedisCommandStats::createRedisCommandStats( 40 0 : parent_.cluster_.info()->statsScope().symbolTable()); 41 0 : } 42 : 43 0 : RedisHealthChecker::RedisActiveHealthCheckSession::~RedisActiveHealthCheckSession() { 44 0 : ASSERT(current_request_ == nullptr); 45 0 : ASSERT(client_ == nullptr); 46 0 : } 47 : 48 0 : RedisHealthCheckerStats RedisHealthChecker::generateRedisStats(Stats::Scope& scope) { 49 0 : std::string prefix("health_check.redis."); 50 0 : return {ALL_REDIS_HEALTH_CHECKER_STATS(POOL_COUNTER_PREFIX(scope, prefix))}; 51 0 : } 52 : 53 0 : void RedisHealthChecker::RedisActiveHealthCheckSession::onDeferredDelete() { 54 0 : if (current_request_) { 55 0 : current_request_->cancel(); 56 0 : current_request_ = nullptr; 57 0 : } 58 : 59 0 : if (client_) { 60 0 : client_->close(); 61 0 : } 62 0 : } 63 : 64 0 : void RedisHealthChecker::RedisActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) { 65 0 : if (event == Network::ConnectionEvent::RemoteClose || 66 0 : event == Network::ConnectionEvent::LocalClose) { 67 : // This should only happen after any active requests have been failed/cancelled. 68 0 : ASSERT(!current_request_); 69 0 : parent_.dispatcher_.deferredDelete(std::move(client_)); 70 0 : } 71 0 : } 72 : 73 0 : void RedisHealthChecker::RedisActiveHealthCheckSession::onInterval() { 74 0 : if (!client_) { 75 0 : client_ = 76 0 : parent_.client_factory_.create(host_, parent_.dispatcher_, *this, redis_command_stats_, 77 0 : parent_.cluster_.info()->statsScope(), 78 0 : parent_.auth_username_, parent_.auth_password_, false); 79 0 : client_->addConnectionCallbacks(*this); 80 0 : } 81 : 82 0 : ASSERT(!current_request_); 83 : 84 0 : switch (parent_.type_) { 85 0 : case Type::Exists: 86 0 : current_request_ = client_->makeRequest(existsHealthCheckRequest(parent_.key_), *this); 87 0 : break; 88 0 : case Type::Ping: 89 0 : current_request_ = client_->makeRequest(pingHealthCheckRequest(), *this); 90 0 : break; 91 0 : } 92 0 : } 93 : 94 : void RedisHealthChecker::RedisActiveHealthCheckSession::onResponse( 95 0 : NetworkFilters::Common::Redis::RespValuePtr&& value) { 96 0 : current_request_ = nullptr; 97 : 98 0 : switch (parent_.type_) { 99 0 : case Type::Exists: 100 0 : if (value->type() == NetworkFilters::Common::Redis::RespType::Integer && 101 0 : value->asInteger() == 0) { 102 0 : handleSuccess(); 103 0 : } else { 104 0 : parent_.redis_stats_.exists_failure_.inc(); 105 0 : handleFailure(envoy::data::core::v3::ACTIVE); 106 0 : } 107 0 : break; 108 0 : case Type::Ping: 109 0 : if (value->type() == NetworkFilters::Common::Redis::RespType::SimpleString && 110 0 : value->asString() == "PONG") { 111 0 : handleSuccess(); 112 0 : } else { 113 0 : handleFailure(envoy::data::core::v3::ACTIVE); 114 0 : } 115 0 : break; 116 0 : } 117 : 118 0 : if (!parent_.reuse_connection_) { 119 0 : client_->close(); 120 0 : } 121 0 : } 122 : 123 0 : void RedisHealthChecker::RedisActiveHealthCheckSession::onFailure() { 124 0 : current_request_ = nullptr; 125 0 : handleFailure(envoy::data::core::v3::NETWORK); 126 0 : } 127 : 128 : void RedisHealthChecker::RedisActiveHealthCheckSession::onRedirection( 129 0 : NetworkFilters::Common::Redis::RespValuePtr&&, const std::string&, bool) { 130 : // Treat any redirection error response from a Redis server as success. 131 0 : current_request_ = nullptr; 132 0 : handleSuccess(); 133 0 : } 134 : 135 0 : void RedisHealthChecker::RedisActiveHealthCheckSession::onTimeout() { 136 0 : current_request_->cancel(); 137 0 : current_request_ = nullptr; 138 0 : client_->close(); 139 0 : } 140 : 141 0 : RedisHealthChecker::HealthCheckRequest::HealthCheckRequest(const std::string& key) { 142 0 : std::vector<NetworkFilters::Common::Redis::RespValue> values(2); 143 0 : values[0].type(NetworkFilters::Common::Redis::RespType::BulkString); 144 0 : values[0].asString() = "EXISTS"; 145 0 : values[1].type(NetworkFilters::Common::Redis::RespType::BulkString); 146 0 : values[1].asString() = key; 147 0 : request_.type(NetworkFilters::Common::Redis::RespType::Array); 148 0 : request_.asArray().swap(values); 149 0 : } 150 : 151 0 : RedisHealthChecker::HealthCheckRequest::HealthCheckRequest() { 152 0 : std::vector<NetworkFilters::Common::Redis::RespValue> values(1); 153 0 : values[0].type(NetworkFilters::Common::Redis::RespType::BulkString); 154 0 : values[0].asString() = "PING"; 155 0 : request_.type(NetworkFilters::Common::Redis::RespType::Array); 156 0 : request_.asArray().swap(values); 157 0 : } 158 : 159 : } // namespace RedisHealthChecker 160 : } // namespace HealthCheckers 161 : } // namespace Extensions 162 : } // namespace Envoy