Line data Source code
1 : #include "source/extensions/health_checkers/grpc/health_checker_impl.h"
2 :
3 : #include <cstdint>
4 : #include <iterator>
5 : #include <memory>
6 :
7 : #include "envoy/config/core/v3/health_check.pb.h"
8 : #include "envoy/data/core/v3/health_check_event.pb.h"
9 : #include "envoy/server/health_checker_config.h"
10 : #include "envoy/type/v3/http.pb.h"
11 : #include "envoy/type/v3/range.pb.h"
12 :
13 : #include "source/common/buffer/zero_copy_input_stream_impl.h"
14 : #include "source/common/common/empty_string.h"
15 : #include "source/common/common/enum_to_int.h"
16 : #include "source/common/common/macros.h"
17 : #include "source/common/config/utility.h"
18 : #include "source/common/config/well_known_names.h"
19 : #include "source/common/grpc/common.h"
20 : #include "source/common/http/header_map_impl.h"
21 : #include "source/common/http/header_utility.h"
22 : #include "source/common/network/address_impl.h"
23 : #include "source/common/network/socket_impl.h"
24 : #include "source/common/network/utility.h"
25 : #include "source/common/router/router.h"
26 : #include "source/common/runtime/runtime_features.h"
27 : #include "source/common/upstream/host_utility.h"
28 :
29 : #include "absl/strings/match.h"
30 : #include "absl/strings/str_cat.h"
31 :
32 : namespace Envoy {
33 : namespace Upstream {
34 : namespace {
35 : const std::string& getHostname(const HostSharedPtr& host,
36 : const absl::optional<std::string>& config_hostname,
37 26 : const ClusterInfoConstSharedPtr& cluster) {
38 26 : if (config_hostname.has_value()) {
39 2 : return HealthCheckerFactory::getHostname(host, config_hostname.value(), cluster);
40 2 : }
41 24 : return HealthCheckerFactory::getHostname(host, EMPTY_STRING, cluster);
42 26 : }
43 : } // namespace
44 :
45 : Upstream::HealthCheckerSharedPtr GrpcHealthCheckerFactory::createCustomHealthChecker(
46 : const envoy::config::core::v3::HealthCheck& config,
47 0 : Server::Configuration::HealthCheckerFactoryContext& context) {
48 0 : return std::make_shared<ProdGrpcHealthCheckerImpl>(
49 0 : context.cluster(), config, context.mainThreadDispatcher(), context.runtime(),
50 0 : context.api().randomGenerator(), context.eventLogger());
51 0 : }
52 :
53 : REGISTER_FACTORY(GrpcHealthCheckerFactory, Server::Configuration::CustomHealthCheckerFactory);
54 :
GrpcHealthCheckerImpl::GrpcHealthCheckerImpl(const Cluster& cluster,
                                             const envoy::config::core::v3::HealthCheck& config,
                                             Event::Dispatcher& dispatcher,
                                             Runtime::Loader& runtime,
                                             Random::RandomGenerator& random,
                                             HealthCheckEventLoggerPtr&& event_logger)
    : HealthCheckerImplBase(cluster, config, dispatcher, runtime, random, std::move(event_logger)),
      random_generator_(random),
      // Look up the well-known grpc.health.v1.Health/Check method descriptor once at
      // construction time from the generated descriptor pool.
      // NOTE(review): FindMethodByName is dereferenced unchecked — presumably the
      // health proto is always linked in; confirm against build deps.
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "grpc.health.v1.Health.Check")),
      // Parser for any user-configured initial metadata to attach to each probe.
      request_headers_parser_(
          Router::HeaderParser::configure(config.grpc_health_check().initial_metadata())) {
  // Optional service name sent in the HealthCheckRequest payload; left unset
  // (absl::nullopt) when the config field is empty.
  if (!config.grpc_health_check().service_name().empty()) {
    service_name_ = config.grpc_health_check().service_name();
  }

  // Optional :authority override for the probe request; when unset the
  // authority is resolved per-host at request time (see getHostname()).
  if (!config.grpc_health_check().authority().empty()) {
    authority_value_ = config.grpc_health_check().authority();
  }
}
75 :
// Per-host active check session. The synthetic local connection info (loopback
// for both local and remote) backs the StreamInfo used for header evaluation,
// since there is no real downstream connection for a health check.
GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::GrpcActiveHealthCheckSession(
    GrpcHealthCheckerImpl& parent, const HostSharedPtr& host)
    : ActiveHealthCheckSession(parent, host), parent_(parent),
      local_connection_info_provider_(std::make_shared<Network::ConnectionInfoSetterImpl>(
          Network::Utility::getCanonicalIpv4LoopbackAddress(),
          Network::Utility::getCanonicalIpv4LoopbackAddress())) {}
