1
#include "source/extensions/health_checkers/http/health_checker_impl.h"
2

            
3
#include <cstdint>
4
#include <iterator>
5
#include <memory>
6

            
7
#include "envoy/config/core/v3/health_check.pb.h"
8
#include "envoy/data/core/v3/health_check_event.pb.h"
9
#include "envoy/server/health_checker_config.h"
10
#include "envoy/type/v3/http.pb.h"
11
#include "envoy/type/v3/range.pb.h"
12

            
13
#include "source/common/buffer/zero_copy_input_stream_impl.h"
14
#include "source/common/common/empty_string.h"
15
#include "source/common/common/enum_to_int.h"
16
#include "source/common/common/macros.h"
17
#include "source/common/config/utility.h"
18
#include "source/common/config/well_known_names.h"
19
#include "source/common/grpc/common.h"
20
#include "source/common/http/header_map_impl.h"
21
#include "source/common/http/header_utility.h"
22
#include "source/common/network/address_impl.h"
23
#include "source/common/network/socket_impl.h"
24
#include "source/common/network/utility.h"
25
#include "source/common/router/router.h"
26
#include "source/common/runtime/runtime_features.h"
27
#include "source/common/upstream/host_utility.h"
28

            
29
#include "absl/strings/match.h"
30
#include "absl/strings/str_cat.h"
31

            
32
namespace Envoy {
33
namespace Upstream {
34

            
35
namespace {
36

            
37
// Resolves the HTTP method used for health check requests: METHOD_UNSPECIFIED in the
// configuration falls back to GET, every other value is returned unchanged.
envoy::config::core::v3::RequestMethod
getMethod(const envoy::config::core::v3::RequestMethod config_method) {
  return config_method == envoy::config::core::v3::METHOD_UNSPECIFIED
             ? envoy::config::core::v3::GET
             : config_method;
}
45

            
46
} // namespace
47

            
48
// Factory entry point: creates a ProdHttpHealthCheckerImpl for the cluster exposed by `context`,
// passing along the health check `config`, the factory context itself and the event logger
// obtained from the context.
Upstream::HealthCheckerSharedPtr HttpHealthCheckerFactory::createCustomHealthChecker(
    const envoy::config::core::v3::HealthCheck& config,
    Server::Configuration::HealthCheckerFactoryContext& context) {
  auto event_logger = context.eventLogger();
  Upstream::HealthCheckerSharedPtr checker = std::make_shared<ProdHttpHealthCheckerImpl>(
      context.cluster(), config, context, std::move(event_logger));
  return checker;
}
54

            
55
// Registers HttpHealthCheckerFactory as a Server::Configuration::CustomHealthCheckerFactory.
REGISTER_FACTORY(HttpHealthCheckerFactory, Server::Configuration::CustomHealthCheckerFactory);
56

            
57
// Constructs the HTTP health checker from the `http_health_check` section of `config`.
//
// The member initializers capture the request path, host value, method (an unspecified method
// becomes GET via getMethod()), response buffer size, request header mutations, the
// expected/retriable status ranges (Http::Code::OK is the default expected status) and the codec
// type. The body then validates and caches the optional request payload and the expected
// response payload, and sets up the optional service name matcher.
//
// Throws EnvoyException when a request payload is configured together with GET, HEAD, DELETE or
// TRACE, or when the expected response is longer than a non-zero response buffer size. Errors
// reported by header or payload parsing are surfaced through the THROW_* macros.
HttpHealthCheckerImpl::HttpHealthCheckerImpl(
    const Cluster& cluster, const envoy::config::core::v3::HealthCheck& config,
    Server::Configuration::HealthCheckerFactoryContext& context,
    HealthCheckEventLoggerPtr&& event_logger)
    : HealthCheckerImplBase(cluster, config, context.mainThreadDispatcher(), context.runtime(),
                            context.api().randomGenerator(), std::move(event_logger)),
      path_(config.http_health_check().path()), host_value_(config.http_health_check().host()),
      method_(getMethod(config.http_health_check().method())),
      response_buffer_size_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config.http_health_check(), response_buffer_size, kDefaultMaxBytesInBuffer)),
      request_headers_parser_(THROW_OR_RETURN_VALUE(
          Router::HeaderParser::configure(config.http_health_check().request_headers_to_add(),
                                          config.http_health_check().request_headers_to_remove()),
          Router::HeaderParserPtr)),
      http_status_checker_(config.http_health_check().expected_statuses(),
                           config.http_health_check().retriable_statuses(),
                           static_cast<uint64_t>(Http::Code::OK)),
      codec_client_type_(codecClientType(config.http_health_check().codec_client_type())),
      random_generator_(context.api().randomGenerator()) {
  // TODO(boteng): introduce additional validation for the authority and path headers
  // based on the default UHV when it is available.

  const auto& http_config = config.http_health_check();

  // Process send payload.
  if (http_config.has_send()) {
    // Validate that the method supports a request body when payload is specified.
    // Use the same logic as HeaderUtility::requestShouldHaveNoBody(), except CONNECT is already
    // disallowed by proto validation.
    const bool method_has_no_body =
        method_ == envoy::config::core::v3::GET || method_ == envoy::config::core::v3::HEAD ||
        method_ == envoy::config::core::v3::DELETE || method_ == envoy::config::core::v3::TRACE;
    if (method_has_no_body) {
      throw EnvoyException(
          fmt::format("HTTP health check cannot specify a request payload with method '{}'. "
                      "Only methods that support a request body (POST, PUT, PATCH, OPTIONS) can be "
                      "used with payload.",
                      envoy::config::core::v3::RequestMethod_Name(method_)));
    }

    // Parse the payload once and keep it in `request_payload_` so that every health check
    // request can reuse it.
    auto send_bytes_or_error = PayloadMatcher::loadProtoBytes(http_config.send());
    THROW_IF_NOT_OK_REF(send_bytes_or_error.status());
    for (const auto& segment : send_bytes_or_error.value()) {
      request_payload_.add(segment.data(), segment.size());
    }
  }

  auto receive_bytes_or_error = PayloadMatcher::loadProtoBytes(http_config.receive());
  THROW_IF_NOT_OK_REF(receive_bytes_or_error.status());
  // The parsed segments are not used again locally, so move them instead of copying.
  receive_bytes_ = std::move(receive_bytes_or_error.value());

  if (http_config.has_service_name_matcher()) {
    service_name_matcher_.emplace(http_config.service_name_matcher(),
                                  context.serverFactoryContext());
  }

  // A response buffer size of zero skips the size check; otherwise the whole expected response
  // has to fit into the buffer.
  if (response_buffer_size_ != 0 && !receive_bytes_.empty()) {
    uint64_t total = 0;
    for (const auto& bytes : receive_bytes_) {
      total += bytes.size();
    }
    if (total > response_buffer_size_) {
      throw EnvoyException(fmt::format(
          "The expected response length '{}' is over than http health response buffer size '{}'",
          total, response_buffer_size_));
    }
  }
}
123

            
124
// Builds the half-open [start, end) status ranges used to classify health check responses.
// Every configured range is validated with validateRange() before it is stored. When no expected
// range is configured, the single status `default_expected_status` is expected.
HttpHealthCheckerImpl::HttpStatusChecker::HttpStatusChecker(
    const Protobuf::RepeatedPtrField<envoy::type::v3::Int64Range>& expected_statuses,
    const Protobuf::RepeatedPtrField<envoy::type::v3::Int64Range>& retriable_statuses,
    uint64_t default_expected_status) {
  for (const auto& configured : expected_statuses) {
    const uint64_t lower = static_cast<uint64_t>(configured.start());
    const uint64_t upper = static_cast<uint64_t>(configured.end());
    validateRange(lower, upper, "expected");
    expected_ranges_.emplace_back(lower, upper);
  }

  // Fall back to a range that contains only the default status.
  if (expected_ranges_.empty()) {
    expected_ranges_.emplace_back(default_expected_status, default_expected_status + 1);
  }

  for (const auto& configured : retriable_statuses) {
    const uint64_t lower = static_cast<uint64_t>(configured.start());
    const uint64_t upper = static_cast<uint64_t>(configured.end());
    validateRange(lower, upper, "retriable");
    retriable_ranges_.emplace_back(lower, upper);
  }
}
151

            
152
void HttpHealthCheckerImpl::HttpStatusChecker::validateRange(uint64_t start, uint64_t end,
153
18
                                                             absl::string_view range_type) {
154
18
  if (start >= end) {
155
2
    throw EnvoyException(fmt::format("Invalid http {} status range: expecting start < "
156
2
                                     "end, but found start={} and end={}",
157
2
                                     range_type, start, end));
158
2
  }
159

            
160
16
  if (start < 100) {
161
2
    throw EnvoyException(
162
2
        fmt::format("Invalid http {} status range: expecting start >= 100, but found start={}",
163
2
                    range_type, start));
164
2
  }
165

            
166
14
  if (end > 600) {
167
2
    throw EnvoyException(fmt::format(
168
2
        "Invalid http {} status range: expecting end <= 600, but found end={}", range_type, end));
169
2
  }
170
14
}
171

            
172
57
bool HttpHealthCheckerImpl::HttpStatusChecker::inRetriableRanges(uint64_t http_status) const {
173
57
  return inRanges(http_status, retriable_ranges_);
174
57
}
175

            
176
858
bool HttpHealthCheckerImpl::HttpStatusChecker::inExpectedRanges(uint64_t http_status) const {
177
858
  return inRanges(http_status, expected_ranges_);
178
858
}
179

            
180
// Returns true when `http_status` lies inside at least one of `ranges`. Each pair is treated as
// a half-open interval: the first element is inclusive, the second exclusive.
bool HttpHealthCheckerImpl::HttpStatusChecker::inRanges(
    uint64_t http_status, const std::vector<std::pair<uint64_t, uint64_t>>& ranges) {
  for (const auto& [lower_inclusive, upper_exclusive] : ranges) {
    const bool at_or_above_lower = http_status >= lower_inclusive;
    if (at_or_above_lower && http_status < upper_exclusive) {
      return true;
    }
  }

  return false;
}
190

            
191
242
// Maps the codec type used for health check connections to the HTTP protocol reported in the
// stream info. Any value outside the enumerators handled below hits the corrupt-enum panic.
Http::Protocol codecClientTypeToProtocol(Http::CodecType codec_client_type) {
  switch (codec_client_type) {
  case Http::CodecType::HTTP3:
    return Http::Protocol::Http3;
  case Http::CodecType::HTTP2:
    return Http::Protocol::Http2;
  case Http::CodecType::HTTP1:
    return Http::Protocol::Http11;
  }
  PANIC_DUE_TO_CORRUPT_ENUM
}
202

            
203
// Returns the HTTP protocol that corresponds to the configured codec client type.
Http::Protocol HttpHealthCheckerImpl::protocol() const {
  const Http::CodecType configured_codec = codec_client_type_;
  return codecClientTypeToProtocol(configured_codec);
}
206

            
207
HttpHealthCheckerImpl::HttpActiveHealthCheckSession::HttpActiveHealthCheckSession(
208
    HttpHealthCheckerImpl& parent, const HostSharedPtr& host)
