Line data Source code
1 : #include "source/extensions/health_checkers/tcp/health_checker_impl.h" 2 : 3 : #include <cstdint> 4 : #include <iterator> 5 : #include <memory> 6 : 7 : #include "envoy/config/core/v3/health_check.pb.h" 8 : #include "envoy/data/core/v3/health_check_event.pb.h" 9 : #include "envoy/server/health_checker_config.h" 10 : #include "envoy/type/v3/http.pb.h" 11 : #include "envoy/type/v3/range.pb.h" 12 : 13 : #include "source/common/buffer/zero_copy_input_stream_impl.h" 14 : #include "source/common/common/empty_string.h" 15 : #include "source/common/common/enum_to_int.h" 16 : #include "source/common/common/macros.h" 17 : #include "source/common/config/utility.h" 18 : #include "source/common/config/well_known_names.h" 19 : #include "source/common/grpc/common.h" 20 : #include "source/common/http/header_map_impl.h" 21 : #include "source/common/http/header_utility.h" 22 : #include "source/common/network/address_impl.h" 23 : #include "source/common/network/socket_impl.h" 24 : #include "source/common/network/utility.h" 25 : #include "source/common/router/router.h" 26 : #include "source/common/runtime/runtime_features.h" 27 : #include "source/common/upstream/host_utility.h" 28 : 29 : #include "absl/strings/match.h" 30 : #include "absl/strings/str_cat.h" 31 : 32 : namespace Envoy { 33 : namespace Upstream { 34 : 35 : Upstream::HealthCheckerSharedPtr TcpHealthCheckerFactory::createCustomHealthChecker( 36 : const envoy::config::core::v3::HealthCheck& config, 37 0 : Server::Configuration::HealthCheckerFactoryContext& context) { 38 0 : return std::make_shared<TcpHealthCheckerImpl>( 39 0 : context.cluster(), config, context.mainThreadDispatcher(), context.runtime(), 40 0 : context.api().randomGenerator(), context.eventLogger()); 41 0 : } 42 : 43 : REGISTER_FACTORY(TcpHealthCheckerFactory, Server::Configuration::CustomHealthCheckerFactory); 44 : 45 : TcpHealthCheckerImpl::TcpHealthCheckerImpl(const Cluster& cluster, 46 : const envoy::config::core::v3::HealthCheck& config, 47 : Event::Dispatcher& dispatcher, Runtime::Loader& runtime, 48 : Random::RandomGenerator& random, 49 : HealthCheckEventLoggerPtr&& event_logger) 50 : : HealthCheckerImplBase(cluster, config, dispatcher, runtime, random, std::move(event_logger)), 51 11 : send_bytes_([&config] { 52 11 : Protobuf::RepeatedPtrField<envoy::config::core::v3::HealthCheck::Payload> send_repeated; 53 11 : if (!config.tcp_health_check().send().text().empty()) { 54 9 : send_repeated.Add()->CopyFrom(config.tcp_health_check().send()); 55 9 : } 56 11 : auto bytes_or_error = PayloadMatcher::loadProtoBytes(send_repeated); 57 11 : THROW_IF_STATUS_NOT_OK(bytes_or_error, throw); 58 11 : return bytes_or_error.value(); 59 12 : }()) { 60 12 : auto bytes_or_error = PayloadMatcher::loadProtoBytes(config.tcp_health_check().receive()); 61 12 : THROW_IF_STATUS_NOT_OK(bytes_or_error, throw); 62 12 : receive_bytes_ = bytes_or_error.value(); 63 12 : } 64 : 65 11 : TcpHealthCheckerImpl::TcpActiveHealthCheckSession::~TcpActiveHealthCheckSession() { 66 11 : ASSERT(client_ == nullptr); 67 11 : } 68 : 69 11 : void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onDeferredDelete() { 70 11 : if (client_) { 71 9 : expect_close_ = true; 72 9 : client_->close(Network::ConnectionCloseType::Abort); 73 9 : } 74 11 : } 75 : 76 10 : void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onData(Buffer::Instance& data) { 77 10 : ENVOY_CONN_LOG(debug, "hc tcp total pending buffer={}", *client_, data.length()); 78 : // TODO(lilika): The TCP health checker does generic pattern matching so we can't differentiate 79 : // between wrong data and not enough data. We could likely do better here and figure out cases in 80 : // which a match is not possible but that is not done now. 81 10 : if (PayloadMatcher::match(parent_.receive_bytes_, data)) { 82 5 : ENVOY_CONN_LOG(debug, "hc tcp healthcheck passed, health_check_address={}", *client_, 83 5 : host_->healthCheckAddress()->asString()); 84 5 : data.drain(data.length()); 85 5 : handleSuccess(false); 86 5 : if (!parent_.reuse_connection_) { 87 2 : expect_close_ = true; 88 2 : client_->close(Network::ConnectionCloseType::Abort); 89 2 : } 90 5 : } 91 10 : } 92 : 93 22 : void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) { 94 22 : if (event == Network::ConnectionEvent::RemoteClose || 95 22 : event == Network::ConnectionEvent::LocalClose) { 96 15 : if (!expect_close_) { 97 2 : ENVOY_CONN_LOG(debug, "hc tcp connection unexpected closed, health_check_address={}", 98 2 : *client_, host_->healthCheckAddress()->asString()); 99 2 : handleFailure(envoy::data::core::v3::NETWORK); 100 2 : } 101 15 : parent_.dispatcher_.deferredDelete(std::move(client_)); 102 15 : } 103 : 104 22 : if (event == Network::ConnectionEvent::Connected && parent_.receive_bytes_.empty()) { 105 : // In this case we are just testing that we can connect, so immediately succeed. Also, since 106 : // we are just doing a connection test, close the connection. 107 : // NOTE(mattklein123): I've seen cases where the kernel will report a successful connection, and 108 : // then proceed to fail subsequent calls (so the connection did not actually succeed). I'm not 109 : // sure what situations cause this. If this turns into a problem, we may need to introduce a 110 : // timer and see if the connection stays alive for some period of time while waiting to read. 111 : // (Though we may never get a FIN and won't know until if/when we try to write). In short, this 112 : // may need to get more complicated but we can start here. 113 : // TODO(mattklein123): If we had a way on the connection interface to do an immediate read (vs. 114 : // evented), that would be a good check to run here to make sure it returns the equivalent of 115 : // EAGAIN. Need to think through how that would look from an interface perspective. 116 : // TODO(mattklein123): In the case that a user configured bytes to write, they will not be 117 : // be written, since we currently have no way to know if the bytes actually get written via 118 : // the connection interface. We might want to figure out how to handle this better later. 119 1 : ENVOY_CONN_LOG(debug, "hc tcp healthcheck passed, health_check_address={}", *client_, 120 1 : host_->healthCheckAddress()->asString()); 121 1 : expect_close_ = true; 122 1 : client_->close(Network::ConnectionCloseType::Abort); 123 1 : handleSuccess(false); 124 1 : } 125 22 : } 126 : 127 : // TODO(lilika) : Support connection pooling 128 15 : void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onInterval() { 129 15 : if (!client_) { 130 15 : client_ = 131 15 : host_ 132 15 : ->createHealthCheckConnection(parent_.dispatcher_, parent_.transportSocketOptions(), 133 15 : parent_.transportSocketMatchMetadata().get()) 134 15 : .connection_; 135 15 : session_callbacks_ = std::make_shared<TcpSessionCallbacks>(*this); 136 15 : client_->addConnectionCallbacks(*session_callbacks_); 137 15 : client_->addReadFilter(session_callbacks_); 138 : 139 15 : expect_close_ = false; 140 15 : client_->connect(); 141 15 : client_->noDelay(true); 142 15 : } 143 : 144 15 : if (!parent_.send_bytes_.empty()) { 145 13 : Buffer::OwnedImpl data; 146 13 : for (const std::vector<uint8_t>& segment : parent_.send_bytes_) { 147 13 : data.add(segment.data(), segment.size()); 148 13 : } 149 : 150 13 : client_->write(data, false); 151 13 : } 152 15 : } 153 : 154 1 : void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onTimeout() { 155 1 : ENVOY_CONN_LOG(debug, "hc tcp connection timeout, health_flags={}, health_check_address={}", 156 1 : *client_, HostUtility::healthFlagsToString(*host_), 157 1 : host_->healthCheckAddress()->asString()); 158 1 : expect_close_ = true; 159 1 : client_->close(Network::ConnectionCloseType::Abort); 160 1 : } 161 : 162 : } // namespace Upstream 163 : } // namespace Envoy