1
#include "source/extensions/health_checkers/common/health_checker_base_impl.h"
2

            
3
#include "envoy/config/core/v3/address.pb.h"
4
#include "envoy/config/core/v3/health_check.pb.h"
5
#include "envoy/data/core/v3/health_check_event.pb.h"
6
#include "envoy/stats/scope.h"
7

            
8
#include "source/common/network/utility.h"
9
#include "source/common/router/router.h"
10

            
11
namespace Envoy {
12
namespace Upstream {
13

            
14
// Builds the shared, per-cluster health checking state from the proto config. All thresholds and
// interval settings are captured once here; per-host state lives in ActiveHealthCheckSession.
// Note: initializer order follows member declaration order; some defaults chain off earlier
// members (e.g. no_traffic_healthy_interval_ defaults to no_traffic_interval_).
HealthCheckerImplBase::HealthCheckerImplBase(const Cluster& cluster,
                                             const envoy::config::core::v3::HealthCheck& config,
                                             Event::Dispatcher& dispatcher,
                                             Runtime::Loader& runtime,
                                             Random::RandomGenerator& random,
                                             HealthCheckEventLoggerPtr&& event_logger)
    : always_log_health_check_failures_(config.always_log_health_check_failures()),
      always_log_health_check_success_(config.always_log_health_check_success()), cluster_(cluster),
      dispatcher_(dispatcher), timeout_(PROTOBUF_GET_MS_REQUIRED(config, timeout)),
      unhealthy_threshold_(PROTOBUF_GET_WRAPPED_REQUIRED(config, unhealthy_threshold)),
      healthy_threshold_(PROTOBUF_GET_WRAPPED_REQUIRED(config, healthy_threshold)),
      stats_(generateStats(cluster.info()->statsScope())), runtime_(runtime), random_(random),
      reuse_connection_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, reuse_connection, true)),
      event_logger_(std::move(event_logger)), interval_(PROTOBUF_GET_MS_REQUIRED(config, interval)),
      // Idle clusters (no upstream connections yet) are checked at a slower cadence; default 60s.
      no_traffic_interval_(PROTOBUF_GET_MS_OR_DEFAULT(config, no_traffic_interval, 60000)),
      no_traffic_healthy_interval_(PROTOBUF_GET_MS_OR_DEFAULT(config, no_traffic_healthy_interval,
                                                              no_traffic_interval_.count())),
      initial_jitter_(PROTOBUF_GET_MS_OR_DEFAULT(config, initial_jitter, 0)),
      interval_jitter_(PROTOBUF_GET_MS_OR_DEFAULT(config, interval_jitter, 0)),
      interval_jitter_percent_(config.interval_jitter_percent()),
      // Unhealthy/edge intervals fall back to the base interval (or unhealthy interval) when
      // not explicitly configured.
      unhealthy_interval_(
          PROTOBUF_GET_MS_OR_DEFAULT(config, unhealthy_interval, interval_.count())),
      unhealthy_edge_interval_(
          PROTOBUF_GET_MS_OR_DEFAULT(config, unhealthy_edge_interval, unhealthy_interval_.count())),
      healthy_edge_interval_(
          PROTOBUF_GET_MS_OR_DEFAULT(config, healthy_edge_interval, interval_.count())),
      transport_socket_options_(initTransportSocketOptions(config)),
      transport_socket_match_metadata_(initTransportSocketMatchMetadata(config)),
      // Subscribe to membership changes so sessions are created/destroyed as hosts come and go.
      member_update_cb_{cluster_.prioritySet().addMemberUpdateCb(
          [this](const HostVector& hosts_added, const HostVector& hosts_removed) {
            onClusterMemberUpdate(hosts_added, hosts_removed);
          })} {}