209
242
    : ActiveHealthCheckSession(parent, host), parent_(parent),
210
242
      response_body_(std::make_unique<Buffer::OwnedImpl>()),
211
      hostname_(
212
242
          HealthCheckerFactory::getHostname(host, parent_.host_value_, parent_.cluster_.info())),
213
242
      local_connection_info_provider_(std::make_shared<Network::ConnectionInfoSetterImpl>(
214
242
          Network::Utility::getCanonicalIpv4LoopbackAddress(),
215
242
          Network::Utility::getCanonicalIpv4LoopbackAddress())),
216
242
      protocol_(codecClientTypeToProtocol(parent_.codec_client_type_)), expect_reset_(false),
217
242
      reuse_connection_(false), request_in_flight_(false) {}
218

            
219
242
// The codec client must already have been released by the time the session is destroyed.
HttpHealthCheckerImpl::HttpActiveHealthCheckSession::~HttpActiveHealthCheckSession() {
  ASSERT(client_ == nullptr);
}
222

            
223
242
void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onDeferredDelete() {
224
242
  if (client_) {
225
    // If there is an active request it will get reset, so make sure we ignore the reset.
226
136
    expect_reset_ = true;
227
136
    client_->close(Network::ConnectionCloseType::Abort);
228
136
  }
229
242
}
230

            
231
void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::decodeHeaders(
232
844
    Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
233
844
  ASSERT(!response_headers_);
234
844
  response_headers_ = std::move(headers);
235
844
  if (end_stream) {
236
88
    onResponseComplete();
237
88
  }
238
844
}
239

            
240
void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::decodeData(Buffer::Instance& data,
241
800
                                                                     bool end_stream) {
242
800
  if (parent_.response_buffer_size_ != 0) {
243
800
    if (!parent_.receive_bytes_.empty() &&
244
800
        response_body_->length() < parent_.response_buffer_size_) {
245
14
      response_body_->move(data, parent_.response_buffer_size_ - response_body_->length());
246
14
    }
247
800
  } else {
248
    if (!parent_.receive_bytes_.empty()) {
249
      response_body_->move(data, data.length());
250
    }
251
  }
252

            
253
800
  if (end_stream) {
254
109
    onResponseComplete();
255
109
  }
256
800
}
257

            
258
393
// Connection event handler. On a remote or local close, any partially received response is
// dropped and the codec client is handed to the dispatcher for deferred deletion. Other events
// are ignored.
void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onEvent(Network::ConnectionEvent event) {
  const bool connection_closed = event == Network::ConnectionEvent::RemoteClose ||
                                 event == Network::ConnectionEvent::LocalClose;
  if (!connection_closed) {
    return;
  }

  // For the raw disconnect event, we are either between intervals in which case we already have
  // a timer setup, or we did the close or got a reset, in which case we already setup a new
  // timer. There is nothing to do here other than blow away the client.
  response_headers_.reset();
  response_body_->drain(response_body_->length());
  parent_.dispatcher_.deferredDelete(std::move(client_));
}
269

            
270
// TODO(lilika) : Support connection pooling
271
953
void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onInterval() {
272
953
  if (!client_) {
273
277
    Upstream::Host::CreateConnectionData conn =
274
277
        host_->createHealthCheckConnection(parent_.dispatcher_, parent_.transportSocketOptions(),
275
277
                                           parent_.transportSocketMatchMetadata().get());
276
277
    client_.reset(parent_.createCodecClient(conn));
277
277
    client_->addConnectionCallbacks(connection_callback_impl_);
278
277
    client_->setCodecConnectionCallbacks(http_connection_callback_impl_);
279
277
    expect_reset_ = false;
280
277
    reuse_connection_ = parent_.reuse_connection_;
281
277
  }
282

            
283
953
  Http::RequestEncoder* request_encoder = &client_->newStream(*this);
284
953
  request_encoder->getStream().addCallbacks(*this);
285
953
  request_in_flight_ = true;
286

            
287
953
  const auto request_headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>(
288
953
      {{Http::Headers::get().Method, envoy::config::core::v3::RequestMethod_Name(parent_.method_)},
289
953
       {Http::Headers::get().Host, hostname_},
290
953
       {Http::Headers::get().Path, parent_.path_},
291
953
       {Http::Headers::get().UserAgent, Http::Headers::get().UserAgentValues.EnvoyHealthChecker}});
292
953
  Router::FilterUtility::setUpstreamScheme(
293
953
      *request_headers,
294
      // Here there is no downstream connection so scheme will be based on
295
      // upstream crypto
296
953
      false, host_->transportSocketFactory().implementsSecureTransport(), true);
297
953
  StreamInfo::StreamInfoImpl stream_info(protocol_, parent_.dispatcher_.timeSource(),
298
953
                                         local_connection_info_provider_,
299
953
                                         StreamInfo::FilterState::LifeSpan::FilterChain);
300
953
  stream_info.setUpstreamInfo(std::make_shared<StreamInfo::UpstreamInfoImpl>());
301
953
  stream_info.upstreamInfo()->setUpstreamHost(host_);
302
953
  parent_.request_headers_parser_->evaluateHeaders(*request_headers, stream_info);
303

            
304
  // Check if we have a payload to send.
305
953
  const bool has_payload = parent_.request_payload_.length() > 0;
306
953
  if (has_payload) {
307
    // Set Content-Length header for the payload.
308
9
    request_headers->setContentLength(parent_.request_payload_.length());
309
9
  }
310

            
311
953
  auto status = request_encoder->encodeHeaders(*request_headers, !has_payload);
312
  // Encoding will only fail if required request headers are missing.
313
953
  ASSERT(status.ok());
314

            
315
  // Send the payload as request body if specified.
316
953
  if (has_payload) {
317
    // Copy the payload buffer to send (we need to preserve the original for reuse).
318
9
    Buffer::OwnedImpl payload_copy;
319
9
    payload_copy.add(parent_.request_payload_);
320
9
    request_encoder->encodeData(payload_copy, true);
321
9
  }
322
953
}
323

            
324
void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onResetStream(Http::StreamResetReason,
325
110
                                                                        absl::string_view) {
326
110
  request_in_flight_ = false;
327
110
  ENVOY_CONN_LOG(debug, "connection/stream error health_flags={}", *client_,
328
110
                 HostUtility::healthFlagsToString(*host_));
329
110
  if (expect_reset_) {
330
73
    return;
331
73
  }
332

            
333
37
  if (client_ && !reuse_connection_) {
334
2
    client_->close(Network::ConnectionCloseType::Abort);
335
2
  }
336

            
337
37
  handleFailure(envoy::data::core::v3::NETWORK);
338
37
}
339

            
340
void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onGoAway(
341
9
    Http::GoAwayErrorCode error_code) {
342
9
  ENVOY_CONN_LOG(debug, "connection going away goaway_code={}, health_flags={}", *client_,
343
9
                 static_cast<int>(error_code), HostUtility::healthFlagsToString(*host_));
344

            
345
9
  if (request_in_flight_ && error_code == Http::GoAwayErrorCode::NoError) {
346
    // The server is starting a graceful shutdown. Allow the in flight request
347
    // to finish without treating this as a health check error, and then
348
    // reconnect.
349
5
    reuse_connection_ = false;
350
5
    return;
351
5
  }
352

            
353
4
  if (request_in_flight_) {
354
    // Record this as a failed health check.
355
3
    handleFailure(envoy::data::core::v3::NETWORK);
356
3
  }
357

            
358
4
  if (client_) {
359
4
    expect_reset_ = true;
360
4
    client_->close(Network::ConnectionCloseType::Abort);
361
4
  }
362
4
}
363

            
364
// Classifies the completed response. The checks run in this order:
//   1. If an expected response payload is configured, the buffered body must match it.
//   2. The status code must be inside the expected ranges; otherwise the result is Retriable
//      when the code is in a retriable range and Failed when it is not.
//   3. If a service name matcher is configured and the "health_check.verify_cluster" runtime
//      feature is enabled, the upstream health-checked cluster header must match.
// A response that passes all checks is Degraded when it carries the degraded header and
// Succeeded otherwise.
HttpHealthCheckerImpl::HttpActiveHealthCheckSession::HealthCheckResult
HttpHealthCheckerImpl::HttpActiveHealthCheckSession::healthCheckResult() {
  const uint64_t response_code = Http::Utility::getResponseStatus(*response_headers_);
  ENVOY_CONN_LOG(debug, "hc response_code={} health_flags={}", *client_, response_code,
                 HostUtility::healthFlagsToString(*host_));

  // Flags the host as excluded when the response carries the immediate health check failure
  // header.
  const auto exclude_on_immediate_fail = [this]() {
    if (response_headers_->EnvoyImmediateHealthCheckFail() != nullptr) {
      host_->healthFlagSet(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL);
    }
  };

  const bool expects_body = !parent_.receive_bytes_.empty();
  if (expects_body) {
    // If the expected response is set, check whether the buffered part of the actual response
    // (limited by the configured response buffer size) contains the expected response.
    if (!PayloadMatcher::match(parent_.receive_bytes_, *response_body_)) {
      exclude_on_immediate_fail();
      return HealthCheckResult::Failed;
    }
    ENVOY_CONN_LOG(debug, "hc http response body healthcheck passed", *client_);
  }

  if (!parent_.http_status_checker_.inExpectedRanges(response_code)) {
    // If the HTTP response code would indicate failure AND the immediate health check
    // failure header is set, exclude the host from LB.
    // TODO(mattklein123): We could consider doing this check for any HTTP response code, but this
    // seems like the least surprising behavior and we could consider relaxing this in the future.
    // TODO(mattklein123): This will not force a host set rebuild of the host was already failed.
    // This is something we could do in the future but seems unnecessary right now.
    exclude_on_immediate_fail();

    return parent_.http_status_checker_.inRetriableRanges(response_code)
               ? HealthCheckResult::Retriable
               : HealthCheckResult::Failed;
  }

  const HealthCheckResult passing_result = response_headers_->EnvoyDegraded() != nullptr
                                               ? HealthCheckResult::Degraded
                                               : HealthCheckResult::Succeeded;

  if (parent_.service_name_matcher_.has_value() &&
      parent_.runtime_.snapshot().featureEnabled("health_check.verify_cluster", 100UL)) {
    parent_.stats_.verify_cluster_.inc();
    // A missing cluster header is matched as the empty string.
    const std::string service_cluster_healthchecked =
        response_headers_->EnvoyUpstreamHealthCheckedCluster()
            ? std::string(response_headers_->getEnvoyUpstreamHealthCheckedClusterValue())
            : EMPTY_STRING;
    return parent_.service_name_matcher_->match(service_cluster_healthchecked)
               ? passing_result
               : HealthCheckResult::Failed;
  }

  return passing_result;
}
418

            
419
843
void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onResponseComplete() {
420
843
  request_in_flight_ = false;
421

            
422
843
  switch (healthCheckResult()) {
423
792
  case HealthCheckResult::Succeeded:
424
792
    handleSuccess(false);
425
792
    break;
426
1
  case HealthCheckResult::Degraded:
427
1
    handleSuccess(true);
428
1
    break;
429
46
  case HealthCheckResult::Failed:
430
46
    handleFailure(envoy::data::core::v3::ACTIVE, /*retriable=*/false);
431
46
    break;
432
4
  case HealthCheckResult::Retriable:
433
4
    handleFailure(envoy::data::core::v3::ACTIVE, /*retriable=*/true);
434
4
    break;
435
843
  }
436

            
437
843
  if (shouldClose()) {
438
8
    client_->close(Network::ConnectionCloseType::Abort);
439
8
  }
440

            
441
843
  response_headers_.reset();
442
843
  response_body_->drain(response_body_->length());
443
843
}
444

            
445
// It is possible for this session to have been deferred destroyed inline in handleFailure()
446
// above so make sure we still have a connection that we might need to close.
447
843
bool HttpHealthCheckerImpl::HttpActiveHealthCheckSession::shouldClose() const {
448
843
  if (client_ == nullptr) {
449
6
    return false;
450
6
  }
451

            
452
837
  if (!reuse_connection_) {
453
4
    return true;
454
4
  }
455

            
456
833
  return Http::HeaderUtility::shouldCloseConnection(client_->protocol(), *response_headers_);
457
837
}
458

            
459
13
void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onTimeout() {
460
13
  request_in_flight_ = false;
461
13
  if (client_) {
462
12
    ENVOY_CONN_LOG(debug, "connection/stream timeout health_flags={}", *client_,
463
12
                   HostUtility::healthFlagsToString(*host_));
464

            
465
    // If there is an active request it will get reset, so make sure we ignore the reset.
466
12
    expect_reset_ = true;
467

            
468
12
    client_->close(Network::ConnectionCloseType::Abort);
469
12
  }
470
13
}
471

            
472
// Translates the codec client type from the health check configuration into the internal
// Http::CodecType. Proto sentinel values and any other unknown value result in a panic.
Http::CodecType
HttpHealthCheckerImpl::codecClientType(const envoy::type::v3::CodecClientType& type) {
  switch (type) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case envoy::type::v3::HTTP1:
    return Http::CodecType::HTTP1;
  case envoy::type::v3::HTTP2:
    return Http::CodecType::HTTP2;
  case envoy::type::v3::HTTP3:
    return Http::CodecType::HTTP3;
  }
  PANIC_DUE_TO_CORRUPT_ENUM
}
485

            
486
// Creates the production codec client for a freshly created health check connection. The
// connection is moved out of `data`. The returned raw pointer is owned by the caller; the
// session's onInterval() stores it in its `client_` smart pointer.
Http::CodecClient*
ProdHttpHealthCheckerImpl::createCodecClient(Upstream::Host::CreateConnectionData& data) {
  Http::CodecClient* codec_client = new Http::CodecClientProd(
      codec_client_type_, std::move(data.connection_), data.host_description_, dispatcher_,
      random_generator_, transportSocketOptions());
  return codec_client;
}
492

            
493
} // namespace Upstream
494
} // namespace Envoy