82 :
GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::~GrpcActiveHealthCheckSession() {
  // The codec client must already be gone: teardown goes through
  // onDeferredDelete() / onEvent(), which close and release it first.
  ASSERT(client_ == nullptr);
}
86 :
87 13 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onDeferredDelete() {
88 13 : if (client_) {
89 : // If there is an active request it will get reset, so make sure we ignore the reset.
90 10 : expect_reset_ = true;
91 10 : client_->close(Network::ConnectionCloseType::Abort);
92 10 : }
93 13 : }
94 :
// Handles response headers for the in-flight grpc.health.v1.Health/Check RPC.
// May terminate the probe immediately on a non-200 HTTP status, a non-gRPC
// response, or a headers-only (trailers-only) gRPC response.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeHeaders(
    Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
  const auto http_response_status = Http::Utility::getResponseStatus(*headers);
  if (http_response_status != enumToInt(Http::Code::OK)) {
    // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that
    // grpc-status be used if available.
    if (end_stream) {
      const auto grpc_status = Grpc::Common::getGrpcStatus(*headers);
      if (grpc_status) {
        onRpcComplete(grpc_status.value(), Grpc::Common::getGrpcMessage(*headers), true);
        return;
      }
    }
    // No grpc-status available: derive a gRPC status from the HTTP status.
    onRpcComplete(Grpc::Utility::httpToGrpcStatus(http_response_status), "non-200 HTTP response",
                  end_stream);
    return;
  }
  // 200 OK but the headers do not describe a gRPC response (e.g. wrong
  // content-type). end_stream=false here makes onRpcComplete() reset the
  // still-open stream.
  if (!Grpc::Common::isGrpcResponseHeaders(*headers, end_stream)) {
    onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "not a gRPC request", false);
    return;
  }
  if (end_stream) {
    // This is how, for instance, grpc-go signals about missing service - HTTP/2 200 OK with
    // 'unimplemented' gRPC status.
    const auto grpc_status = Grpc::Common::getGrpcStatus(*headers);
    if (grpc_status) {
      onRpcComplete(grpc_status.value(), Grpc::Common::getGrpcMessage(*headers), true);
      return;
    }
    // Stream ended on 200 OK with no grpc-status at all: protocol violation.
    onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal,
                  "gRPC protocol violation: unexpected stream end", true);
  }
}
128 :
// Handles response body data. A well-formed unary health-check response carries
// exactly one gRPC frame containing a HealthCheckResponse message, followed by
// trailers; any other shape fails the probe.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeData(Buffer::Instance& data,
                                                                    bool end_stream) {
  // gRPC responses must end with trailers, so a stream ending on a data frame
  // is a protocol violation.
  if (end_stream) {
    onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal,
                  "gRPC protocol violation: unexpected stream end", true);
    return;
  }
  // We should end up with only one frame here.
  std::vector<Grpc::Frame> decoded_frames;
  if (!decoder_.decode(data, decoded_frames)) {
    // end_stream=false: onRpcComplete() will reset the still-open stream.
    onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "gRPC wire protocol decode error",
                  false);
    return;
  }
  for (auto& frame : decoded_frames) {
    if (frame.length_ > 0) {
      if (health_check_response_) {
        // grpc.health.v1.Health.Check is unary RPC, so only one message is allowed.
        onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "unexpected streaming", false);
        return;
      }
      health_check_response_ = std::make_unique<grpc::health::v1::HealthCheckResponse>();
      Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_));

      // Reject compressed frames (flags != default) and unparseable payloads.
      if (frame.flags_ != Grpc::GRPC_FH_DEFAULT ||
          !health_check_response_->ParseFromZeroCopyStream(&stream)) {
        onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal,
                      "invalid grpc.health.v1 RPC payload", false);
        return;
      }
    }
  }
}
162 :
163 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeTrailers(
164 3 : Http::ResponseTrailerMapPtr&& trailers) {
165 3 : auto maybe_grpc_status = Grpc::Common::getGrpcStatus(*trailers);
166 3 : auto grpc_status =
167 3 : maybe_grpc_status
168 3 : ? maybe_grpc_status.value()
169 3 : : static_cast<Grpc::Status::GrpcStatus>(Grpc::Status::WellKnownGrpcStatus::Internal);
170 3 : const std::string grpc_message =
171 3 : maybe_grpc_status ? Grpc::Common::getGrpcMessage(*trailers) : "invalid gRPC status";
172 3 : onRpcComplete(grpc_status, grpc_message, true);
173 3 : }
174 :
175 24 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) {
176 24 : if (event == Network::ConnectionEvent::RemoteClose ||
177 24 : event == Network::ConnectionEvent::LocalClose) {
178 : // For the raw disconnect event, we are either between intervals in which case we already have
179 : // a timer setup, or we did the close or got a reset, in which case we already setup a new
180 : // timer. There is nothing to do here other than blow away the client.
181 23 : parent_.dispatcher_.deferredDelete(std::move(client_));
182 23 : }
183 24 : }
184 :
// Fires once per health-check interval: (re)establishes the codec client if
// needed, opens a new HTTP/2 stream, and sends the Health/Check request
// headers plus the serialized HealthCheckRequest payload.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onInterval() {
  if (!client_) {
    // No live connection (first probe, or previous connection was torn down):
    // create a fresh health-check connection and codec client.
    Upstream::Host::CreateConnectionData conn =
        host_->createHealthCheckConnection(parent_.dispatcher_, parent_.transportSocketOptions(),
                                           parent_.transportSocketMatchMetadata().get());
    client_ = parent_.createCodecClient(conn);
    client_->addConnectionCallbacks(connection_callback_impl_);
    client_->setCodecConnectionCallbacks(http_connection_callback_impl_);
  }

  // This session is both the response decoder and the stream callback target.
  request_encoder_ = &client_->newStream(*this);
  request_encoder_->getStream().addCallbacks(*this);

  // Configured authority (if any) takes precedence; otherwise derived per host.
  const std::string& authority =
      getHostname(host_, parent_.authority_value_, parent_.cluster_.info());
  auto headers_message =
      Grpc::Common::prepareHeaders(authority, parent_.service_method_.service()->full_name(),
                                   parent_.service_method_.name(), absl::nullopt);
  headers_message->headers().setReferenceUserAgent(
      Http::Headers::get().UserAgentValues.EnvoyHealthChecker);

  // Build a synthetic stream info (loopback addresses, this host as upstream)
  // so configured initial_metadata header expressions can be evaluated.
  StreamInfo::StreamInfoImpl stream_info(Http::Protocol::Http2, parent_.dispatcher_.timeSource(),
                                         local_connection_info_provider_);
  stream_info.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
  stream_info.upstreamInfo()->setUpstreamHost(host_);
  parent_.request_headers_parser_->evaluateHeaders(headers_message->headers(), stream_info);

  // Propagate the health-check timeout as a grpc-timeout header.
  Grpc::Common::toGrpcTimeout(parent_.timeout_, headers_message->headers());

  Router::FilterUtility::setUpstreamScheme(
      headers_message->headers(),
      // Here there is no downstream connection so scheme will be based on
      // upstream crypto
      host_->transportSocketFactory().implementsSecureTransport());

  auto status = request_encoder_->encodeHeaders(headers_message->headers(), false);
  // Encoding will only fail if required headers are missing.
  ASSERT(status.ok());

  grpc::health::v1::HealthCheckRequest request;
  if (parent_.service_name_.has_value()) {
    request.set_service(parent_.service_name_.value());
  }

  // Single request message, end_stream=true: Health/Check is a unary RPC.
  request_encoder_->encodeData(*Grpc::Common::serializeToGrpcFrame(request), true);
}
231 :
// Stream reset callback. Distinguishes resets we initiated ourselves
// (expect_reset_) from unexpected resets, which count as a NETWORK failure.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onResetStream(Http::StreamResetReason,
                                                                        absl::string_view) {
  // Snapshot flags before resetState() clears them.
  const bool expected_reset = expect_reset_;
  const bool goaway = received_no_error_goaway_;
  resetState();

  if (expected_reset) {
    // Stream reset was initiated by us (bogus gRPC response, timeout or cluster host is going
    // away). In these cases health check failure has already been reported and a GOAWAY (if any)
    // has already been handled, so just return.
    return;
  }

  ENVOY_CONN_LOG(debug, "connection/stream error health_flags={}", *client_,
                 HostUtility::healthFlagsToString(*host_));

  if (goaway || !parent_.reuse_connection_) {
    // Stream reset was unexpected, so we haven't closed the connection
    // yet in response to a GOAWAY or due to disabled connection reuse.
    client_->close(Network::ConnectionCloseType::Abort);
  }

  // TODO(baranov1ch): according to all HTTP standards, we should check if reason is one of
  // Http::StreamResetReason::RemoteRefusedStreamReset (which may mean GOAWAY),
  // Http::StreamResetReason::RemoteReset or Http::StreamResetReason::ConnectionTermination (both
  // mean connection close), check if connection is not fresh (was used for at least 1 request)
  // and silently retry request on the fresh connection. This is also true for HTTP/1.1 healthcheck.
  handleFailure(envoy::data::core::v3::NETWORK);
}
261 :
// GOAWAY callback from the HTTP/2 codec. A NO_ERROR GOAWAY lets an active
// probe finish first; any other GOAWAY fails the probe immediately.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onGoAway(
    Http::GoAwayErrorCode error_code) {
  ENVOY_CONN_LOG(debug, "connection going away health_flags={}", *client_,
                 HostUtility::healthFlagsToString(*host_));
  // If we have an active health check probe and receive a GOAWAY indicating
  // graceful shutdown, allow the probe to complete before closing the connection.
  // The connection will be closed when the active check completes or another
  // terminal condition occurs, such as a timeout or stream reset.
  if (request_encoder_ && error_code == Http::GoAwayErrorCode::NoError) {
    received_no_error_goaway_ = true;
    return;
  }

  // Even if we have active health check probe, fail it on GOAWAY and schedule new one.
  if (request_encoder_) {
    handleFailure(envoy::data::core::v3::NETWORK);
    // request_encoder_ can already be destroyed if the host was removed during the failure callback
    // above.
    if (request_encoder_ != nullptr) {
      // Mark the reset as ours so onResetStream() ignores it.
      expect_reset_ = true;
      request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
    }
  }
  // client_ can already be destroyed if the host was removed during the failure callback above.
  if (client_ != nullptr) {
    client_->close(Network::ConnectionCloseType::Abort);
  }
}
290 :
291 : bool GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::isHealthCheckSucceeded(
292 8 : Grpc::Status::GrpcStatus grpc_status) const {
293 8 : if (grpc_status != Grpc::Status::WellKnownGrpcStatus::Ok) {
294 5 : return false;
295 5 : }
296 :
297 3 : if (!health_check_response_ ||
298 3 : health_check_response_->status() != grpc::health::v1::HealthCheckResponse::SERVING) {
299 0 : return false;
300 0 : }
301 :
302 3 : return true;
303 3 : }
304 :
// Terminal handler for a probe: records success/failure, then cleans up the
// stream and, when appropriate, the connection. |end_stream| indicates whether
// the HTTP stream already ended naturally.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onRpcComplete(
    Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message, bool end_stream) {
  logHealthCheckStatus(grpc_status, grpc_message);
  if (isHealthCheckSucceeded(grpc_status)) {
    handleSuccess(false);
  } else {
    handleFailure(envoy::data::core::v3::ACTIVE);
  }

  // Read the value as we may call resetState() and clear it.
  const bool goaway = received_no_error_goaway_;

  // |end_stream| will be false if we decided to stop healthcheck before HTTP stream has ended -
  // invalid gRPC payload, unexpected message stream or wrong content-type.
  if (end_stream) {
    resetState();
  } else {
    // request_encoder_ can already be destroyed if the host was removed during the failure callback
    // above.
    if (request_encoder_ != nullptr) {
      // resetState() will be called by onResetStream().
      expect_reset_ = true;
      request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
    }
  }

  // client_ can already be destroyed if the host was removed during the failure callback above.
  // Close when connection reuse is disabled or a graceful GOAWAY was pending.
  if (client_ != nullptr && (!parent_.reuse_connection_ || goaway)) {
    client_->close(Network::ConnectionCloseType::Abort);
  }
}
336 :
337 26 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::resetState() {
338 26 : expect_reset_ = false;
339 26 : request_encoder_ = nullptr;
340 26 : decoder_ = Grpc::Decoder();
341 26 : health_check_response_.reset();
342 26 : received_no_error_goaway_ = false;
343 26 : }
344 :
345 8 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onTimeout() {
346 8 : ENVOY_CONN_LOG(debug, "connection/stream timeout health_flags={}", *client_,
347 8 : HostUtility::healthFlagsToString(*host_));
348 8 : expect_reset_ = true;
349 8 : if (received_no_error_goaway_ || !parent_.reuse_connection_) {
350 0 : client_->close(Network::ConnectionCloseType::Abort);
351 8 : } else {
352 8 : request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
353 8 : }
354 8 : }
355 :
356 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::logHealthCheckStatus(
357 8 : Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message) {
358 8 : const char* service_status;
359 8 : if (!health_check_response_) {
360 5 : service_status = "rpc_error";
361 5 : } else {
362 3 : switch (health_check_response_->status()) {
363 3 : case grpc::health::v1::HealthCheckResponse::SERVING:
364 3 : service_status = "serving";
365 3 : break;
366 0 : case grpc::health::v1::HealthCheckResponse::NOT_SERVING:
367 0 : service_status = "not_serving";
368 0 : break;
369 0 : case grpc::health::v1::HealthCheckResponse::UNKNOWN:
370 0 : service_status = "unknown";
371 0 : break;
372 0 : case grpc::health::v1::HealthCheckResponse::SERVICE_UNKNOWN:
373 0 : service_status = "service_unknown";
374 0 : break;
375 0 : default:
376 0 : service_status = "unknown_healthcheck_response";
377 0 : break;
378 3 : }
379 3 : }
380 8 : std::string grpc_status_message;
381 8 : if (grpc_status != Grpc::Status::WellKnownGrpcStatus::Ok && !grpc_message.empty()) {
382 5 : grpc_status_message = fmt::format("{} ({})", grpc_status, grpc_message);
383 5 : } else {
384 3 : grpc_status_message = absl::StrCat("", grpc_status);
385 3 : }
386 :
387 8 : ENVOY_CONN_LOG(debug, "hc grpc_status={} service_status={} health_flags={}", *client_,
388 8 : grpc_status_message, service_status, HostUtility::healthFlagsToString(*host_));
389 8 : }
390 :
// Production codec-client factory: wraps the freshly created health-check
// connection in an HTTP/2 codec client (gRPC requires HTTP/2).
Http::CodecClientPtr
ProdGrpcHealthCheckerImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) {
  return std::make_unique<Http::CodecClientProd>(
      Http::CodecType::HTTP2, std::move(data.connection_), data.host_description_, dispatcher_,
      random_generator_, transportSocketOptions());
}
397 :
398 : } // namespace Upstream
399 : } // namespace Envoy
|