1
#include "source/extensions/health_checkers/grpc/health_checker_impl.h"
2

            
3
#include <cstdint>
4
#include <iterator>
5
#include <memory>
6

            
7
#include "envoy/config/core/v3/health_check.pb.h"
8
#include "envoy/data/core/v3/health_check_event.pb.h"
9
#include "envoy/server/health_checker_config.h"
10
#include "envoy/type/v3/http.pb.h"
11
#include "envoy/type/v3/range.pb.h"
12

            
13
#include "source/common/buffer/zero_copy_input_stream_impl.h"
14
#include "source/common/common/empty_string.h"
15
#include "source/common/common/enum_to_int.h"
16
#include "source/common/common/macros.h"
17
#include "source/common/config/utility.h"
18
#include "source/common/config/well_known_names.h"
19
#include "source/common/grpc/common.h"
20
#include "source/common/http/header_map_impl.h"
21
#include "source/common/http/header_utility.h"
22
#include "source/common/network/address_impl.h"
23
#include "source/common/network/socket_impl.h"
24
#include "source/common/network/utility.h"
25
#include "source/common/router/router.h"
26
#include "source/common/runtime/runtime_features.h"
27
#include "source/common/upstream/host_utility.h"
28

            
29
#include "absl/strings/match.h"
30
#include "absl/strings/str_cat.h"
31

            
32
namespace Envoy {
33
namespace Upstream {
34
namespace {
35
const std::string& getHostname(const HostSharedPtr& host,
36
                               const absl::optional<std::string>& config_hostname,
37
92
                               const ClusterInfoConstSharedPtr& cluster) {
38
92
  if (config_hostname.has_value()) {
39
2
    return HealthCheckerFactory::getHostname(host, config_hostname.value(), cluster);
40
2
  }
41
90
  return HealthCheckerFactory::getHostname(host, EMPTY_STRING, cluster);
42
92
}
43
} // namespace
44

            
45
Upstream::HealthCheckerSharedPtr GrpcHealthCheckerFactory::createCustomHealthChecker(
    const envoy::config::core::v3::HealthCheck& config,
    Server::Configuration::HealthCheckerFactoryContext& context) {
  // Assemble the production gRPC health checker from the dependencies the
  // factory context supplies (dispatcher, runtime, RNG, event logger).
  auto health_checker = std::make_shared<ProdGrpcHealthCheckerImpl>(
      context.cluster(), config, context.mainThreadDispatcher(), context.runtime(),
      context.api().randomGenerator(), context.eventLogger());
  return health_checker;
}
52

            
53
// Registers the factory so this checker can be instantiated from typed
// custom health check configuration.
REGISTER_FACTORY(GrpcHealthCheckerFactory, Server::Configuration::CustomHealthCheckerFactory);
54

            
55
// Builds a gRPC health checker for `cluster` from the HealthCheck proto
// config. Captures the grpc.health.v1.Health.Check method descriptor and a
// header parser for any configured initial metadata; optional service name
// and authority overrides are stored only when non-empty.
GrpcHealthCheckerImpl::GrpcHealthCheckerImpl(const Cluster& cluster,
                                             const envoy::config::core::v3::HealthCheck& config,
                                             Event::Dispatcher& dispatcher,
                                             Runtime::Loader& runtime,
                                             Random::RandomGenerator& random,
                                             HealthCheckEventLoggerPtr&& event_logger)
    : HealthCheckerImplBase(cluster, config, dispatcher, runtime, random, std::move(event_logger)),
      random_generator_(random),
      // NOTE(review): dereferences FindMethodByName() unconditionally —
      // presumably the health proto is always linked into the generated pool;
      // confirm this invariant holds for all build configurations.
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "grpc.health.v1.Health.Check")),
      request_headers_parser_(THROW_OR_RETURN_VALUE(
          Router::HeaderParser::configure(config.grpc_health_check().initial_metadata()),
          Router::HeaderParserPtr)) {
  // Only record the service name when configured, so has_value() can later
  // distinguish "no service" from an empty one.
  if (!config.grpc_health_check().service_name().empty()) {
    service_name_ = config.grpc_health_check().service_name();
  }

  // Same pattern for the :authority override consumed by getHostname().
  if (!config.grpc_health_check().authority().empty()) {
    authority_value_ = config.grpc_health_check().authority();
  }
}
76

            
77
// Per-host health check session. The synthetic loopback address pair is used
// as the connection info provider for the StreamInfo built in onInterval(),
// since health check streams have no real downstream connection.
GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::GrpcActiveHealthCheckSession(
    GrpcHealthCheckerImpl& parent, const HostSharedPtr& host)
    : ActiveHealthCheckSession(parent, host), parent_(parent),
      local_connection_info_provider_(std::make_shared<Network::ConnectionInfoSetterImpl>(
          Network::Utility::getCanonicalIpv4LoopbackAddress(),
          Network::Utility::getCanonicalIpv4LoopbackAddress())) {}
