LCOV - code coverage report
Current view: top level - source/extensions/health_checkers/grpc - health_checker_impl.cc (source / functions)
Test: coverage.dat
Date: 2024-01-05 06:35:25
                 Hit    Total    Coverage
Lines:           200    253      79.1 %
Functions:        17     20      85.0 %

          Line data    Source code
       1             : #include "source/extensions/health_checkers/grpc/health_checker_impl.h"
       2             : 
       3             : #include <cstdint>
       4             : #include <iterator>
       5             : #include <memory>
       6             : 
       7             : #include "envoy/config/core/v3/health_check.pb.h"
       8             : #include "envoy/data/core/v3/health_check_event.pb.h"
       9             : #include "envoy/server/health_checker_config.h"
      10             : #include "envoy/type/v3/http.pb.h"
      11             : #include "envoy/type/v3/range.pb.h"
      12             : 
      13             : #include "source/common/buffer/zero_copy_input_stream_impl.h"
      14             : #include "source/common/common/empty_string.h"
      15             : #include "source/common/common/enum_to_int.h"
      16             : #include "source/common/common/macros.h"
      17             : #include "source/common/config/utility.h"
      18             : #include "source/common/config/well_known_names.h"
      19             : #include "source/common/grpc/common.h"
      20             : #include "source/common/http/header_map_impl.h"
      21             : #include "source/common/http/header_utility.h"
      22             : #include "source/common/network/address_impl.h"
      23             : #include "source/common/network/socket_impl.h"
      24             : #include "source/common/network/utility.h"
      25             : #include "source/common/router/router.h"
      26             : #include "source/common/runtime/runtime_features.h"
      27             : #include "source/common/upstream/host_utility.h"
      28             : 
      29             : #include "absl/strings/match.h"
      30             : #include "absl/strings/str_cat.h"
      31             : 
      32             : namespace Envoy {
      33             : namespace Upstream {
      34             : namespace {
      35             : const std::string& getHostname(const HostSharedPtr& host,
      36             :                                const absl::optional<std::string>& config_hostname,
      37          26 :                                const ClusterInfoConstSharedPtr& cluster) {
      38          26 :   if (config_hostname.has_value()) {
      39           2 :     return HealthCheckerFactory::getHostname(host, config_hostname.value(), cluster);
      40           2 :   }
      41          24 :   return HealthCheckerFactory::getHostname(host, EMPTY_STRING, cluster);
      42          26 : }
      43             : } // namespace
      44             : 
      45             : Upstream::HealthCheckerSharedPtr GrpcHealthCheckerFactory::createCustomHealthChecker(
      46             :     const envoy::config::core::v3::HealthCheck& config,
      47           0 :     Server::Configuration::HealthCheckerFactoryContext& context) {
      48           0 :   return std::make_shared<ProdGrpcHealthCheckerImpl>(
      49           0 :       context.cluster(), config, context.mainThreadDispatcher(), context.runtime(),
      50           0 :       context.api().randomGenerator(), context.eventLogger());
      51           0 : }
      52             : 
      53             : REGISTER_FACTORY(GrpcHealthCheckerFactory, Server::Configuration::CustomHealthCheckerFactory);
      54             : 
      55             : GrpcHealthCheckerImpl::GrpcHealthCheckerImpl(const Cluster& cluster,
      56             :                                              const envoy::config::core::v3::HealthCheck& config,
      57             :                                              Event::Dispatcher& dispatcher,
      58             :                                              Runtime::Loader& runtime,
      59             :                                              Random::RandomGenerator& random,
      60             :                                              HealthCheckEventLoggerPtr&& event_logger)
      61             :     : HealthCheckerImplBase(cluster, config, dispatcher, runtime, random, std::move(event_logger)),
      62             :       random_generator_(random),
      63             :       service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
      64             :           "grpc.health.v1.Health.Check")),
      65             :       request_headers_parser_(
      66          13 :           Router::HeaderParser::configure(config.grpc_health_check().initial_metadata())) {
      67          13 :   if (!config.grpc_health_check().service_name().empty()) {
      68          10 :     service_name_ = config.grpc_health_check().service_name();
      69          10 :   }
      70             : 
      71          13 :   if (!config.grpc_health_check().authority().empty()) {
      72           2 :     authority_value_ = config.grpc_health_check().authority();
      73           2 :   }
      74          13 : }
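
Note: the constructor above pulls its settings from the grpc_health_check block of the envoy.config.core.v3.HealthCheck proto. As a rough sketch (not taken from this file; the service name, authority, and header key/value are hypothetical), the fields it consumes could be populated like this:

#include "envoy/config/core/v3/health_check.pb.h"

// Sketch only: builds a HealthCheck proto with the fields GrpcHealthCheckerImpl reads.
envoy::config::core::v3::HealthCheck makeGrpcHealthCheckConfig() {
  envoy::config::core::v3::HealthCheck config;
  auto* grpc = config.mutable_grpc_health_check();
  grpc->set_service_name("example.service"); // stored in service_name_ when non-empty
  grpc->set_authority("hc.example.com");     // stored in authority_value_ when non-empty
  // initial_metadata is handed to Router::HeaderParser::configure() and applied to each probe.
  auto* metadata = grpc->add_initial_metadata();
  metadata->mutable_header()->set_key("x-hc-source"); // hypothetical header
  metadata->mutable_header()->set_value("envoy");
  return config;
}
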
      75             : 
      76             : GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::GrpcActiveHealthCheckSession(
      77             :     GrpcHealthCheckerImpl& parent, const HostSharedPtr& host)
      78             :     : ActiveHealthCheckSession(parent, host), parent_(parent),
      79             :       local_connection_info_provider_(std::make_shared<Network::ConnectionInfoSetterImpl>(
      80             :           Network::Utility::getCanonicalIpv4LoopbackAddress(),
      81          13 :           Network::Utility::getCanonicalIpv4LoopbackAddress())) {}
      82             : 
      83          13 : GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::~GrpcActiveHealthCheckSession() {
      84          13 :   ASSERT(client_ == nullptr);
      85          13 : }
      86             : 
      87          13 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onDeferredDelete() {
      88          13 :   if (client_) {
      89             :     // If there is an active request it will get reset, so make sure we ignore the reset.
      90          10 :     expect_reset_ = true;
      91          10 :     client_->close(Network::ConnectionCloseType::Abort);
      92          10 :   }
      93          13 : }
      94             : 
      95             : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeHeaders(
      96           8 :     Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
      97           8 :   const auto http_response_status = Http::Utility::getResponseStatus(*headers);
      98           8 :   if (http_response_status != enumToInt(Http::Code::OK)) {
      99             :     // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that
     100             :     // grpc-status be used if available.
     101           0 :     if (end_stream) {
     102           0 :       const auto grpc_status = Grpc::Common::getGrpcStatus(*headers);
     103           0 :       if (grpc_status) {
     104           0 :         onRpcComplete(grpc_status.value(), Grpc::Common::getGrpcMessage(*headers), true);
     105           0 :         return;
     106           0 :       }
     107           0 :     }
     108           0 :     onRpcComplete(Grpc::Utility::httpToGrpcStatus(http_response_status), "non-200 HTTP response",
     109           0 :                   end_stream);
     110           0 :     return;
     111           0 :   }
     112           8 :   if (!Grpc::Common::isGrpcResponseHeaders(*headers, end_stream)) {
     113           1 :     onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "not a gRPC request", false);
     114           1 :     return;
     115           1 :   }
     116           7 :   if (end_stream) {
     117             :     // This is how, for instance, grpc-go signals a missing service: HTTP/2 200 OK with an
     118             :     // 'unimplemented' gRPC status.
     119           0 :     const auto grpc_status = Grpc::Common::getGrpcStatus(*headers);
     120           0 :     if (grpc_status) {
     121           0 :       onRpcComplete(grpc_status.value(), Grpc::Common::getGrpcMessage(*headers), true);
     122           0 :       return;
     123           0 :     }
     124           0 :     onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal,
     125           0 :                   "gRPC protocol violation: unexpected stream end", true);
     126           0 :   }
     127           7 : }
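
For context on the non-200 branch above: when the response is not HTTP 200 and carries no grpc-status, the code falls back to Grpc::Utility::httpToGrpcStatus. A sketch of the HTTP-to-gRPC mapping documented in the referenced grpc/grpc spec (this is an illustration, not a copy of Envoy's helper, which may differ in detail):

#include <cstdint>

#include "envoy/grpc/status.h" // illustrative include path for Grpc::Status

namespace Envoy {

// Sketch of the mapping from doc/http-grpc-status-mapping.md.
Grpc::Status::GrpcStatus sketchHttpToGrpcStatus(uint64_t http_status) {
  switch (http_status) {
  case 400: return Grpc::Status::WellKnownGrpcStatus::Internal;
  case 401: return Grpc::Status::WellKnownGrpcStatus::Unauthenticated;
  case 403: return Grpc::Status::WellKnownGrpcStatus::PermissionDenied;
  case 404: return Grpc::Status::WellKnownGrpcStatus::Unimplemented;
  case 429:
  case 502:
  case 503:
  case 504: return Grpc::Status::WellKnownGrpcStatus::Unavailable;
  default:  return Grpc::Status::WellKnownGrpcStatus::Unknown;
  }
}

} // namespace Envoy
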
     128             : 
     129             : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeData(Buffer::Instance& data,
     130          21 :                                                                      bool end_stream) {
     131          21 :   if (end_stream) {
     132           3 :     onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal,
     133           3 :                   "gRPC protocol violation: unexpected stream end", true);
     134           3 :     return;
     135           3 :   }
     136             :   // We should end up with only one frame here.
     137          18 :   std::vector<Grpc::Frame> decoded_frames;
     138          18 :   if (!decoder_.decode(data, decoded_frames)) {
     139           1 :     onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "gRPC wire protocol decode error",
     140           1 :                   false);
     141           1 :     return;
     142           1 :   }
     143          17 :   for (auto& frame : decoded_frames) {
     144           3 :     if (frame.length_ > 0) {
     145           3 :       if (health_check_response_) {
     146             :         // grpc.health.v1.Health.Check is a unary RPC, so only one message is allowed.
     147           0 :         onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "unexpected streaming", false);
     148           0 :         return;
     149           0 :       }
     150           3 :       health_check_response_ = std::make_unique<grpc::health::v1::HealthCheckResponse>();
     151           3 :       Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_));
     152             : 
     153           3 :       if (frame.flags_ != Grpc::GRPC_FH_DEFAULT ||
     154           3 :           !health_check_response_->ParseFromZeroCopyStream(&stream)) {
     155           0 :         onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal,
     156           0 :                       "invalid grpc.health.v1 RPC payload", false);
     157           0 :         return;
     158           0 :       }
     159           3 :     }
     160           3 :   }
     161          17 : }
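
The frames handled above use the standard gRPC length-prefixed message format: a 1-byte flags field (0, i.e. GRPC_FH_DEFAULT, for an uncompressed message), a 4-byte big-endian length, then the serialized grpc.health.v1.HealthCheckResponse. A standalone sketch of parsing one such frame (not Envoy's Grpc::Decoder; the proto include path is illustrative):

#include <cstdint>
#include <optional>
#include <string>

#include "src/proto/grpc/health/v1/health.pb.h" // illustrative path to the generated health proto

// Sketch: parse a single uncompressed gRPC frame carrying a HealthCheckResponse.
std::optional<grpc::health::v1::HealthCheckResponse> parseSingleFrame(const std::string& wire) {
  if (wire.size() < 5) {
    return std::nullopt; // shorter than the 5-byte frame header
  }
  const uint8_t flags = static_cast<uint8_t>(wire[0]); // 0 == uncompressed
  const uint32_t length = (static_cast<uint32_t>(static_cast<uint8_t>(wire[1])) << 24) |
                          (static_cast<uint32_t>(static_cast<uint8_t>(wire[2])) << 16) |
                          (static_cast<uint32_t>(static_cast<uint8_t>(wire[3])) << 8) |
                          static_cast<uint32_t>(static_cast<uint8_t>(wire[4]));
  if (flags != 0 || wire.size() != 5 + length) {
    return std::nullopt; // compressed or malformed frame: a protocol error for this health check
  }
  grpc::health::v1::HealthCheckResponse response;
  if (!response.ParseFromArray(wire.data() + 5, static_cast<int>(length))) {
    return std::nullopt;
  }
  return response;
}
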
     162             : 
     163             : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeTrailers(
     164           3 :     Http::ResponseTrailerMapPtr&& trailers) {
     165           3 :   auto maybe_grpc_status = Grpc::Common::getGrpcStatus(*trailers);
     166           3 :   auto grpc_status =
     167           3 :       maybe_grpc_status
     168           3 :           ? maybe_grpc_status.value()
     169           3 :           : static_cast<Grpc::Status::GrpcStatus>(Grpc::Status::WellKnownGrpcStatus::Internal);
     170           3 :   const std::string grpc_message =
     171           3 :       maybe_grpc_status ? Grpc::Common::getGrpcMessage(*trailers) : "invalid gRPC status";
     172           3 :   onRpcComplete(grpc_status, grpc_message, true);
     173           3 : }
     174             : 
     175          24 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) {
     176          24 :   if (event == Network::ConnectionEvent::RemoteClose ||
     177          24 :       event == Network::ConnectionEvent::LocalClose) {
     178             :     // For the raw disconnect event, we are either between intervals in which case we already have
     179             :     // a timer setup, or we did the close or got a reset, in which case we already setup a new
     180             :     // timer. There is nothing to do here other than blow away the client.
     181          23 :     parent_.dispatcher_.deferredDelete(std::move(client_));
     182          23 :   }
     183          24 : }
     184             : 
     185          26 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onInterval() {
     186          26 :   if (!client_) {
     187          23 :     Upstream::Host::CreateConnectionData conn =
     188          23 :         host_->createHealthCheckConnection(parent_.dispatcher_, parent_.transportSocketOptions(),
     189          23 :                                            parent_.transportSocketMatchMetadata().get());
     190          23 :     client_ = parent_.createCodecClient(conn);
     191          23 :     client_->addConnectionCallbacks(connection_callback_impl_);
     192          23 :     client_->setCodecConnectionCallbacks(http_connection_callback_impl_);
     193          23 :   }
     194             : 
     195          26 :   request_encoder_ = &client_->newStream(*this);
     196          26 :   request_encoder_->getStream().addCallbacks(*this);
     197             : 
     198          26 :   const std::string& authority =
     199          26 :       getHostname(host_, parent_.authority_value_, parent_.cluster_.info());
     200          26 :   auto headers_message =
     201          26 :       Grpc::Common::prepareHeaders(authority, parent_.service_method_.service()->full_name(),
     202          26 :                                    parent_.service_method_.name(), absl::nullopt);
     203          26 :   headers_message->headers().setReferenceUserAgent(
     204          26 :       Http::Headers::get().UserAgentValues.EnvoyHealthChecker);
     205             : 
     206          26 :   StreamInfo::StreamInfoImpl stream_info(Http::Protocol::Http2, parent_.dispatcher_.timeSource(),
     207          26 :                                          local_connection_info_provider_);
     208          26 :   stream_info.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
     209          26 :   stream_info.upstreamInfo()->setUpstreamHost(host_);
     210          26 :   parent_.request_headers_parser_->evaluateHeaders(headers_message->headers(), stream_info);
     211             : 
     212          26 :   Grpc::Common::toGrpcTimeout(parent_.timeout_, headers_message->headers());
     213             : 
     214          26 :   Router::FilterUtility::setUpstreamScheme(
     215          26 :       headers_message->headers(),
     216             :       // Here there is no downstream connection, so the scheme will be based on
     217             :       // upstream crypto.
     218          26 :       host_->transportSocketFactory().implementsSecureTransport());
     219             : 
     220          26 :   auto status = request_encoder_->encodeHeaders(headers_message->headers(), false);
     221             :   // Encoding will only fail if required headers are missing.
     222          26 :   ASSERT(status.ok());
     223             : 
     224          26 :   grpc::health::v1::HealthCheckRequest request;
     225          26 :   if (parent_.service_name_.has_value()) {
     226          21 :     request.set_service(parent_.service_name_.value());
     227          21 :   }
     228             : 
     229          26 :   request_encoder_->encodeData(*Grpc::Common::serializeToGrpcFrame(request), true);
     230          26 : }
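
The probe sent above is a single unary grpc.health.v1.Health.Check call: gRPC request headers, then one HealthCheckRequest wrapped in a gRPC frame with end_stream set. A minimal sketch of the request payload (the proto include path is illustrative):

#include <string>

#include "absl/types/optional.h"
#include "src/proto/grpc/health/v1/health.pb.h" // illustrative path to the generated health proto

// Sketch: the body that onInterval() serializes and wraps via Grpc::Common::serializeToGrpcFrame().
std::string buildHealthCheckRequestPayload(const absl::optional<std::string>& service_name) {
  grpc::health::v1::HealthCheckRequest request;
  if (service_name.has_value()) {
    // Per the gRPC health checking protocol, an empty service name asks about the server overall.
    request.set_service(service_name.value());
  }
  return request.SerializeAsString();
}
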
     231             : 
     232             : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onResetStream(Http::StreamResetReason,
     233          20 :                                                                         absl::string_view) {
     234          20 :   const bool expected_reset = expect_reset_;
     235          20 :   const bool goaway = received_no_error_goaway_;
     236          20 :   resetState();
     237             : 
     238          20 :   if (expected_reset) {
     239             :     // Stream reset was initiated by us (bogus gRPC response, timeout or cluster host is going
     240             :     // away). In these cases health check failure has already been reported and a GOAWAY (if any)
     241             :     // has already been handled, so just return.
     242          13 :     return;
     243          13 :   }
     244             : 
     245           7 :   ENVOY_CONN_LOG(debug, "connection/stream error health_flags={}", *client_,
     246           7 :                  HostUtility::healthFlagsToString(*host_));
     247             : 
     248           7 :   if (goaway || !parent_.reuse_connection_) {
     249             :     // Stream reset was unexpected, so we haven't closed the connection
     250             :     // yet in response to a GOAWAY or due to disabled connection reuse.
     251           3 :     client_->close(Network::ConnectionCloseType::Abort);
     252           3 :   }
     253             : 
     254             :   // TODO(baranov1ch): according to all HTTP standards, we should check whether the reason is one
     255             :   // of Http::StreamResetReason::RemoteRefusedStreamReset (which may mean GOAWAY),
     256             :   // Http::StreamResetReason::RemoteReset or Http::StreamResetReason::ConnectionTermination (both
     257             :   // mean connection close), check whether the connection is not fresh (was used for at least one
     258             :   // request), and silently retry the request on a fresh connection. The same applies to HTTP/1.1 health checks.
     259           7 :   handleFailure(envoy::data::core::v3::NETWORK);
     260           7 : }
     261             : 
     262             : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onGoAway(
     263           6 :     Http::GoAwayErrorCode error_code) {
     264           6 :   ENVOY_CONN_LOG(debug, "connection going away health_flags={}", *client_,
     265           6 :                  HostUtility::healthFlagsToString(*host_));
     266             :   // If we have an active health check probe and receive a GOAWAY indicating
     267             :   // graceful shutdown, allow the probe to complete before closing the connection.
     268             :   // The connection will be closed when the active check completes or another
     269             :   // terminal condition occurs, such as a timeout or stream reset.
     270           6 :   if (request_encoder_ && error_code == Http::GoAwayErrorCode::NoError) {
     271           4 :     received_no_error_goaway_ = true;
     272           4 :     return;
     273           4 :   }
     274             : 
     275             :   // Even if we have an active health check probe, fail it on GOAWAY and schedule a new one.
     276           2 :   if (request_encoder_) {
     277           1 :     handleFailure(envoy::data::core::v3::NETWORK);
     278             :     // request_encoder_ can already be destroyed if the host was removed during the failure callback
     279             :     // above.
     280           1 :     if (request_encoder_ != nullptr) {
     281           1 :       expect_reset_ = true;
     282           1 :       request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
     283           1 :     }
     284           1 :   }
     285             :   // client_ can already be destroyed if the host was removed during the failure callback above.
     286           2 :   if (client_ != nullptr) {
     287           2 :     client_->close(Network::ConnectionCloseType::Abort);
     288           2 :   }
     289           2 : }
     290             : 
     291             : bool GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::isHealthCheckSucceeded(
     292           8 :     Grpc::Status::GrpcStatus grpc_status) const {
     293           8 :   if (grpc_status != Grpc::Status::WellKnownGrpcStatus::Ok) {
     294           5 :     return false;
     295           5 :   }
     296             : 
     297           3 :   if (!health_check_response_ ||
     298           3 :       health_check_response_->status() != grpc::health::v1::HealthCheckResponse::SERVING) {
     299           0 :     return false;
     300           0 :   }
     301             : 
     302           3 :   return true;
     303           3 : }
     304             : 
     305             : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onRpcComplete(
     306           8 :     Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message, bool end_stream) {
     307           8 :   logHealthCheckStatus(grpc_status, grpc_message);
     308           8 :   if (isHealthCheckSucceeded(grpc_status)) {
     309           3 :     handleSuccess(false);
     310           5 :   } else {
     311           5 :     handleFailure(envoy::data::core::v3::ACTIVE);
     312           5 :   }
     313             : 
     314             :   // Read the value as we may call resetState() and clear it.
     315           8 :   const bool goaway = received_no_error_goaway_;
     316             : 
     317             :   // |end_stream| will be false if we decided to stop healthcheck before HTTP stream has ended -
     318             :   // invalid gRPC payload, unexpected message stream or wrong content-type.
     319           8 :   if (end_stream) {
     320           6 :     resetState();
     321           6 :   } else {
     322             :     // request_encoder_ can already be destroyed if the host was removed during the failure callback
     323             :     // above.
     324           2 :     if (request_encoder_ != nullptr) {
     325             :       // resetState() will be called by onResetStream().
     326           2 :       expect_reset_ = true;
     327           2 :       request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
     328           2 :     }
     329           2 :   }
     330             : 
     331             :   // client_ can already be destroyed if the host was removed during the failure callback above.
     332           8 :   if (client_ != nullptr && (!parent_.reuse_connection_ || goaway)) {
     333           0 :     client_->close(Network::ConnectionCloseType::Abort);
     334           0 :   }
     335           8 : }
     336             : 
     337          26 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::resetState() {
     338          26 :   expect_reset_ = false;
     339          26 :   request_encoder_ = nullptr;
     340          26 :   decoder_ = Grpc::Decoder();
     341          26 :   health_check_response_.reset();
     342          26 :   received_no_error_goaway_ = false;
     343          26 : }
     344             : 
     345           8 : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::onTimeout() {
     346           8 :   ENVOY_CONN_LOG(debug, "connection/stream timeout health_flags={}", *client_,
     347           8 :                  HostUtility::healthFlagsToString(*host_));
     348           8 :   expect_reset_ = true;
     349           8 :   if (received_no_error_goaway_ || !parent_.reuse_connection_) {
     350           0 :     client_->close(Network::ConnectionCloseType::Abort);
     351           8 :   } else {
     352           8 :     request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
     353           8 :   }
     354           8 : }
     355             : 
     356             : void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::logHealthCheckStatus(
     357           8 :     Grpc::Status::GrpcStatus grpc_status, const std::string& grpc_message) {
     358           8 :   const char* service_status;
     359           8 :   if (!health_check_response_) {
     360           5 :     service_status = "rpc_error";
     361           5 :   } else {
     362           3 :     switch (health_check_response_->status()) {
     363           3 :     case grpc::health::v1::HealthCheckResponse::SERVING:
     364           3 :       service_status = "serving";
     365           3 :       break;
     366           0 :     case grpc::health::v1::HealthCheckResponse::NOT_SERVING:
     367           0 :       service_status = "not_serving";
     368           0 :       break;
     369           0 :     case grpc::health::v1::HealthCheckResponse::UNKNOWN:
     370           0 :       service_status = "unknown";
     371           0 :       break;
     372           0 :     case grpc::health::v1::HealthCheckResponse::SERVICE_UNKNOWN:
     373           0 :       service_status = "service_unknown";
     374           0 :       break;
     375           0 :     default:
     376           0 :       service_status = "unknown_healthcheck_response";
     377           0 :       break;
     378           3 :     }
     379           3 :   }
     380           8 :   std::string grpc_status_message;
     381           8 :   if (grpc_status != Grpc::Status::WellKnownGrpcStatus::Ok && !grpc_message.empty()) {
     382           5 :     grpc_status_message = fmt::format("{} ({})", grpc_status, grpc_message);
     383           5 :   } else {
     384           3 :     grpc_status_message = absl::StrCat("", grpc_status);
     385           3 :   }
     386             : 
     387           8 :   ENVOY_CONN_LOG(debug, "hc grpc_status={} service_status={} health_flags={}", *client_,
     388           8 :                  grpc_status_message, service_status, HostUtility::healthFlagsToString(*host_));
     389           8 : }
     390             : 
     391             : Http::CodecClientPtr
     392           0 : ProdGrpcHealthCheckerImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) {
     393           0 :   return std::make_unique<Http::CodecClientProd>(
     394           0 :       Http::CodecType::HTTP2, std::move(data.connection_), data.host_description_, dispatcher_,
     395           0 :       random_generator_, transportSocketOptions());
     396           0 : }
     397             : 
     398             : } // namespace Upstream
     399             : } // namespace Envoy

Generated by: LCOV version 1.15