46

            
47
std::shared_ptr<const Network::TransportSocketOptionsImpl>
48
HealthCheckerImplBase::initTransportSocketOptions(
49
359
    const envoy::config::core::v3::HealthCheck& config) {
50
359
  if (config.has_tls_options()) {
51
1
    std::vector<std::string> protocols{config.tls_options().alpn_protocols().begin(),
52
1
                                       config.tls_options().alpn_protocols().end()};
53
1
    return std::make_shared<const Network::TransportSocketOptionsImpl>(
54
1
        "", std::vector<std::string>{}, std::move(protocols));
55
1
  }
56

            
57
358
  return std::make_shared<const Network::TransportSocketOptionsImpl>();
58
359
}
59

            
60
// Builds the metadata used to select a transport socket for health check connections, keyed under
// the envoy.transport_socket_match filter name. Returns nullptr when no criteria are configured.
MetadataConstSharedPtr HealthCheckerImplBase::initTransportSocketMatchMetadata(
    const envoy::config::core::v3::HealthCheck& config) {
  if (!config.has_transport_socket_match_criteria()) {
    return nullptr;
  }

  auto metadata = std::make_shared<envoy::config::core::v3::Metadata>();
  auto& filter_metadata = *metadata->mutable_filter_metadata();
  filter_metadata[Envoy::Config::MetadataFilters::get().ENVOY_TRANSPORT_SOCKET_MATCH] =
      config.transport_socket_match_criteria();
  return metadata;
}
73

            
74
359
HealthCheckerImplBase::~HealthCheckerImplBase() {
  // Drop registered callbacks before touching sessions: onDeferredDeleteBase() below can run
  // callbacks, and at this point the parent object (e.g. the owning Cluster) may already be
  // mid-destruction.
  callbacks_.clear();
  // Session destructors ASSERT that deferred deletion was performed. Satisfy that contract for
  // every remaining session right before this object (and the sessions) are destroyed.
  for (auto& host_and_session : active_sessions_) {
    host_and_session.second->onDeferredDeleteBase();
  }
}
85

            
86
265
void HealthCheckerImplBase::decHealthy() { stats_.healthy_.sub(1); }
87

            
88
// Decrements the gauge tracking the number of currently degraded hosts.
void HealthCheckerImplBase::decDegraded() { stats_.degraded_.dec(); }
89

            
90
359
// Instantiates all health checker counters/gauges under the "health_check." prefix within the
// cluster's stats scope.
HealthCheckerStats HealthCheckerImplBase::generateStats(Stats::Scope& scope) {
  const std::string prefix("health_check.");
  return {ALL_HEALTH_CHECKER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
                                   POOL_GAUGE_PREFIX(scope, prefix))};
}
95

            
96
265
void HealthCheckerImplBase::incHealthy() { stats_.healthy_.add(1); }
97

            
98
1
void HealthCheckerImplBase::incDegraded() { stats_.degraded_.add(1); }
99

            
100
std::chrono::milliseconds HealthCheckerImplBase::interval(HealthState state,
101
1062
                                                          HealthTransition changed_state) const {
102
  // See if the cluster has ever made a connection. If not, we use a much slower interval to keep
103
  // the host info relatively up to date in case we suddenly start sending traffic to this cluster.
104
  // In general host updates are rare and this should greatly smooth out needless health checking.
105
  // If a connection has been established, we choose an interval based on the host's health. Please
106
  // refer to the HealthCheck API documentation for more details.
107
1062
  uint64_t base_time_ms;
108
1062
  if (cluster_.info()->trafficStats()->upstream_cx_total_.used()) {
109
    // When healthy/unhealthy threshold is configured the health transition of a host will be
110
    // delayed. In this situation Envoy should use the edge interval settings between health checks.
111
    //
112
    // Example scenario for an unhealthy host with healthy_threshold set to 3:
113
    // - check fails, host is still unhealthy and next check happens after unhealthy_interval;
114
    // - check succeeds, host is still unhealthy and next check happens after healthy_edge_interval;
115
    // - check succeeds, host is still unhealthy and next check happens after healthy_edge_interval;
116
    // - check succeeds, host is now healthy and next check happens after interval;
117
    // - check succeeds, host is still healthy and next check happens after interval.
118
338
    switch (state) {
119
46
    case HealthState::Unhealthy:
120
46
      base_time_ms = changed_state == HealthTransition::ChangePending
121
46
                         ? unhealthy_edge_interval_.count()
122
46
                         : unhealthy_interval_.count();
123
46
      break;
124
292
    default:
125
292
      base_time_ms = changed_state == HealthTransition::ChangePending
126
292
                         ? healthy_edge_interval_.count()
127
292
                         : interval_.count();
128
292
      break;
129
338
    }
130
724
  } else {
131
724
    base_time_ms =
132
724
        (state == HealthState::Healthy && changed_state != HealthTransition::ChangePending)
133
724
            ? no_traffic_healthy_interval_.count()
134
724
            : no_traffic_interval_.count();
135
724
  }
136
1062
  return intervalWithJitter(base_time_ms, interval_jitter_);
137
1062
}
138

            
139
std::chrono::milliseconds
140
HealthCheckerImplBase::intervalWithJitter(uint64_t base_time_ms,
141
1063
                                          std::chrono::milliseconds interval_jitter) const {
142
1063
  const uint64_t jitter_percent_mod = interval_jitter_percent_ * base_time_ms / 100;
143
1063
  if (jitter_percent_mod > 0) {
144
426
    base_time_ms += random_.random() % jitter_percent_mod;
145
426
  }
146

            
147
1063
  if (interval_jitter.count() > 0) {
148
352
    base_time_ms += (random_.random() % interval_jitter.count());
149
352
  }
150

            
151
1063
  const uint64_t min_interval = runtime_.snapshot().getInteger("health_check.min_interval", 0);
152
1063
  const uint64_t max_interval = runtime_.snapshot().getInteger(
153
1063
      "health_check.max_interval", std::numeric_limits<uint64_t>::max());
154

            
155
1063
  uint64_t final_ms = std::min(base_time_ms, max_interval);
156
  // We force a non-zero final MS, to prevent live lock.
157
1063
  final_ms = std::max(uint64_t(1), std::max(final_ms, min_interval));
158
1063
  return std::chrono::milliseconds(final_ms);
159
1063
}
160

            
161
591
void HealthCheckerImplBase::addHosts(const HostVector& hosts) {
162
597
  for (const HostSharedPtr& host : hosts) {
163
356
    if (host->disableActiveHealthCheck()) {
164
4
      continue;
165
4
    }
166
352
    active_sessions_[host] = makeSession(host);
167
352
    host->setHealthChecker(
168
352
        HealthCheckHostMonitorPtr{new HealthCheckHostMonitorImpl(shared_from_this(), host)});
169
352
    active_sessions_[host]->start();
170
352
  }
171
591
}
172

            
173
void HealthCheckerImplBase::onClusterMemberUpdate(const HostVector& hosts_added,
174
253
                                                  const HostVector& hosts_removed) {
175
253
  addHosts(hosts_added);
176
253
  for (const HostSharedPtr& host : hosts_removed) {
177
30
    if (host->disableActiveHealthCheck()) {
178
2
      continue;
179
2
    }
180
28
    auto session_iter = active_sessions_.find(host);
181
28
    ASSERT(active_sessions_.end() != session_iter);
182
    // This deletion can happen inline in response to a host failure, so we deferred delete.
183
28
    session_iter->second->onDeferredDeleteBase();
184
28
    dispatcher_.deferredDelete(std::move(session_iter->second));
185
28
    active_sessions_.erase(session_iter);
186
28
  }
187
253
}
188

            
189
void HealthCheckerImplBase::runCallbacks(HostSharedPtr host, HealthTransition changed_state,
190
1138
                                         HealthState current_check_result) {
191
1376
  for (const HostStatusCb& cb : callbacks_) {
192
1206
    cb(host, changed_state, current_check_result);
193
1206
  }
194
1138
}
195

            
196
4
void HealthCheckerImplBase::HealthCheckHostMonitorImpl::setUnhealthy(UnhealthyType type) {
197
  // This is called cross thread. The cluster/health checker might already be gone.
198
4
  std::shared_ptr<HealthCheckerImplBase> health_checker = health_checker_.lock();
199
4
  if (health_checker) {
200
4
    health_checker->setUnhealthyCrossThread(host_.lock(), type);
201
4
  }
202
4
}
203

            
204
// Posts a passive "mark unhealthy" request from a worker thread to the main thread, where the
// owning session (if it still exists) records a passive failure. For ImmediateHealthCheckFail
// the exclusion flag is set synchronously on the calling thread before posting.
void HealthCheckerImplBase::setUnhealthyCrossThread(const HostSharedPtr& host,
                                                    HealthCheckHostMonitor::UnhealthyType type) {
  if (type == HealthCheckHostMonitor::UnhealthyType::ImmediateHealthCheckFail) {
    host->healthFlagSet(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL);
  }

  // The threading here is complex. The cluster owns the only strong reference to the health
  // checker. It might go away when we post to the main thread from a worker thread. To deal with
  // this we use the following sequence of events:
  // 1) We capture a weak reference to the health checker and post it from worker thread to main
  //    thread.
  // 2) On the main thread, we make sure it is still valid (as the cluster may have been destroyed).
  // 3) Additionally, the host/session may also be gone by then so we check that also.
  std::weak_ptr<HealthCheckerImplBase> weak_this = shared_from_this();
  dispatcher_.post([weak_this, host]() -> void {
    // Step 2: the health checker (and its cluster) may have been destroyed in the meantime.
    std::shared_ptr<HealthCheckerImplBase> shared_this = weak_this.lock();
    if (shared_this == nullptr) {
      return;
    }

    // Step 3: the host may have been removed (its session torn down) in the meantime.
    const auto session = shared_this->active_sessions_.find(host);
    if (session == shared_this->active_sessions_.end()) {
      return;
    }

    // Passive failures are non-retriable: the host is ejected immediately, bypassing the
    // unhealthy_threshold counting.
    session->second->setUnhealthy(envoy::data::core::v3::PASSIVE, /*retriable=*/false);
  });
}
232

            
233
336
void HealthCheckerImplBase::start() {
234
338
  for (auto& host_set : cluster_.prioritySet().hostSetsPerPriority()) {
235
338
    addHosts(host_set->hosts());
236
338
  }
237
336
}
238

            
239
// Creates the per-host session: one timer driving the check interval and one bounding each
// attempt's timeout. The parent's healthy/degraded gauges are updated to account for this
// host's current flags so the gauges stay consistent with session membership.
HealthCheckerImplBase::ActiveHealthCheckSession::ActiveHealthCheckSession(
    HealthCheckerImplBase& parent, HostSharedPtr host)
    : host_(host), parent_(parent),
      interval_timer_(parent.dispatcher_.createTimer([this]() -> void { onIntervalBase(); })),
      timeout_timer_(parent.dispatcher_.createTimer([this]() -> void { onTimeoutBase(); })),
      time_source_(parent.dispatcher_.timeSource()) {

  // A host without the FAILED_ACTIVE_HC flag counts as healthy from this checker's view.
  if (!host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
    parent.incHealthy();
  }

  if (host->healthFlagGet(Host::HealthFlag::DEGRADED_ACTIVE_HC)) {
    parent.incDegraded();
  }
}
254

            
255
353
HealthCheckerImplBase::ActiveHealthCheckSession::~ActiveHealthCheckSession() {
256
  // Make sure onDeferredDeleteBase() has been called. We should not reference our parent at this
257
  // point since we may have been deferred deleted.
258
353
  ASSERT(interval_timer_ == nullptr && timeout_timer_ == nullptr);
259
353
}
260

            
261
353
void HealthCheckerImplBase::ActiveHealthCheckSession::onDeferredDeleteBase() {
262
353
  HealthState state = HealthState::Unhealthy;
263
  // The session is about to be deferred deleted. Make sure all timers are gone and any
264
  // implementation specific state is destroyed.
265
353
  interval_timer_.reset();
266
353
  timeout_timer_.reset();
267
353
  if (!host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
268
161
    parent_.decHealthy();
269
161
    state = HealthState::Healthy;
270
161
  }
271
353
  if (host_->healthFlagGet(Host::HealthFlag::DEGRADED_ACTIVE_HC)) {
272
    parent_.decDegraded();
273
  }
274
353
  onDeferredDelete();
275

            
276
  // Run callbacks in case something is waiting for health checks to run which will now never run.
277
353
  if (first_check_) {
278
65
    parent_.runCallbacks(host_, HealthTransition::Unchanged, state);
279
65
  }
280
353
}
281

            
282
869
void HealthCheckerImplBase::ActiveHealthCheckSession::handleSuccess(bool degraded) {
283
  // If we are healthy, reset the # of unhealthy to zero.
284
869
  num_unhealthy_ = 0;
285

            
286
869
  HealthTransition changed_state = HealthTransition::Unchanged;
287

            
288
869
  if (host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
289
    // If this is the first time we ever got a check result on this host, we immediately move
290
    // it to healthy. This makes startup faster with a small reduction in overall reliability
291
    // depending on the HC settings.
292
121
    if (first_check_ || ++num_healthy_ == parent_.healthy_threshold_) {
293
      // If the host moves to healthy, clear active HC timeout, which may be toggled off and on
294
      // while the host is unhealthy.
295
100
      host_->healthFlagClear(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
296
      // A host that was told to exclude based on immediate failure, but is now passing, should
297
      // no longer be excluded.
298
100
      host_->healthFlagClear(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL);
299
100
      host_->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
300
100
      parent_.incHealthy();
301
100
      changed_state = HealthTransition::Changed;
302
100
      if (parent_.event_logger_) {
303
23
        parent_.event_logger_->logAddHealthy(parent_.healthCheckerType(), host_, first_check_);
304
23
      }
305
101
    } else {
306
21
      changed_state = HealthTransition::ChangePending;
307
21
    }
308
121
    host_->setLastHcPassTime(time_source_.monotonicTime());
309
121
  }
310

            
311
869
  if (changed_state != HealthTransition::Changed && parent_.always_log_health_check_success_ &&
312
869
      parent_.event_logger_) {
313
1
    parent_.event_logger_->logSuccessfulHealthCheck(parent_.healthCheckerType(), host_);
314
1
  }
315

            
316
869
  changed_state = clearPendingFlag(changed_state);
317

            
318
869
  if (degraded != host_->healthFlagGet(Host::HealthFlag::DEGRADED_ACTIVE_HC)) {
319
2
    if (degraded) {
320
1
      host_->healthFlagSet(Host::HealthFlag::DEGRADED_ACTIVE_HC);
321
1
      parent_.incDegraded();
322
1
      if (parent_.event_logger_) {
323
1
        parent_.event_logger_->logDegraded(parent_.healthCheckerType(), host_);
324
1
      }
325
1
    } else {
326
1
      if (parent_.event_logger_) {
327
1
        parent_.event_logger_->logNoLongerDegraded(parent_.healthCheckerType(), host_);
328
1
      }
329
1
      host_->healthFlagClear(Host::HealthFlag::DEGRADED_ACTIVE_HC);
330
1
    }
331

            
332
    // This check ensures that we honor the decision made about Changed vs ChangePending in the
333
    // above block.
334
    // TODO(snowp): should there be degraded_threshold?
335
2
    if (changed_state == HealthTransition::Unchanged) {
336
2
      changed_state = HealthTransition::Changed;
337
2
    }
338
2
  }
339

            
340
869
  parent_.stats_.success_.inc();
341
869
  first_check_ = false;
342
869
  parent_.runCallbacks(host_, changed_state, HealthState::Healthy);
343

            
344
869
  timeout_timer_->disableTimer();
345
869
  interval_timer_->enableTimer(parent_.interval(HealthState::Healthy, changed_state));
346
869
}
347

            
348
namespace {
349

            
350
329
bool networkHealthCheckFailureType(envoy::data::core::v3::HealthCheckFailureType type) {
351
329
  return type == envoy::data::core::v3::NETWORK || type == envoy::data::core::v3::NETWORK_TIMEOUT;
352
329
}
353

            
354
} // namespace
355

            
356
// Records a failed health check for this host and returns the resulting transition. Network
// failures (and retriable failures) must accumulate to unhealthy_threshold before the host is
// ejected; non-network, non-retriable failures eject immediately. Also maintains the timeout
// flag, logs events, bumps stats, and runs observer callbacks. Note that the callbacks may
// deferred-delete this session; callers must re-check member state afterwards.
HealthTransition HealthCheckerImplBase::ActiveHealthCheckSession::setUnhealthy(
    envoy::data::core::v3::HealthCheckFailureType type, bool retriable) {
  // If we are unhealthy, reset the # of healthy to zero.
  num_healthy_ = 0;

  HealthTransition changed_state = HealthTransition::Unchanged;
  if (!host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
    // Immediate ejection for non-network, non-retriable failures; otherwise count toward the
    // unhealthy threshold.
    if ((!networkHealthCheckFailureType(type) && !retriable) ||
        ++num_unhealthy_ == parent_.unhealthy_threshold_) {
      host_->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
      parent_.decHealthy();
      changed_state = HealthTransition::Changed;
      if (parent_.event_logger_) {
        parent_.event_logger_->logEjectUnhealthy(parent_.healthCheckerType(), host_, type);
      }
    } else {
      changed_state = HealthTransition::ChangePending;
    }
  }

  // In the case of network timeout and if the host is currently failed, set the timeout flag.
  // Otherwise clear it. This allows a host to toggle between timeout and failure if it's continuing
  // to fail for different reasons.
  if (type == envoy::data::core::v3::NETWORK_TIMEOUT &&
      host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
    host_->healthFlagSet(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
  } else {
    host_->healthFlagClear(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
  }

  changed_state = clearPendingFlag(changed_state);

  // Always log the first failure; later ones only when always_log_health_check_failures is set.
  if ((first_check_ || parent_.always_log_health_check_failures_) && parent_.event_logger_) {
    parent_.event_logger_->logUnhealthy(parent_.healthCheckerType(), host_, type, first_check_);
  }

  parent_.stats_.failure_.inc();
  if (networkHealthCheckFailureType(type)) {
    parent_.stats_.network_failure_.inc();
  } else if (type == envoy::data::core::v3::PASSIVE) {
    parent_.stats_.passive_failure_.inc();
  }

  first_check_ = false;
  parent_.runCallbacks(host_, changed_state, HealthState::Unhealthy);
  return changed_state;
}
403

            
404
void HealthCheckerImplBase::ActiveHealthCheckSession::handleFailure(
405
202
    envoy::data::core::v3::HealthCheckFailureType type, bool retriable) {
406
202
  HealthTransition changed_state = setUnhealthy(type, retriable);
407
  // It's possible that the previous call caused this session to be deferred deleted.
408
202
  if (timeout_timer_ != nullptr) {
409
193
    timeout_timer_->disableTimer();
410
193
  }
411

            
412
202
  if (interval_timer_ != nullptr) {
413
193
    interval_timer_->enableTimer(parent_.interval(HealthState::Unhealthy, changed_state));
414
193
  }
415
202
}
416

            
417
// Clears PENDING_ACTIVE_HC once a check result has arrived. Clearing forces a Changed result —
// even if the host's health value is unchanged — so the cluster refreshes its excluded-host list.
HealthTransition
HealthCheckerImplBase::ActiveHealthCheckSession::clearPendingFlag(HealthTransition changed_state) {
  if (!host_->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC)) {
    return changed_state;
  }

  host_->healthFlagClear(Host::HealthFlag::PENDING_ACTIVE_HC);
  return HealthTransition::Changed;
}
428

            
429
1144
void HealthCheckerImplBase::ActiveHealthCheckSession::onIntervalBase() {
430
1144
  onInterval();
431
1144
  timeout_timer_->enableTimer(parent_.timeout_);
432
1144
  parent_.stats_.attempt_.inc();
433
1144
}
434

            
435
// Shared timeout handling: lets the concrete checker clean up its in-flight attempt, then
// records a network-timeout failure (which also schedules the next check).
void HealthCheckerImplBase::ActiveHealthCheckSession::onTimeoutBase() {
  onTimeout();
  handleFailure(envoy::data::core::v3::NETWORK_TIMEOUT);
}
439

            
440
352
void HealthCheckerImplBase::ActiveHealthCheckSession::onInitialInterval() {
441
352
  if (parent_.initial_jitter_.count() == 0) {
442
351
    onIntervalBase();
443
351
  } else {
444
1
    interval_timer_->enableTimer(
445
1
        std::chrono::milliseconds(parent_.intervalWithJitter(0, parent_.initial_jitter_)));
446
1
  }
447
352
}
448

            
449
} // namespace Upstream
450
} // namespace Envoy