#include "source/extensions/health_checkers/common/health_checker_base_impl.h"

#include "envoy/config/core/v3/address.pb.h"
#include "envoy/config/core/v3/health_check.pb.h"
#include "envoy/data/core/v3/health_check_event.pb.h"
#include "envoy/stats/scope.h"

#include "source/common/network/utility.h"
#include "source/common/router/router.h"

namespace Envoy {
namespace Upstream {

HealthCheckerImplBase::HealthCheckerImplBase(const Cluster& cluster,
                                             const envoy::config::core::v3::HealthCheck& config,
                                             Event::Dispatcher& dispatcher,
                                             Runtime::Loader& runtime,
                                             Random::RandomGenerator& random,
                                             HealthCheckEventLoggerPtr&& event_logger)
    : always_log_health_check_failures_(config.always_log_health_check_failures()),
      cluster_(cluster), dispatcher_(dispatcher),
      timeout_(PROTOBUF_GET_MS_REQUIRED(config, timeout)),
      unhealthy_threshold_(PROTOBUF_GET_WRAPPED_REQUIRED(config, unhealthy_threshold)),
      healthy_threshold_(PROTOBUF_GET_WRAPPED_REQUIRED(config, healthy_threshold)),
      stats_(generateStats(cluster.info()->statsScope())), runtime_(runtime), random_(random),
      reuse_connection_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, reuse_connection, true)),
      event_logger_(std::move(event_logger)), interval_(PROTOBUF_GET_MS_REQUIRED(config, interval)),
      no_traffic_interval_(PROTOBUF_GET_MS_OR_DEFAULT(config, no_traffic_interval, 60000)),
      no_traffic_healthy_interval_(PROTOBUF_GET_MS_OR_DEFAULT(config, no_traffic_healthy_interval,
                                                              no_traffic_interval_.count())),
      initial_jitter_(PROTOBUF_GET_MS_OR_DEFAULT(config, initial_jitter, 0)),
      interval_jitter_(PROTOBUF_GET_MS_OR_DEFAULT(config, interval_jitter, 0)),
      interval_jitter_percent_(config.interval_jitter_percent()),
      unhealthy_interval_(
          PROTOBUF_GET_MS_OR_DEFAULT(config, unhealthy_interval, interval_.count())),
      unhealthy_edge_interval_(
          PROTOBUF_GET_MS_OR_DEFAULT(config, unhealthy_edge_interval, unhealthy_interval_.count())),
      healthy_edge_interval_(
          PROTOBUF_GET_MS_OR_DEFAULT(config, healthy_edge_interval, interval_.count())),
      transport_socket_options_(initTransportSocketOptions(config)),
      transport_socket_match_metadata_(initTransportSocketMatchMetadata(config)),
      member_update_cb_{cluster_.prioritySet().addMemberUpdateCb(
          [this](const HostVector& hosts_added, const HostVector& hosts_removed) -> void {
            onClusterMemberUpdate(hosts_added, hosts_removed);
          })} {}

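// Builds the transport socket options used for health check connections. When tls_options is
// configured, its ALPN protocol list is propagated to the health check connection.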
std::shared_ptr<const Network::TransportSocketOptionsImpl>
HealthCheckerImplBase::initTransportSocketOptions(
    const envoy::config::core::v3::HealthCheck& config) {
  if (config.has_tls_options()) {
    std::vector<std::string> protocols{config.tls_options().alpn_protocols().begin(),
                                       config.tls_options().alpn_protocols().end()};
    return std::make_shared<const Network::TransportSocketOptionsImpl>(
        "", std::vector<std::string>{}, std::move(protocols));
  }

  return std::make_shared<const Network::TransportSocketOptionsImpl>();
}

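// Builds the metadata used to select a transport socket for health check connections. When
// transport_socket_match_criteria is configured, it is stored under the
// ENVOY_TRANSPORT_SOCKET_MATCH filter metadata key so the cluster's transport socket matcher can
// pick the matching socket factory.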
MetadataConstSharedPtr HealthCheckerImplBase::initTransportSocketMatchMetadata(
    const envoy::config::core::v3::HealthCheck& config) {
  if (config.has_transport_socket_match_criteria()) {
    std::shared_ptr<envoy::config::core::v3::Metadata> metadata =
        std::make_shared<envoy::config::core::v3::Metadata>();
    (*metadata->mutable_filter_metadata())[Envoy::Config::MetadataFilters::get()
                                               .ENVOY_TRANSPORT_SOCKET_MATCH] =
        config.transport_socket_match_criteria();
    return metadata;
  }

  return nullptr;
}

HealthCheckerImplBase::~HealthCheckerImplBase() {
  // First clear callbacks that otherwise will be run from
  // ActiveHealthCheckSession::onDeferredDeleteBase(). This prevents invoking a callback on a
  // deleted parent object (e.g. Cluster).
  callbacks_.clear();
  // ASSERTs inside the session destructor check to make sure we have been previously deferred
  // deleted. Unify that logic here before actual destruction happens.
  for (auto& session : active_sessions_) {
    session.second->onDeferredDeleteBase();
  }
}

void HealthCheckerImplBase::decHealthy() { stats_.healthy_.sub(1); }

void HealthCheckerImplBase::decDegraded() { stats_.degraded_.sub(1); }

HealthCheckerStats HealthCheckerImplBase::generateStats(Stats::Scope& scope) {
  std::string prefix("health_check.");
  return {ALL_HEALTH_CHECKER_STATS(POOL_COUNTER_PREFIX(scope, prefix),
                                   POOL_GAUGE_PREFIX(scope, prefix))};
}

void HealthCheckerImplBase::incHealthy() { stats_.healthy_.add(1); }

void HealthCheckerImplBase::incDegraded() { stats_.degraded_.add(1); }

std::chrono::milliseconds HealthCheckerImplBase::interval(HealthState state,
                                                          HealthTransition changed_state) const {
  // See if the cluster has ever made a connection. If not, we use a much slower interval to keep
  // the host info relatively up to date in case we suddenly start sending traffic to this cluster.
  // In general host updates are rare and this should greatly smooth out needless health checking.
  // If a connection has been established, we choose an interval based on the host's health. Please
  // refer to the HealthCheck API documentation for more details.
  uint64_t base_time_ms;
  if (cluster_.info()->trafficStats()->upstream_cx_total_.used()) {
    // When healthy/unhealthy threshold is configured the health transition of a host will be
    // delayed. In this situation Envoy should use the edge interval settings between health checks.
    //
    // Example scenario for an unhealthy host with healthy_threshold set to 3:
    // - check fails, host is still unhealthy and next check happens after unhealthy_interval;
    // - check succeeds, host is still unhealthy and next check happens after healthy_edge_interval;
    // - check succeeds, host is still unhealthy and next check happens after healthy_edge_interval;
    // - check succeeds, host is now healthy and next check happens after interval;
    // - check succeeds, host is still healthy and next check happens after interval.
    switch (state) {
    case HealthState::Unhealthy:
      base_time_ms = changed_state == HealthTransition::ChangePending
                         ? unhealthy_edge_interval_.count()
                         : unhealthy_interval_.count();
      break;
    default:
      base_time_ms = changed_state == HealthTransition::ChangePending
                         ? healthy_edge_interval_.count()
                         : interval_.count();
      break;
    }
  } else {
    base_time_ms =
        (state == HealthState::Healthy && changed_state != HealthTransition::ChangePending)
            ? no_traffic_healthy_interval_.count()
            : no_traffic_interval_.count();
  }
  return intervalWithJitter(base_time_ms, interval_jitter_);
}

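// Applies percentage-based and absolute jitter to a base interval, then clamps the result to the
// "health_check.min_interval"/"health_check.max_interval" runtime bounds, never returning less
// than 1ms.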
std::chrono::milliseconds
HealthCheckerImplBase::intervalWithJitter(uint64_t base_time_ms,
                                          std::chrono::milliseconds interval_jitter) const {
  const uint64_t jitter_percent_mod = interval_jitter_percent_ * base_time_ms / 100;
  if (jitter_percent_mod > 0) {
    base_time_ms += random_.random() % jitter_percent_mod;
  }

  if (interval_jitter.count() > 0) {
    base_time_ms += (random_.random() % interval_jitter.count());
  }

  const uint64_t min_interval = runtime_.snapshot().getInteger("health_check.min_interval", 0);
  const uint64_t max_interval = runtime_.snapshot().getInteger(
      "health_check.max_interval", std::numeric_limits<uint64_t>::max());

  uint64_t final_ms = std::min(base_time_ms, max_interval);
  // We force a non-zero final MS, to prevent live lock.
  final_ms = std::max(uint64_t(1), std::max(final_ms, min_interval));
  return std::chrono::milliseconds(final_ms);
}

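// Creates and starts an active health check session for each added host (skipping hosts that have
// active health checking disabled) and installs a monitor through which the host can be marked
// unhealthy from other threads.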
void HealthCheckerImplBase::addHosts(const HostVector& hosts) {
  for (const HostSharedPtr& host : hosts) {
    if (host->disableActiveHealthCheck()) {
      continue;
    }
    active_sessions_[host] = makeSession(host);
    host->setHealthChecker(
        HealthCheckHostMonitorPtr{new HealthCheckHostMonitorImpl(shared_from_this(), host)});
    active_sessions_[host]->start();
  }
}

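// Keeps the set of active sessions in sync with cluster membership: added hosts get new sessions,
// removed hosts have their sessions deferred deleted.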
void HealthCheckerImplBase::onClusterMemberUpdate(const HostVector& hosts_added,
                                                  const HostVector& hosts_removed) {
  addHosts(hosts_added);
  for (const HostSharedPtr& host : hosts_removed) {
    if (host->disableActiveHealthCheck()) {
      continue;
    }
    auto session_iter = active_sessions_.find(host);
    ASSERT(active_sessions_.end() != session_iter);
    // This deletion can happen inline in response to a host failure, so we deferred delete.
    session_iter->second->onDeferredDeleteBase();
    dispatcher_.deferredDelete(std::move(session_iter->second));
    active_sessions_.erase(session_iter);
  }
}

void HealthCheckerImplBase::runCallbacks(HostSharedPtr host, HealthTransition changed_state) {
  for (const HostStatusCb& cb : callbacks_) {
    cb(host, changed_state);
  }
}

void HealthCheckerImplBase::HealthCheckHostMonitorImpl::setUnhealthy(UnhealthyType type) {
  // This is called cross thread. The cluster/health checker might already be gone.
  std::shared_ptr<HealthCheckerImplBase> health_checker = health_checker_.lock();
  if (health_checker) {
    health_checker->setUnhealthyCrossThread(host_.lock(), type);
  }
}

void HealthCheckerImplBase::setUnhealthyCrossThread(const HostSharedPtr& host,
                                                    HealthCheckHostMonitor::UnhealthyType type) {
  if (type == HealthCheckHostMonitor::UnhealthyType::ImmediateHealthCheckFail) {
    host->healthFlagSet(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL);
  }

  // The threading here is complex. The cluster owns the only strong reference to the health
  // checker. It might go away when we post to the main thread from a worker thread. To deal with
  // this we use the following sequence of events:
  // 1) We capture a weak reference to the health checker and post it from worker thread to main
  //    thread.
  // 2) On the main thread, we make sure it is still valid (as the cluster may have been destroyed).
  // 3) Additionally, the host/session may also be gone by then so we check that also.
  std::weak_ptr<HealthCheckerImplBase> weak_this = shared_from_this();
  dispatcher_.post([weak_this, host]() -> void {
    std::shared_ptr<HealthCheckerImplBase> shared_this = weak_this.lock();
    if (shared_this == nullptr) {
      return;
    }

    const auto session = shared_this->active_sessions_.find(host);
    if (session == shared_this->active_sessions_.end()) {
      return;
    }

    session->second->setUnhealthy(envoy::data::core::v3::PASSIVE, /*retriable=*/false);
  });
}

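// Kicks off active health checking by creating sessions for all hosts currently present in every
// priority level of the cluster.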
void HealthCheckerImplBase::start() {
  for (auto& host_set : cluster_.prioritySet().hostSetsPerPriority()) {
    addHosts(host_set->hosts());
  }
}

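// A session tracks active health check state for a single host. The constructor primes the
// healthy/degraded gauges from the host's current health flags.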
HealthCheckerImplBase::ActiveHealthCheckSession::ActiveHealthCheckSession(
    HealthCheckerImplBase& parent, HostSharedPtr host)
    : host_(host), parent_(parent),
      interval_timer_(parent.dispatcher_.createTimer([this]() -> void { onIntervalBase(); })),
      timeout_timer_(parent.dispatcher_.createTimer([this]() -> void { onTimeoutBase(); })),
      time_source_(parent.dispatcher_.timeSource()) {

  if (!host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
    parent.incHealthy();
  }

  if (host->healthFlagGet(Host::HealthFlag::DEGRADED_ACTIVE_HC)) {
    parent.incDegraded();
  }
}

HealthCheckerImplBase::ActiveHealthCheckSession::~ActiveHealthCheckSession() {
  // Make sure onDeferredDeleteBase() has been called. We should not reference our parent at this
  // point since we may have been deferred deleted.
  ASSERT(interval_timer_ == nullptr && timeout_timer_ == nullptr);
}

void HealthCheckerImplBase::ActiveHealthCheckSession::onDeferredDeleteBase() {
  // The session is about to be deferred deleted. Make sure all timers are gone and any
  // implementation specific state is destroyed.
  interval_timer_.reset();
  timeout_timer_.reset();
  if (!host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
    parent_.decHealthy();
  }
  if (host_->healthFlagGet(Host::HealthFlag::DEGRADED_ACTIVE_HC)) {
    parent_.decDegraded();
  }
  onDeferredDelete();

  // Run callbacks in case something is waiting for health checks to run which will now never run.
  if (first_check_) {
    parent_.runCallbacks(host_, HealthTransition::Unchanged);
  }
}

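// Processes a passing health check: resets the unhealthy streak, promotes the host to healthy once
// healthy_threshold is met (or immediately on the first check), updates the degraded flag, and
// schedules the next check.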
void HealthCheckerImplBase::ActiveHealthCheckSession::handleSuccess(bool degraded) {
  // If we are healthy, reset the # of unhealthy to zero.
  num_unhealthy_ = 0;

  HealthTransition changed_state = HealthTransition::Unchanged;

  if (host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
    // If this is the first time we ever got a check result on this host, we immediately move
    // it to healthy. This makes startup faster with a small reduction in overall reliability
    // depending on the HC settings.
    if (first_check_ || ++num_healthy_ == parent_.healthy_threshold_) {
      // If the host moves to healthy, clear active HC timeout, which may be toggled off and on
      // while the host is unhealthy.
      host_->healthFlagClear(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
      // A host that was told to exclude based on immediate failure, but is now passing, should
      // no longer be excluded.
      host_->healthFlagClear(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL);
      host_->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
      parent_.incHealthy();
      changed_state = HealthTransition::Changed;
      if (parent_.event_logger_) {
        parent_.event_logger_->logAddHealthy(parent_.healthCheckerType(), host_, first_check_);
      }
    } else {
      changed_state = HealthTransition::ChangePending;
    }
    host_->setLastHcPassTime(time_source_.monotonicTime());
  }

  changed_state = clearPendingFlag(changed_state);

  if (degraded != host_->healthFlagGet(Host::HealthFlag::DEGRADED_ACTIVE_HC)) {
    if (degraded) {
      host_->healthFlagSet(Host::HealthFlag::DEGRADED_ACTIVE_HC);
      parent_.incDegraded();
      if (parent_.event_logger_) {
        parent_.event_logger_->logDegraded(parent_.healthCheckerType(), host_);
      }
    } else {
      if (parent_.event_logger_) {
        parent_.event_logger_->logNoLongerDegraded(parent_.healthCheckerType(), host_);
      }
      host_->healthFlagClear(Host::HealthFlag::DEGRADED_ACTIVE_HC);
    }

    // This check ensures that we honor the decision made about Changed vs ChangePending in the
    // above block.
    // TODO(snowp): should there be degraded_threshold?
    if (changed_state == HealthTransition::Unchanged) {
      changed_state = HealthTransition::Changed;
    }
  }

  parent_.stats_.success_.inc();
  first_check_ = false;
  parent_.runCallbacks(host_, changed_state);

  timeout_timer_->disableTimer();
  interval_timer_->enableTimer(parent_.interval(HealthState::Healthy, changed_state));
}

namespace {

bool networkHealthCheckFailureType(envoy::data::core::v3::HealthCheckFailureType type) {
  return type == envoy::data::core::v3::NETWORK || type == envoy::data::core::v3::NETWORK_TIMEOUT;
}

} // namespace

HealthTransition HealthCheckerImplBase::ActiveHealthCheckSession::setUnhealthy(
    envoy::data::core::v3::HealthCheckFailureType type, bool retriable) {
  // If we are unhealthy, reset the # of healthy to zero.
  num_healthy_ = 0;

  HealthTransition changed_state = HealthTransition::Unchanged;
  if (!host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
    if ((!networkHealthCheckFailureType(type) && !retriable) ||
        ++num_unhealthy_ == parent_.unhealthy_threshold_) {
      host_->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
      parent_.decHealthy();
      changed_state = HealthTransition::Changed;
      if (parent_.event_logger_) {
        parent_.event_logger_->logEjectUnhealthy(parent_.healthCheckerType(), host_, type);
      }
    } else {
      changed_state = HealthTransition::ChangePending;
    }
  }

  // In the case of network timeout and if the host is currently failed, set the timeout flag.
  // Otherwise clear it. This allows a host to toggle between timeout and failure if it's continuing
  // to fail for different reasons.
  if (type == envoy::data::core::v3::NETWORK_TIMEOUT &&
      host_->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
    host_->healthFlagSet(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
  } else {
    host_->healthFlagClear(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
  }

  changed_state = clearPendingFlag(changed_state);

  if ((first_check_ || parent_.always_log_health_check_failures_) && parent_.event_logger_) {
    parent_.event_logger_->logUnhealthy(parent_.healthCheckerType(), host_, type, first_check_);
  }

  parent_.stats_.failure_.inc();
  if (networkHealthCheckFailureType(type)) {
    parent_.stats_.network_failure_.inc();
  } else if (type == envoy::data::core::v3::PASSIVE) {
    parent_.stats_.passive_failure_.inc();
  }

  first_check_ = false;
  parent_.runCallbacks(host_, changed_state);
  return changed_state;
}

void HealthCheckerImplBase::ActiveHealthCheckSession::handleFailure(
    envoy::data::core::v3::HealthCheckFailureType type, bool retriable) {
  HealthTransition changed_state = setUnhealthy(type, retriable);
  // It's possible that the previous call caused this session to be deferred deleted.
  if (timeout_timer_ != nullptr) {
    timeout_timer_->disableTimer();
  }

  if (interval_timer_ != nullptr) {
    interval_timer_->enableTimer(parent_.interval(HealthState::Unhealthy, changed_state));
  }
}

HealthTransition
HealthCheckerImplBase::ActiveHealthCheckSession::clearPendingFlag(HealthTransition changed_state) {
  if (host_->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC)) {
    host_->healthFlagClear(Host::HealthFlag::PENDING_ACTIVE_HC);
    // Even though the health value of the host might have not changed, we set this to Changed so
    // that the cluster can update its list of excluded hosts.
    return HealthTransition::Changed;
  }

  return changed_state;
}

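// Fired by the interval timer: starts the implementation-specific check, arms the timeout timer,
// and counts the attempt.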
void HealthCheckerImplBase::ActiveHealthCheckSession::onIntervalBase() {
  onInterval();
  timeout_timer_->enableTimer(parent_.timeout_);
  parent_.stats_.attempt_.inc();
}

void HealthCheckerImplBase::ActiveHealthCheckSession::onTimeoutBase() {
  onTimeout();
  handleFailure(envoy::data::core::v3::NETWORK_TIMEOUT);
}

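// Schedules the very first check: run it immediately when no initial_jitter is configured,
// otherwise delay it by a random amount of up to initial_jitter.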
void HealthCheckerImplBase::ActiveHealthCheckSession::onInitialInterval() {
  if (parent_.initial_jitter_.count() == 0) {
    onIntervalBase();
  } else {
    interval_timer_->enableTimer(
        std::chrono::milliseconds(parent_.intervalWithJitter(0, parent_.initial_jitter_)));
  }
}

} // namespace Upstream
} // namespace Envoy