1
#pragma once
2

            
3
#include <cstdint>
4

            
5
#include "envoy/access_log/access_log.h"
6
#include "envoy/api/api.h"
7
#include "envoy/common/random_generator.h"
8
#include "envoy/config/core/v3/health_check.pb.h"
9
#include "envoy/data/core/v3/health_check_event.pb.h"
10
#include "envoy/grpc/status.h"
11
#include "envoy/network/socket.h"
12
#include "envoy/server/health_checker_config.h"
13
#include "envoy/type/v3/http.pb.h"
14
#include "envoy/type/v3/range.pb.h"
15

            
16
#include "source/common/common/dump_state_utils.h"
17
#include "source/common/common/logger.h"
18
#include "source/common/grpc/codec.h"
19
#include "source/common/http/codec_client.h"
20
#include "source/common/http/response_decoder_impl_base.h"
21
#include "source/common/router/header_parser.h"
22
#include "source/common/stream_info/stream_info_impl.h"
23
#include "source/common/upstream/health_checker_impl.h"
24
#include "source/extensions/health_checkers/common/health_checker_base_impl.h"
25

            
26
#include "src/proto/grpc/health/v1/health.pb.h"
27

            
28
namespace Envoy {
29
namespace Upstream {
30

            
31
class GrpcHealthCheckerFactory : public Server::Configuration::CustomHealthCheckerFactory {
32
public:
33
  Upstream::HealthCheckerSharedPtr
34
  createCustomHealthChecker(const envoy::config::core::v3::HealthCheck& config,
35
                            Server::Configuration::HealthCheckerFactoryContext& context) override;
36

            
37
11080
  std::string name() const override { return "envoy.health_checkers.grpc"; }
38
463
  ProtobufTypes::MessagePtr createEmptyConfigProto() override {
39
463
    return ProtobufTypes::MessagePtr{new envoy::config::core::v3::HealthCheck::GrpcHealthCheck()};
40
463
  }
41
};
42

            
43
// Declares GrpcHealthCheckerFactory through the DECLARE_FACTORY macro. The macro is defined
// outside this file; presumably it pairs with a registration in a source file (confirm there).
DECLARE_FACTORY(GrpcHealthCheckerFactory);
44

            
45
/**
46
 * gRPC health checker implementation.
47
 */
48
class GrpcHealthCheckerImpl : public HealthCheckerImplBase {
49
public:
50
  GrpcHealthCheckerImpl(const Cluster& cluster, const envoy::config::core::v3::HealthCheck& config,
51
                        Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
52
                        Random::RandomGenerator& random, HealthCheckEventLoggerPtr&& event_logger);
53

            
54
private:
55
  struct GrpcActiveHealthCheckSession : public ActiveHealthCheckSession,
56
                                        public Http::ResponseDecoderImplBase,
57
                                        public Http::StreamCallbacks {
58
    GrpcActiveHealthCheckSession(GrpcHealthCheckerImpl& parent, const HostSharedPtr& host);
59
    ~GrpcActiveHealthCheckSession() override;
60

            
61
    void onRpcComplete(Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message,
62
                       bool end_stream);
63
    bool isHealthCheckSucceeded(Grpc::Status::GrpcStatus grpc_status) const;
64
    void resetState();
65
    void logHealthCheckStatus(Grpc::Status::GrpcStatus grpc_status,
66
                              const std::string& grpc_message);
67

            
68
    // ActiveHealthCheckSession
69
    void onInterval() override;
70
    void onTimeout() override;
71
    void onDeferredDelete() final;
72

            
73
    // Http::StreamDecoder
74
    void decodeData(Buffer::Instance&, bool end_stream) override;
75
    void decodeMetadata(Http::MetadataMapPtr&&) override {}
76

            
77
    // Http::ResponseDecoder
78
    void decode1xxHeaders(Http::ResponseHeaderMapPtr&&) override {}
79
    void decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override;
80
    void decodeTrailers(Http::ResponseTrailerMapPtr&&) override;
81
    void dumpState(std::ostream& os, int indent_level) const override {
82
      DUMP_STATE_UNIMPLEMENTED(GrpcActiveHealthCheckSession);
83
    }
84

            
85
    // Http::StreamCallbacks
86
    void onResetStream(Http::StreamResetReason reason,
87
                       absl::string_view transport_failure_reason) override;
88
1
    void onAboveWriteBufferHighWatermark() override {}
89
1
    void onBelowWriteBufferLowWatermark() override {}
90

            
91
    void onEvent(Network::ConnectionEvent event);
92
    void onGoAway(Http::GoAwayErrorCode error_code);
93

            
94
    class ConnectionCallbackImpl : public Network::ConnectionCallbacks {
95
    public:
96
59
      ConnectionCallbackImpl(GrpcActiveHealthCheckSession& parent) : parent_(parent) {}
97
      // Network::ConnectionCallbacks
98
75
      void onEvent(Network::ConnectionEvent event) override { parent_.onEvent(event); }
99
1
      void onAboveWriteBufferHighWatermark() override {}
100
1
      void onBelowWriteBufferLowWatermark() override {}
101

            
102
    private:
103
      GrpcActiveHealthCheckSession& parent_;
104
    };
105

            
106
    class HttpConnectionCallbackImpl : public Http::ConnectionCallbacks {
107
    public:
108
59
      HttpConnectionCallbackImpl(GrpcActiveHealthCheckSession& parent) : parent_(parent) {}
109
      // Http::ConnectionCallbacks
110
9
      void onGoAway(Http::GoAwayErrorCode error_code) override { parent_.onGoAway(error_code); }
111

            
112
    private:
113
      GrpcActiveHealthCheckSession& parent_;
114
    };
115

            
116
    ConnectionCallbackImpl connection_callback_impl_{*this};
117
    HttpConnectionCallbackImpl http_connection_callback_impl_{*this};
118
    GrpcHealthCheckerImpl& parent_;
119
    Http::CodecClientPtr client_;
120
    Http::RequestEncoder* request_encoder_;
121
    Grpc::Decoder decoder_;
122
    std::unique_ptr<grpc::health::v1::HealthCheckResponse> health_check_response_;
123
    Network::ConnectionInfoProviderSharedPtr local_connection_info_provider_;
124
    // If true, stream reset was initiated by us (GrpcActiveHealthCheckSession), not by HTTP stack,
125
    // e.g. remote reset. In this case healthcheck status has already been reported, only state
126
    // cleanup is required.
127
    bool expect_reset_ = false;
128
    // If true, we received a GOAWAY (NO_ERROR code) and are deferring closing the connection
129
    // until the active probe completes.
130
    bool received_no_error_goaway_ = false;
131
  };
132

            
133
  virtual Http::CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) PURE;
134

            
135
  // HealthCheckerImplBase
136
59
  ActiveHealthCheckSessionPtr makeSession(HostSharedPtr host) override {
137
59
    return std::make_unique<GrpcActiveHealthCheckSession>(*this, host);
138
59
  }
139
72
  envoy::data::core::v3::HealthCheckerType healthCheckerType() const override {
140
72
    return envoy::data::core::v3::GRPC;
141
72
  }
142

            
143
protected:
144
  Random::RandomGenerator& random_generator_;
145

            
146
private:
147
  const Protobuf::MethodDescriptor& service_method_;
148
  absl::optional<std::string> service_name_;
149
  absl::optional<std::string> authority_value_;
150
  Router::HeaderParserPtr request_headers_parser_;
151
};
152

            
153
/**
154
 * Production implementation of the gRPC health checker that allocates a real codec client.
155
 */
156
class ProdGrpcHealthCheckerImpl : public GrpcHealthCheckerImpl {
157
public:
158
  using GrpcHealthCheckerImpl::GrpcHealthCheckerImpl;
159

            
160
  // GrpcHealthCheckerImpl
161
  Http::CodecClientPtr createCodecClient(Upstream::Host::CreateConnectionData& data) override;
162
};
163

            
164
} // namespace Upstream
165
} // namespace Envoy