1
#include "source/extensions/health_checkers/tcp/health_checker_impl.h"
2

            
3
#include <cstdint>
4
#include <iterator>
5
#include <memory>
6

            
7
#include "envoy/config/core/v3/health_check.pb.h"
8
#include "envoy/data/core/v3/health_check_event.pb.h"
9
#include "envoy/server/health_checker_config.h"
10
#include "envoy/type/v3/http.pb.h"
11
#include "envoy/type/v3/range.pb.h"
12

            
13
#include "source/common/buffer/zero_copy_input_stream_impl.h"
14
#include "source/common/common/empty_string.h"
15
#include "source/common/common/enum_to_int.h"
16
#include "source/common/common/macros.h"
17
#include "source/common/config/utility.h"
18
#include "source/common/config/well_known_names.h"
19
#include "source/common/grpc/common.h"
20
#include "source/common/http/header_map_impl.h"
21
#include "source/common/http/header_utility.h"
22
#include "source/common/network/address_impl.h"
23
#include "source/common/network/socket_impl.h"
24
#include "source/common/network/utility.h"
25
#include "source/common/router/router.h"
26
#include "source/common/runtime/runtime_features.h"
27
#include "source/common/upstream/host_utility.h"
28
#include "source/extensions/common/proxy_protocol/proxy_protocol_header.h"
29

            
30
#include "absl/strings/match.h"
31
#include "absl/strings/str_cat.h"
32

            
33
namespace Envoy {
34
namespace Upstream {
35

            
36
// Factory entry point: builds a TcpHealthCheckerImpl for the given health check config.
// The cluster, main thread dispatcher, runtime, random generator and event logger are all
// taken from the supplied factory context.
Upstream::HealthCheckerSharedPtr TcpHealthCheckerFactory::createCustomHealthChecker(
    const envoy::config::core::v3::HealthCheck& config,
    Server::Configuration::HealthCheckerFactoryContext& context) {
  Event::Dispatcher& main_dispatcher = context.mainThreadDispatcher();
  Runtime::Loader& runtime_loader = context.runtime();
  Random::RandomGenerator& random_generator = context.api().randomGenerator();

  return std::make_shared<TcpHealthCheckerImpl>(context.cluster(), config, main_dispatcher,
                                                runtime_loader, random_generator,
                                                context.eventLogger());
}
43

            
44
// Registers TcpHealthCheckerFactory in the Server::Configuration::CustomHealthCheckerFactory
// category.
REGISTER_FACTORY(TcpHealthCheckerFactory, Server::Configuration::CustomHealthCheckerFactory);
45

            
46
// Constructs the TCP health checker from the `tcp_health_check` section of `config`.
//
// - send_bytes_ is loaded from the `send` payload when its text is non-empty; otherwise it is
//   left as an empty segment list.
// - proxy_protocol_config_ holds a copy of the proxy protocol config when one is present, and
//   is null otherwise.
// - receive_bytes_ is loaded from the `receive` payload(s).
//
// A non-OK status from PayloadMatcher::loadProtoBytes is handed to THROW_IF_NOT_OK_REF.
TcpHealthCheckerImpl::TcpHealthCheckerImpl(const Cluster& cluster,
                                           const envoy::config::core::v3::HealthCheck& config,
                                           Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
                                           Random::RandomGenerator& random,
                                           HealthCheckEventLoggerPtr&& event_logger)
    : HealthCheckerImplBase(cluster, config, dispatcher, runtime, random, std::move(event_logger)),
      send_bytes_([&config] {
        // NOTE(review): only the text form of the send payload is checked here; if the payload
        // can be specified in another form it would be skipped. Confirm this is intended.
        if (!config.tcp_health_check().send().text().empty()) {
          auto bytes_or_error = PayloadMatcher::loadProtoBytes(config.tcp_health_check().send());
          THROW_IF_NOT_OK_REF(bytes_or_error.status());
          // The StatusOr is a local that dies here, so move the segments out instead of copying.
          return std::move(bytes_or_error).value();
        }
        return PayloadMatcher::MatchSegments{};
      }()),
      proxy_protocol_config_(config.tcp_health_check().has_proxy_protocol_config()
                                 ? std::make_unique<envoy::config::core::v3::ProxyProtocolConfig>(
                                       config.tcp_health_check().proxy_protocol_config())
                                 : nullptr) {
  auto bytes_or_error = PayloadMatcher::loadProtoBytes(config.tcp_health_check().receive());
  THROW_IF_NOT_OK_REF(bytes_or_error.status());
  // Same as above: the temporary result is not used again, so transfer ownership of the segments.
  receive_bytes_ = std::move(bytes_or_error).value();
}
68

            
69
35
// By the time the session is destroyed the client connection must already have been released;
// the assertion below enforces that invariant.
TcpHealthCheckerImpl::TcpActiveHealthCheckSession::~TcpActiveHealthCheckSession() {
  ASSERT(client_ == nullptr);
}
72

            
73
35
void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onDeferredDelete() {
74
35
  if (client_) {
75
13
    expect_close_ = true;
76
13
    client_->close(Network::ConnectionCloseType::Abort);
77
13
  }
78
35
}
79

            
80
21
// Handles bytes read from the health check connection. `data` is the pending read buffer; it
// is matched against the configured receive bytes. On a match the buffer is drained and the
// check is reported as successful; the connection is then aborted unless connection reuse is
// enabled. Without a match nothing happens and the data stays buffered.
void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onData(Buffer::Instance& data) {
  ENVOY_CONN_LOG(debug, "hc tcp total pending buffer={}", *client_, data.length());

  // TODO(lilika): The TCP health checker does generic pattern matching so we can't differentiate
  // between wrong data and not enough data. We could likely do better here and figure out cases in
  // which a match is not possible but that is not done now.
  if (!PayloadMatcher::match(parent_.receive_bytes_, data)) {
    return;
  }

  ENVOY_CONN_LOG(debug, "hc tcp healthcheck passed, health_check_address={}", *client_,
                 host_->healthCheckAddress()->asString());
  data.drain(data.length());
  handleSuccess(false);

  if (parent_.reuse_connection_) {
    // Keep the connection around for the next interval.
    return;
  }

  expect_close_ = true;
  client_->close(Network::ConnectionCloseType::Abort);
}
96

            
97
83
// Connection event handler for the health check connection.
//
// - RemoteClose / LocalClose: a close that was not flagged via expect_close_ is reported as a
//   NETWORK failure. In both cases the connection is handed to the dispatcher for deferred
//   deletion.
// - Connected with no receive bytes configured: the check is a pure connect test, so the
//   connection is aborted and success is reported.
// - Anything else is ignored.
void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) {
  const bool connection_closed = event == Network::ConnectionEvent::RemoteClose ||
                                 event == Network::ConnectionEvent::LocalClose;
  if (connection_closed) {
    if (!expect_close_) {
      ENVOY_CONN_LOG(debug, "hc tcp connection unexpected closed, health_check_address={}",
                     *client_, host_->healthCheckAddress()->asString());
      handleFailure(envoy::data::core::v3::NETWORK);
    }
    parent_.dispatcher_.deferredDelete(std::move(client_));
    return;
  }

  if (event != Network::ConnectionEvent::Connected || !parent_.receive_bytes_.empty()) {
    return;
  }

  // In this case we are just testing that we can connect, so immediately succeed. Also, since
  // we are just doing a connection test, close the connection.
  // NOTE(mattklein123): I've seen cases where the kernel will report a successful connection, and
  // then proceed to fail subsequent calls (so the connection did not actually succeed). I'm not
  // sure what situations cause this. If this turns into a problem, we may need to introduce a
  // timer and see if the connection stays alive for some period of time while waiting to read.
  // (Though we may never get a FIN and won't know until if/when we try to write). In short, this
  // may need to get more complicated but we can start here.
  // TODO(mattklein123): If we had a way on the connection interface to do an immediate read (vs.
  // evented), that would be a good check to run here to make sure it returns the equivalent of
  // EAGAIN. Need to think through how that would look from an interface perspective.
  // TODO(mattklein123): In the case that a user configured bytes to write, they will not be
  // written, since we currently have no way to know if the bytes actually get written via
  // the connection interface. We might want to figure out how to handle this better later.
  ENVOY_CONN_LOG(debug, "hc tcp healthcheck passed, health_check_address={}", *client_,
                 host_->healthCheckAddress()->asString());
  expect_close_ = true;
  client_->close(Network::ConnectionCloseType::Abort);
  handleSuccess(false);
}
130

            
131
// TODO(lilika) : Support connection pooling
132
46
void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onInterval() {
133
46
  if (!client_) {
134
44
    client_ =
135
44
        host_
136
44
            ->createHealthCheckConnection(parent_.dispatcher_, parent_.transportSocketOptions(),
137
44
                                          parent_.transportSocketMatchMetadata().get())
138
44
            .connection_;
139
44
    session_callbacks_ = std::make_shared<TcpSessionCallbacks>(*this);
140
44
    client_->addConnectionCallbacks(*session_callbacks_);
141
44
    client_->addReadFilter(session_callbacks_);
142

            
143
44
    expect_close_ = false;
144
44
    client_->connect();
145
44
    client_->noDelay(true);
146
44
  }
147

            
148
46
  Buffer::OwnedImpl data;
149
46
  bool should_write_data = false;
150

            
151
46
  if (parent_.proxy_protocol_config_ != nullptr) {
152
3
    if (parent_.proxy_protocol_config_->version() ==
153
3
        envoy::config::core::v3::ProxyProtocolConfig::V1) {
154
1
      auto src_addr = client_->connectionInfoProvider().localAddress()->ip();
155
1
      auto dst_addr = client_->connectionInfoProvider().remoteAddress()->ip();
156
1
      Extensions::Common::ProxyProtocol::generateV1Header(*src_addr, *dst_addr, data);
157
2
    } else if (parent_.proxy_protocol_config_->version() ==
158
2
               envoy::config::core::v3::ProxyProtocolConfig::V2) {
159
2
      Extensions::Common::ProxyProtocol::generateV2LocalHeader(data);
160
2
    }
161
3
    should_write_data = true;
162
3
  }
163
46
  if (!parent_.send_bytes_.empty()) {
164
40
    for (const std::vector<uint8_t>& segment : parent_.send_bytes_) {
165
40
      data.add(segment.data(), segment.size());
166
40
    }
167
40
    should_write_data = true;
168
40
  }
169
46
  if (should_write_data) {
170
40
    client_->write(data, false);
171
40
  }
172
46
}
173

            
174
10
void TcpHealthCheckerImpl::TcpActiveHealthCheckSession::onTimeout() {
175
10
  ENVOY_CONN_LOG(debug, "hc tcp connection timeout, health_flags={}, health_check_address={}",
176
10
                 *client_, HostUtility::healthFlagsToString(*host_),
177
10
                 host_->healthCheckAddress()->asString());
178
10
  expect_close_ = true;
179
10
  client_->close(Network::ConnectionCloseType::Abort);
180
10
}
181

            
182
} // namespace Upstream
183
} // namespace Envoy