Line data Source code
1 : #pragma once 2 : 3 : #include <cstdint> 4 : 5 : #include "envoy/access_log/access_log.h" 6 : #include "envoy/api/api.h" 7 : #include "envoy/common/random_generator.h" 8 : #include "envoy/config/core/v3/health_check.pb.h" 9 : #include "envoy/data/core/v3/health_check_event.pb.h" 10 : #include "envoy/grpc/status.h" 11 : #include "envoy/network/socket.h" 12 : #include "envoy/server/health_checker_config.h" 13 : #include "envoy/type/v3/http.pb.h" 14 : #include "envoy/type/v3/range.pb.h" 15 : 16 : #include "source/common/common/dump_state_utils.h" 17 : #include "source/common/common/logger.h" 18 : #include "source/common/grpc/codec.h" 19 : #include "source/common/http/codec_client.h" 20 : #include "source/common/router/header_parser.h" 21 : #include "source/common/stream_info/stream_info_impl.h" 22 : #include "source/common/upstream/health_checker_impl.h" 23 : #include "source/extensions/health_checkers/common/health_checker_base_impl.h" 24 : 25 : #include "src/proto/grpc/health/v1/health.pb.h" 26 : 27 : namespace Envoy { 28 : namespace Upstream { 29 : 30 : class GrpcHealthCheckerFactory : public Server::Configuration::CustomHealthCheckerFactory { 31 : public: 32 : Upstream::HealthCheckerSharedPtr 33 : createCustomHealthChecker(const envoy::config::core::v3::HealthCheck& config, 34 : Server::Configuration::HealthCheckerFactoryContext& context) override; 35 : 36 148 : std::string name() const override { return "envoy.health_checkers.grpc"; } 37 12 : ProtobufTypes::MessagePtr createEmptyConfigProto() override { 38 12 : return ProtobufTypes::MessagePtr{new envoy::config::core::v3::HealthCheck::GrpcHealthCheck()}; 39 12 : } 40 : }; 41 : 42 : DECLARE_FACTORY(GrpcHealthCheckerFactory); 43 : 44 : /** 45 : * gRPC health checker implementation. 46 : */ 47 : class GrpcHealthCheckerImpl : public HealthCheckerImplBase { 48 : public: 49 : GrpcHealthCheckerImpl(const Cluster& cluster, const envoy::config::core::v3::HealthCheck& config, 50 : Event::Dispatcher& dispatcher, Runtime::Loader& runtime, 51 : Random::RandomGenerator& random, HealthCheckEventLoggerPtr&& event_logger); 52 : 53 : private: 54 : struct GrpcActiveHealthCheckSession : public ActiveHealthCheckSession, 55 : public Http::ResponseDecoder, 56 : public Http::StreamCallbacks { 57 : GrpcActiveHealthCheckSession(GrpcHealthCheckerImpl& parent, const HostSharedPtr& host); 58 : ~GrpcActiveHealthCheckSession() override; 59 : 60 : void onRpcComplete(Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message, 61 : bool end_stream); 62 : bool isHealthCheckSucceeded(Grpc::Status::GrpcStatus grpc_status) const; 63 : void resetState(); 64 : void logHealthCheckStatus(Grpc::Status::GrpcStatus grpc_status, 65 : const std::string& grpc_message); 66 : 67 : // ActiveHealthCheckSession 68 : void onInterval() override; 69 : void onTimeout() override; 70 : void onDeferredDelete() final; 71 : 72 : // Http::StreamDecoder 73 : void decodeData(Buffer::Instance&, bool end_stream) override; 74 0 : void decodeMetadata(Http::MetadataMapPtr&&) override {} 75 : 76 : // Http::ResponseDecoder 77 0 : void decode1xxHeaders(Http::ResponseHeaderMapPtr&&) override {} 78 : void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override; 79 : void decodeTrailers(Http::ResponseTrailerMapPtr&&) override; 80 0 : void dumpState(std::ostream& os, int indent_level) const override { 81 0 : DUMP_STATE_UNIMPLEMENTED(GrpcActiveHealthCheckSession); 82 0 : } 83 : 84 : // Http::StreamCallbacks 85 : void onResetStream(Http::StreamResetReason reason, 86 : absl::string_view transport_failure_reason) override; 87 0 : void onAboveWriteBufferHighWatermark() override {} 88 0 : void onBelowWriteBufferLowWatermark() override {} 89 : 90 : void onEvent(Network::ConnectionEvent event); 91 : void onGoAway(Http::GoAwayErrorCode error_code); 92 : 93 : class ConnectionCallbackImpl : public Network::ConnectionCallbacks { 94 : public: 95 13 : ConnectionCallbackImpl(GrpcActiveHealthCheckSession& parent) : parent_(parent) {} 96 : // Network::ConnectionCallbacks 97 24 : void onEvent(Network::ConnectionEvent event) override { parent_.onEvent(event); } 98 0 : void onAboveWriteBufferHighWatermark() override {} 99 0 : void onBelowWriteBufferLowWatermark() override {} 100 : 101 : private: 102 : GrpcActiveHealthCheckSession& parent_; 103 : }; 104 : 105 : class HttpConnectionCallbackImpl : public Http::ConnectionCallbacks { 106 : public: 107 13 : HttpConnectionCallbackImpl(GrpcActiveHealthCheckSession& parent) : parent_(parent) {} 108 : // Http::ConnectionCallbacks 109 6 : void onGoAway(Http::GoAwayErrorCode error_code) override { parent_.onGoAway(error_code); } 110 : 111 : private: 112 : GrpcActiveHealthCheckSession& parent_; 113 : }; 114 : 115 : ConnectionCallbackImpl connection_callback_impl_{*this}; 116 : HttpConnectionCallbackImpl http_connection_callback_impl_{*this}; 117 : GrpcHealthCheckerImpl& parent_; 118 : Http::CodecClientPtr client_; 119 : Http::RequestEncoder* request_encoder_; 120 : Grpc::Decoder decoder_; 121 : std::unique_ptr<grpc::health::v1::HealthCheckResponse> health_check_response_; 122 : Network::ConnectionInfoProviderSharedPtr local_connection_info_provider_; 123 : // If true, stream reset was initiated by us (GrpcActiveHealthCheckSession), not by HTTP stack, 124 : // e.g. remote reset. In this case healthcheck status has already been reported, only state 125 : // cleanup is required. 126 : bool expect_reset_ = false; 127 : // If true, we received a GOAWAY (NO_ERROR code) and are deferring closing the connection 128 : // until the active probe completes. 129 : bool received_no_error_goaway_ = false; 130 : }; 131 : 132 : virtual Http::CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) PURE; 133 : 134 : // HealthCheckerImplBase 135 13 : ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) override { 136 13 : return std::make_unique<GrpcActiveHealthCheckSession>(*this, host); 137 13 : } 138 16 : envoy::data::core::v3::HealthCheckerType healthCheckerType() const override { 139 16 : return envoy::data::core::v3::GRPC; 140 16 : } 141 : 142 : protected: 143 : Random::RandomGenerator& random_generator_; 144 : 145 : private: 146 : const Protobuf::MethodDescriptor& service_method_; 147 : absl::optional<std::string> service_name_; 148 : absl::optional<std::string> authority_value_; 149 : Router::HeaderParserPtr request_headers_parser_; 150 : }; 151 : 152 : /** 153 : * Production implementation of the gRPC health checker that allocates a real codec client. 154 : */ 155 : class ProdGrpcHealthCheckerImpl : public GrpcHealthCheckerImpl { 156 : public: 157 : using GrpcHealthCheckerImpl::GrpcHealthCheckerImpl; 158 : 159 : // GrpcHealthCheckerImpl 160 : Http::CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) override; 161 : }; 162 : 163 : } // namespace Upstream 164 : } // namespace Envoy