83

            
84
59
GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::~GrpcActiveHealthCheckSession() {
  // onDeferredDelete() / onEvent() must have released the codec client before
  // the session is destroyed.
  ASSERT(client_ == nullptr);
}
87

            
88
59
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onDeferredDelete() {
89
59
  if (client_) {
90
    // If there is an active request it will get reset, so make sure we ignore the reset.
91
48
    expect_reset_ = true;
92
48
    client_->close(Network::ConnectionCloseType::Abort);
93
48
  }
94
59
}
95

            
96
// Handles response headers for the health check stream. Classifies the
// response per the gRPC-over-HTTP/2 rules: non-200 responses map to a gRPC
// status (preferring an explicit grpc-status header on trailers-only
// responses), non-gRPC content is rejected, and a trailers-only 200 response
// is completed from the status carried in the headers.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeHeaders(
    Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
  const auto http_response_status = Http::Utility::getResponseStatus(*headers);
  if (http_response_status != enumToInt(Http::Code::OK)) {
    // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that
    // grpc-status be used if available.
    if (end_stream) {
      const auto grpc_status = Grpc::Common::getGrpcStatus(*headers);
      if (grpc_status) {
        onRpcComplete(grpc_status.value(), Grpc::Common::getGrpcMessage(*headers), true);
        return;
      }
    }
    // No grpc-status available: derive one from the HTTP status code.
    onRpcComplete(Grpc::Utility::httpToGrpcStatus(http_response_status), "non-200 HTTP response",
                  end_stream);
    return;
  }
  if (!Grpc::Common::isGrpcResponseHeaders(*headers, end_stream)) {
    // end_stream=false here makes onRpcComplete() locally reset the stream,
    // since we are abandoning the response regardless of its actual state.
    onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "not a gRPC request", false);
    return;
  }
  if (end_stream) {
    // This is how, for instance, grpc-go signals about missing service - HTTP/2 200 OK with
    // 'unimplemented' gRPC status.
    const auto grpc_status = Grpc::Common::getGrpcStatus(*headers);
    if (grpc_status) {
      onRpcComplete(grpc_status.value(), Grpc::Common::getGrpcMessage(*headers), true);
      return;
    }
    // 200 OK with end-of-stream but no grpc-status is a protocol violation.
    onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal,
                  "gRPC protocol violation: unexpected stream end", true);
  }
}
129

            
130
// Handles response body data: decodes gRPC frames from the wire and parses
// the single expected grpc.health.v1.HealthCheckResponse message into
// health_check_response_. Any protocol violation (stream ended on a data
// frame, undecodable framing, more than one message, compressed flags, or an
// unparseable payload) completes the RPC with Internal status.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeData(Buffer::Instance& data,
                                                                     bool end_stream) {
  if (end_stream) {
    // A gRPC response must be terminated by trailers, not by a data frame.
    onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal,
                  "gRPC protocol violation: unexpected stream end", true);
    return;
  }
  // We should end up with only one frame here.
  std::vector<Grpc::Frame> decoded_frames;
  if (!decoder_.decode(data, decoded_frames).ok()) {
    onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "gRPC wire protocol decode error",
                  false);
    return;
  }
  for (auto& frame : decoded_frames) {
    // Zero-length frames carry no message payload and are ignored.
    if (frame.length_ > 0) {
      if (health_check_response_) {
        // grpc.health.v1.Health.Check is unary RPC, so only one message is allowed.
        onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "unexpected streaming", false);
        return;
      }
      health_check_response_ = std::make_unique<grpc::health::v1::HealthCheckResponse>();
      Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_));

      // Reject non-default frame flags (e.g. compressed payloads) and
      // payloads that fail proto parsing.
      if (frame.flags_ != Grpc::GRPC_FH_DEFAULT ||
          !health_check_response_->ParseFromZeroCopyStream(&stream)) {
        onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal,
                      "invalid grpc.health.v1 RPC payload", false);
        return;
      }
    }
  }
}
163

            
164
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeTrailers(
165
59
    Http::ResponseTrailerMapPtr&& trailers) {
166
59
  auto maybe_grpc_status = Grpc::Common::getGrpcStatus(*trailers);
167
59
  auto grpc_status =
168
59
      maybe_grpc_status
169
59
          ? maybe_grpc_status.value()
170
59
          : static_cast<Grpc::Status::GrpcStatus>(Grpc::Status::WellKnownGrpcStatus::Internal);
171
59
  const std::string grpc_message =
172
59
      maybe_grpc_status ? Grpc::Common::getGrpcMessage(*trailers) : "invalid gRPC status";
173
59
  onRpcComplete(grpc_status, grpc_message, true);
174
59
}
175

            
176
75
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) {
  const bool connection_closed = event == Network::ConnectionEvent::RemoteClose ||
                                 event == Network::ConnectionEvent::LocalClose;
  if (!connection_closed) {
    return;
  }
  // For the raw disconnect event, we are either between intervals in which case we already have
  // a timer setup, or we did the close or got a reset, in which case we already setup a new
  // timer. There is nothing to do here other than blow away the client.
  parent_.dispatcher_.deferredDelete(std::move(client_));
}
185

            
186
92
// Runs one health check probe: (re)creates the upstream connection if needed,
// opens a fresh HTTP/2 stream, and sends a unary grpc.health.v1.Health/Check
// request (headers followed by a single serialized request frame).
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onInterval() {
  // Lazily (re)establish the connection; it may have been torn down by a
  // previous failure or by disabled connection reuse.
  if (!client_) {
    Upstream::Host::CreateConnectionData conn =
        host_->createHealthCheckConnection(parent_.dispatcher_, parent_.transportSocketOptions(),
                                           parent_.transportSocketMatchMetadata().get());
    client_ = parent_.createCodecClient(conn);
    client_->addConnectionCallbacks(connection_callback_impl_);
    client_->setCodecConnectionCallbacks(http_connection_callback_impl_);
  }

  // One new stream per probe; this session receives both decode and stream
  // callbacks.
  request_encoder_ = &client_->newStream(*this);
  request_encoder_->getStream().addCallbacks(*this);

  absl::string_view authority =
      getHostname(host_, parent_.authority_value_, parent_.cluster_.info());
  auto headers_message =
      Grpc::Common::prepareHeaders(authority, parent_.service_method_.service()->full_name(),
                                   parent_.service_method_.name(), absl::nullopt);
  headers_message->headers().setReferenceUserAgent(
      Http::Headers::get().UserAgentValues.EnvoyHealthChecker);

  // Build a synthetic StreamInfo (loopback addresses, upstream host attached)
  // so configured initial_metadata header formatters can be evaluated.
  StreamInfo::StreamInfoImpl stream_info(Http::Protocol::Http2, parent_.dispatcher_.timeSource(),
                                         local_connection_info_provider_,
                                         StreamInfo::FilterState::LifeSpan::FilterChain);
  stream_info.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
  stream_info.upstreamInfo()->setUpstreamHost(host_);
  parent_.request_headers_parser_->evaluateHeaders(headers_message->headers(), stream_info);

  // Propagate the configured health check timeout as the grpc-timeout header.
  Grpc::Common::toGrpcTimeout(parent_.timeout_, headers_message->headers());

  Router::FilterUtility::setUpstreamScheme(
      headers_message->headers(),
      // Here there is no downstream connection so scheme will be based on
      // upstream crypto
      false, host_->transportSocketFactory().implementsSecureTransport(), true);

  auto status = request_encoder_->encodeHeaders(headers_message->headers(), false);
  // Encoding will only fail if required headers are missing.
  ASSERT(status.ok());

  grpc::health::v1::HealthCheckRequest request;
  if (parent_.service_name_.has_value()) {
    request.set_service(parent_.service_name_.value());
  }

  // Single request frame with end_stream=true: the RPC is unary.
  request_encoder_->encodeData(*Grpc::Common::serializeToGrpcFrame(request), true);
}
233

            
234
// Stream reset callback. Snapshots the expect_reset_/GOAWAY flags before
// resetState() clears them, then reports a NETWORK failure only for resets we
// did not initiate ourselves.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onResetStream(Http::StreamResetReason,
                                                                        absl::string_view) {
  const bool expected_reset = expect_reset_;
  const bool goaway = received_no_error_goaway_;
  resetState();

  if (expected_reset) {
    // Stream reset was initiated by us (bogus gRPC response, timeout or cluster host is going
    // away). In these cases health check failure has already been reported and a GOAWAY (if any)
    // has already been handled, so just return.
    return;
  }

  ENVOY_CONN_LOG(debug, "connection/stream error health_flags={}", *client_,
                 HostUtility::healthFlagsToString(*host_));

  if (goaway || !parent_.reuse_connection_) {
    // Stream reset was unexpected, so we haven't closed the connection
    // yet in response to a GOAWAY or due to disabled connection reuse.
    client_->close(Network::ConnectionCloseType::Abort);
  }

  // TODO(baranov1ch): according to all HTTP standards, we should check if reason is one of
  // Http::StreamResetReason::RemoteRefusedStreamReset (which may mean GOAWAY),
  // Http::StreamResetReason::RemoteReset or Http::StreamResetReason::ConnectionTermination (both
  // mean connection close), check if connection is not fresh (was used for at least 1 request)
  // and silently retry request on the fresh connection. This is also true for HTTP/1.1 healthcheck.
  handleFailure(envoy::data::core::v3::NETWORK);
}
263

            
264
// GOAWAY callback. A graceful (NoError) GOAWAY with an active probe is only
// recorded so the probe can finish; any other GOAWAY fails the active probe,
// resets the stream and aborts the connection.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onGoAway(
    Http::GoAwayErrorCode error_code) {
  ENVOY_CONN_LOG(debug, "connection going away health_flags={}", *client_,
                 HostUtility::healthFlagsToString(*host_));
  // If we have an active health check probe and receive a GOAWAY indicating
  // graceful shutdown, allow the probe to complete before closing the connection.
  // The connection will be closed when the active check completes or another
  // terminal condition occurs, such as a timeout or stream reset.
  if (request_encoder_ && error_code == Http::GoAwayErrorCode::NoError) {
    received_no_error_goaway_ = true;
    return;
  }

  // Even if we have active health check probe, fail it on GOAWAY and schedule new one.
  if (request_encoder_) {
    handleFailure(envoy::data::core::v3::NETWORK);
    // request_encoder_ can already be destroyed if the host was removed during the failure callback
    // above.
    if (request_encoder_ != nullptr) {
      // Mark the upcoming local reset as expected so onResetStream() ignores it.
      expect_reset_ = true;
      request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
    }
  }
  // client_ can already be destroyed if the host was removed during the failure callback above.
  if (client_ != nullptr) {
    client_->close(Network::ConnectionCloseType::Abort);
  }
}
292

            
293
bool GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::isHealthCheckSucceeded(
294
74
    Grpc::Status::GrpcStatus grpc_status) const {
295
74
  if (grpc_status != Grpc::Status::WellKnownGrpcStatus::Ok) {
296
17
    return false;
297
17
  }
298

            
299
57
  if (!health_check_response_ ||
300
57
      health_check_response_->status() != grpc::health::v1::HealthCheckResponse::SERVING) {
301
13
    return false;
302
13
  }
303

            
304
44
  return true;
305
57
}
306

            
307
// Terminal handler for a probe RPC: logs the outcome, reports success or an
// ACTIVE failure, then cleans up the stream/connection. The handleSuccess/
// handleFailure callbacks may remove the host and destroy request_encoder_ or
// client_, hence the re-checks below.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onRpcComplete(
    Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message, bool end_stream) {
  logHealthCheckStatus(grpc_status, grpc_message);
  if (isHealthCheckSucceeded(grpc_status)) {
    handleSuccess(false);
  } else {
    handleFailure(envoy::data::core::v3::ACTIVE);
  }

  // Read the value as we may call resetState() and clear it.
  const bool goaway = received_no_error_goaway_;

  // |end_stream| will be false if we decided to stop healthcheck before HTTP stream has ended -
  // invalid gRPC payload, unexpected message stream or wrong content-type.
  if (end_stream) {
    resetState();
  } else {
    // request_encoder_ can already be destroyed if the host was removed during the failure callback
    // above.
    if (request_encoder_ != nullptr) {
      // resetState() will be called by onResetStream().
      expect_reset_ = true;
      request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
    }
  }

  // client_ can already be destroyed if the host was removed during the failure callback above.
  if (client_ != nullptr && (!parent_.reuse_connection_ || goaway)) {
    client_->close(Network::ConnectionCloseType::Abort);
  }
}
338

            
339
92
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::resetState() {
340
92
  expect_reset_ = false;
341
92
  request_encoder_ = nullptr;
342
92
  decoder_ = Grpc::Decoder();
343
92
  health_check_response_.reset();
344
92
  received_no_error_goaway_ = false;
345
92
}
346

            
347
9
// Probe timeout: mark the imminent teardown as an expected reset, then either
// abort the whole connection (after a graceful GOAWAY or when reuse is
// disabled) or just reset the stream so the connection can be reused.
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onTimeout() {
  ENVOY_CONN_LOG(debug, "connection/stream timeout health_flags={}", *client_,
                 HostUtility::healthFlagsToString(*host_));
  expect_reset_ = true;
  if (received_no_error_goaway_ || !parent_.reuse_connection_) {
    client_->close(Network::ConnectionCloseType::Abort);
  } else {
    request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
  }
}
357

            
358
void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::logHealthCheckStatus(
359
74
    Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message) {
360
74
  const char* service_status;
361
74
  if (!health_check_response_) {
362
15
    service_status = "rpc_error";
363
59
  } else {
364
59
    switch (health_check_response_->status()) {
365
48
    case grpc::health::v1::HealthCheckResponse::SERVING:
366
48
      service_status = "serving";
367
48
      break;
368
7
    case grpc::health::v1::HealthCheckResponse::NOT_SERVING:
369
7
      service_status = "not_serving";
370
7
      break;
371
    case grpc::health::v1::HealthCheckResponse::UNKNOWN:
372
      service_status = "unknown";
373
      break;
374
2
    case grpc::health::v1::HealthCheckResponse::SERVICE_UNKNOWN:
375
2
      service_status = "service_unknown";
376
2
      break;
377
2
    default:
378
2
      service_status = "unknown_healthcheck_response";
379
2
      break;
380
59
    }
381
59
  }
382
74
  std::string grpc_status_message;
383
74
  if (grpc_status != Grpc::Status::WellKnownGrpcStatus::Ok && !grpc_message.empty()) {
384
13
    grpc_status_message = fmt::format("{} ({})", grpc_status, grpc_message);
385
61
  } else {
386
61
    grpc_status_message = absl::StrCat("", grpc_status);
387
61
  }
388

            
389
74
  ENVOY_CONN_LOG(debug, "hc grpc_status={} service_status={} health_flags={}", *client_,
390
74
                 grpc_status_message, service_status, HostUtility::healthFlagsToString(*host_));
391
74
}
392

            
393
// Production codec client factory: wraps the freshly-created health check
// connection in an HTTP/2 codec client (gRPC requires HTTP/2).
Http::CodecClientPtr
ProdGrpcHealthCheckerImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) {
  return std::make_unique<Http::CodecClientProd>(
      Http::CodecType::HTTP2, std::move(data.connection_), data.host_description_, dispatcher_,
      random_generator_, transportSocketOptions());
}
399

            
400
} // namespace Upstream
401
} // namespace Envoy