1
#include "source/common/upstream/outlier_detection_impl.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7
#include <vector>
8

            
9
#include "envoy/config/cluster/v3/cluster.pb.h"
10
#include "envoy/config/cluster/v3/outlier_detection.pb.h"
11
#include "envoy/data/cluster/v3/outlier_detection_event.pb.h"
12
#include "envoy/event/dispatcher.h"
13
#include "envoy/stats/scope.h"
14

            
15
#include "source/common/common/assert.h"
16
#include "source/common/common/enum_to_int.h"
17
#include "source/common/common/fmt.h"
18
#include "source/common/common/utility.h"
19
#include "source/common/http/codes.h"
20
#include "source/common/protobuf/utility.h"
21

            
22
namespace Envoy {
23
namespace Upstream {
24
namespace Outlier {
25

            
26
// Creates the outlier detector for a cluster. When the cluster config carries
// no outlier_detection section, a null detector is returned instead.
absl::StatusOr<DetectorSharedPtr> DetectorImplFactory::createForCluster(
    Cluster& cluster, const envoy::config::cluster::v3::Cluster& cluster_config,
    Event::Dispatcher& dispatcher, Runtime::Loader& runtime, EventLoggerSharedPtr event_logger,
    Random::RandomGenerator& random) {
  if (!cluster_config.has_outlier_detection()) {
    // Outlier detection is not configured for this cluster.
    return nullptr;
  }

  return DetectorImpl::create(cluster, cluster_config.outlier_detection(), dispatcher, runtime,
                              dispatcher.timeSource(), std::move(event_logger), random);
}
38

            
39
// Builds the per-host monitor. Two success rate monitors are created (one for
// external origin, one for local origin events) and the putResult handler is
// chosen once, up front, from the detector configuration.
DetectorHostMonitorImpl::DetectorHostMonitorImpl(std::shared_ptr<DetectorImpl> detector,
                                                 HostSharedPtr host)
    : detector_(detector), host_(host),
      external_origin_sr_monitor_(envoy::data::cluster::v3::SUCCESS_RATE),
      local_origin_sr_monitor_(envoy::data::cluster::v3::SUCCESS_RATE_LOCAL_ORIGIN) {
  // putResult() dispatches through put_result_func_; which handler it points
  // at depends on the config's split_external_local_origin_errors setting.
  if (detector->config().splitExternalLocalOriginErrors()) {
    put_result_func_ = &DetectorHostMonitorImpl::putResultWithLocalExternalSplit;
  } else {
    put_result_func_ = &DetectorHostMonitorImpl::putResultNoLocalExternalSplit;
  }
}
51

            
52
143
// Records an ejection: flags the host as FAILED_OUTLIER_CHECK, increments the
// ejection count and remembers when the ejection happened.
void DetectorHostMonitorImpl::eject(MonotonicTime ejection_time) {
  // NOTE(review): the locked host is dereferenced without a null check;
  // presumably callers only eject hosts that are still alive -- confirm.
  const auto locked_host = host_.lock();
  ASSERT(!locked_host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK));
  locked_host->healthFlagSet(Host::HealthFlag::FAILED_OUTLIER_CHECK);
  num_ejections_++;
  last_ejection_time_ = ejection_time;
}
58

            
59
101
// Remembers the time at which the host was unejected. The health flag itself
// is not touched here.
void DetectorHostMonitorImpl::uneject(MonotonicTime unejection_time) {
  last_unejection_time_ = unejection_time;
}
62

            
63
10
// Records a degradation: flags the host as DEGRADED_OUTLIER_DETECTION,
// increments the degradation count and remembers when it happened.
void DetectorHostMonitorImpl::degrade(MonotonicTime degraded_time) {
  // NOTE(review): the locked host is dereferenced without a null check;
  // presumably callers only degrade hosts that are still alive -- confirm.
  const auto locked_host = host_.lock();
  ASSERT(!locked_host->healthFlagGet(Host::HealthFlag::DEGRADED_OUTLIER_DETECTION));
  locked_host->healthFlagSet(Host::HealthFlag::DEGRADED_OUTLIER_DETECTION);
  num_degradations_++;
  last_degraded_time_ = degraded_time;
}
69

            
70
9
// Remembers the time at which the host stopped being degraded. The health
// flag itself is not touched here.
void DetectorHostMonitorImpl::undegrade(MonotonicTime undegraded_time) {
  last_undegraded_time_ = undegraded_time;
}
73

            
74
135789
void DetectorHostMonitorImpl::updateCurrentSuccessRateBucket() {
75
135789
  external_origin_sr_monitor_.updateCurrentSuccessRateBucket();
76
135789
  local_origin_sr_monitor_.updateCurrentSuccessRateBucket();
77
135789
}
78

            
79
4486
void DetectorHostMonitorImpl::putHttpResponseCode(uint64_t response_code) {
80
4486
  external_origin_sr_monitor_.incTotalReqCounter();
81
4486
  if (Http::CodeUtility::is5xx(response_code)) {
82
2362
    std::shared_ptr<DetectorImpl> detector = detector_.lock();
83
2362
    if (!detector) {
84
      // It's possible for the cluster/detector to go away while we still have a host in use.
85
5
      return;
86
5
    }
87
2357
    if (Http::CodeUtility::isGatewayError(response_code)) {
88
1257
      if (++consecutive_gateway_failure_ ==
89
1257
          detector->runtime().snapshot().getInteger(
90
1257
              ConsecutiveGatewayFailureRuntime, detector->config().consecutiveGatewayFailure())) {
91
176
        detector->onConsecutiveGatewayFailure(host_.lock());
92
176
      }
93
1269
    } else {
94
1100
      consecutive_gateway_failure_ = 0;
95
1100
    }
96

            
97
2357
    if (++consecutive_5xx_ == detector->runtime().snapshot().getInteger(
98
2357
                                  Consecutive5xxRuntime, detector->config().consecutive5xx())) {
99
359
      detector->onConsecutive5xx(host_.lock());
100
359
    }
101
2357
  } else {
102
2124
    external_origin_sr_monitor_.incSuccessReqCounter();
103
2124
    consecutive_5xx_ = 0;
104
2124
    consecutive_gateway_failure_ = 0;
105
2124
  }
106
4486
}
107

            
108
658
// Translates a Result into the HTTP code used to report it, or absl::nullopt
// when the result must not be reported as an HTTP code.
absl::optional<Http::Code> DetectorHostMonitorImpl::resultToHttpCode(Result result) {
  switch (result) {
  case Result::ExtOriginRequestSuccess:
  case Result::LocalOriginConnectSuccessFinal:
  case Result::ExtOriginRequestDegraded:
    return Http::Code::OK;
  case Result::LocalOriginTimeout:
    return Http::Code::GatewayTimeout;
  case Result::LocalOriginConnectFailed:
    return Http::Code::ServiceUnavailable;
  case Result::ExtOriginRequestFailed:
    return Http::Code::InternalServerError;
  // LOCAL_ORIGIN_CONNECT_SUCCESS is used in 2-layer protocols, like HTTP.
  // First the connection is established and then the higher level protocol
  // runs. If an error happens in the higher layer protocol, it will be mapped
  // to an HTTP code indicating the error. In order not to interfere with the
  // result of the higher layer protocol, this result is not mapped to an HTTP
  // code.
  case Result::LocalOriginConnectSuccess:
    return absl::nullopt;
  }

  // Values not matched by any case above are reported as an internal error.
  return Http::Code::InternalServerError;
}
139

            
140
// Method is called by putResult when external and local origin errors
141
// are not treated differently. All errors are mapped to HTTP codes.
142
// Depending on the value of the parameter *code* the function behaves differently:
143
// - if the *code* is not defined, mapping uses resultToHttpCode method to do mapping.
144
// - if *code* is defined, it is taken as HTTP code and reported as such to outlier detector.
145
void DetectorHostMonitorImpl::putResultNoLocalExternalSplit(Result result,
146
4223
                                                            absl::optional<uint64_t> code) {
147
  // Mark host as degraded if needed, then process normally
148
4223
  if (result == Result::ExtOriginRequestDegraded) {
149
11
    std::shared_ptr<DetectorImpl> detector = detector_.lock();
150
11
    if (detector) {
151
11
      detector->setHostDegraded(host_.lock());
152
11
    }
153
11
  }
154

            
155
4223
  if (code) {
156
3565
    putHttpResponseCode(code.value());
157
3565
  } else {
158
658
    absl::optional<Http::Code> http_code = resultToHttpCode(result);
159
658
    if (http_code) {
160
621
      putHttpResponseCode(enumToInt(http_code.value()));
161
621
    }
162
658
  }
163
4223
}
164

            
165
// Method is called by putResult when external and local origin errors
166
// are treated separately. Local origin errors have separate counters and
167
// separate success rate monitor.
168
void DetectorHostMonitorImpl::putResultWithLocalExternalSplit(Result result,
169
3042
                                                              absl::optional<uint64_t> code) {
170
3042
  switch (result) {
171
  // SUCCESS is used to report success for connection level. Server may still respond with
172
  // error, but connection to server was OK.
173
2251
  case Result::LocalOriginConnectSuccess:
174
2251
  case Result::LocalOriginConnectSuccessFinal:
175
2251
    return localOriginNoFailure();
176
  // Connectivity related errors.
177
20
  case Result::LocalOriginTimeout:
178
491
  case Result::LocalOriginConnectFailed:
179
491
    return localOriginFailure();
180
  // EXT_ORIGIN_REQUEST_FAILED is used when connection to server was successful, but transaction on
181
  // server level failed. Since it it similar to HTTP 5xx, map it to 5xx if HTTP code is not
182
  // provided.
183
200
  case Result::ExtOriginRequestFailed:
184
    // map it to http code and call http handler.
185
200
    putHttpResponseCode(code.value_or(enumToInt(Http::Code::ServiceUnavailable)));
186
200
    break;
187
  // EXT_ORIGIN_REQUEST_SUCCESS is used to report that transaction with upstream server was
188
  // completed successfully. This means that connection and server level transactions were
189
  // successful. Map it to http code 200 OK if HTTP code is not provided.
190
100
  case Result::ExtOriginRequestSuccess:
191
100
    putHttpResponseCode(code.value_or(enumToInt(Http::Code::OK)));
192
100
    break;
193
  case Result::ExtOriginRequestDegraded:
194
    // Mark host as degraded, then process as successful response
195
    std::shared_ptr<DetectorImpl> detector = detector_.lock();
196
    if (detector) {
197
      detector->setHostDegraded(host_.lock());
198
    }
199
    putHttpResponseCode(code.value_or(enumToInt(Http::Code::OK)));
200
    break;
201
3042
  }
202
3042
}
203

            
204
// Method is used by other components to reports success or error.
205
// It calls putResultWithLocalExternalSplit or put putResultNoLocalExternalSplit via
206
// std::function. The setting happens in constructor based on split_external_local_origin_errors
207
// config parameter.
208
7265
void DetectorHostMonitorImpl::putResult(Result result, absl::optional<uint64_t> code) {
209
7265
  put_result_func_(this, result, code);
210
7265
}
211

            
212
491
void DetectorHostMonitorImpl::localOriginFailure() {
213
491
  std::shared_ptr<DetectorImpl> detector = detector_.lock();
214
491
  if (!detector) {
215
    // It's possible for the cluster/detector to go away while we still have a host in use.
216
    return;
217
  }
218
491
  local_origin_sr_monitor_.incTotalReqCounter();
219
491
  if (++consecutive_local_origin_failure_ ==
220
491
      detector->runtime().snapshot().getInteger(
221
491
          ConsecutiveLocalOriginFailureRuntime,
222
491
          detector->config().consecutiveLocalOriginFailure())) {
223
96
    detector->onConsecutiveLocalOriginFailure(host_.lock());
224
96
  }
225
491
}
226

            
227
2251
void DetectorHostMonitorImpl::localOriginNoFailure() {
228
2251
  std::shared_ptr<DetectorImpl> detector = detector_.lock();
229
2251
  if (!detector) {
230
    // It's possible for the cluster/detector to go away while we still have a host in use.
231
    return;
232
  }
233

            
234
2251
  local_origin_sr_monitor_.incTotalReqCounter();
235
2251
  local_origin_sr_monitor_.incSuccessReqCounter();
236

            
237
2251
  resetConsecutiveLocalOriginFailure();
238
2251
}
239

            
240
// Snapshots the OutlierDetection proto into plain members. Every field that is
// absent from the proto falls back to the default passed to the corresponding
// PROTOBUF_GET_*_OR_DEFAULT macro.
DetectorConfig::DetectorConfig(const envoy::config::cluster::v3::OutlierDetection& config)
    : // Timing of the detection interval and the base ejection duration.
      interval_ms_(
          static_cast<uint64_t>(PROTOBUF_GET_MS_OR_DEFAULT(config, interval, DEFAULT_INTERVAL_MS))),
      base_ejection_time_ms_(static_cast<uint64_t>(
          PROTOBUF_GET_MS_OR_DEFAULT(config, base_ejection_time, DEFAULT_BASE_EJECTION_TIME_MS))),
      // Consecutive failure thresholds and ejection limits.
      consecutive_5xx_(static_cast<uint64_t>(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, consecutive_5xx, DEFAULT_CONSECUTIVE_5XX))),
      consecutive_gateway_failure_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, consecutive_gateway_failure, DEFAULT_CONSECUTIVE_GATEWAY_FAILURE))),
      max_ejection_percent_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, max_ejection_percent, DEFAULT_MAX_EJECTION_PERCENT))),
      always_eject_one_host_(
          static_cast<bool>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, always_eject_one_host, false))),
      // Success rate parameters.
      success_rate_minimum_hosts_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, success_rate_minimum_hosts, DEFAULT_SUCCESS_RATE_MINIMUM_HOSTS))),
      success_rate_request_volume_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, success_rate_request_volume, DEFAULT_SUCCESS_RATE_REQUEST_VOLUME))),
      success_rate_stdev_factor_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, success_rate_stdev_factor, DEFAULT_SUCCESS_RATE_STDEV_FACTOR))),
      // Failure percentage parameters.
      failure_percentage_threshold_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, failure_percentage_threshold, DEFAULT_FAILURE_PERCENTAGE_THRESHOLD))),
      failure_percentage_minimum_hosts_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, failure_percentage_minimum_hosts, DEFAULT_FAILURE_PERCENTAGE_MINIMUM_HOSTS))),
      failure_percentage_request_volume_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, failure_percentage_request_volume, DEFAULT_FAILURE_PERCENTAGE_REQUEST_VOLUME))),
      // Enforcement settings for each detection type.
      enforcing_consecutive_5xx_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, enforcing_consecutive_5xx, DEFAULT_ENFORCING_CONSECUTIVE_5XX))),
      enforcing_consecutive_gateway_failure_(static_cast<uint64_t>(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, enforcing_consecutive_gateway_failure,
                                          DEFAULT_ENFORCING_CONSECUTIVE_GATEWAY_FAILURE))),
      enforcing_success_rate_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, enforcing_success_rate, DEFAULT_ENFORCING_SUCCESS_RATE))),
      enforcing_failure_percentage_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, enforcing_failure_percentage, DEFAULT_ENFORCING_FAILURE_PERCENTAGE))),
      enforcing_failure_percentage_local_origin_(static_cast<uint64_t>(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, enforcing_failure_percentage_local_origin,
                                          DEFAULT_ENFORCING_FAILURE_PERCENTAGE_LOCAL_ORIGIN))),
      // Local origin handling.
      split_external_local_origin_errors_(config.split_external_local_origin_errors()),
      consecutive_local_origin_failure_(static_cast<uint64_t>(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, consecutive_local_origin_failure, DEFAULT_CONSECUTIVE_LOCAL_ORIGIN_FAILURE))),
      enforcing_consecutive_local_origin_failure_(static_cast<uint64_t>(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, enforcing_consecutive_local_origin_failure,
                                          DEFAULT_ENFORCING_CONSECUTIVE_LOCAL_ORIGIN_FAILURE))),
      enforcing_local_origin_success_rate_(static_cast<uint64_t>(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, enforcing_local_origin_success_rate,
                                          DEFAULT_ENFORCING_LOCAL_ORIGIN_SUCCESS_RATE))),
      // When max_ejection_time is absent from the config, fall back to the
      // larger of the default and base_ejection_time. This initializer reads
      // base_ejection_time_ms_, which is initialized earlier in this list.
      max_ejection_time_ms_(static_cast<uint64_t>(PROTOBUF_GET_MS_OR_DEFAULT(
          config, max_ejection_time,
          std::max(DEFAULT_MAX_EJECTION_TIME_MS, base_ejection_time_ms_)))),
      max_ejection_time_jitter_ms_(static_cast<uint64_t>(PROTOBUF_GET_MS_OR_DEFAULT(
          config, max_ejection_time_jitter, DEFAULT_MAX_EJECTION_TIME_JITTER_MS))),
      // Feature toggles.
      successful_active_health_check_uneject_host_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
          config, successful_active_health_check_uneject_host, true)),
      detect_degraded_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, detect_degraded_hosts, false)) {}
296

            
297
// Stores the collaborators, builds the stats from the cluster's stats scope
// and creates (but does not arm) the interval timer, whose callback runs
// onIntervalTimer().
DetectorImpl::DetectorImpl(const Cluster& cluster,
                           const envoy::config::cluster::v3::OutlierDetection& config,
                           Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
                           TimeSource& time_source, EventLoggerSharedPtr event_logger,
                           Random::RandomGenerator& random)
    : config_(config), dispatcher_(dispatcher), runtime_(runtime), time_source_(time_source),
      stats_(generateStats(cluster.info()->statsScope())),
      interval_timer_(dispatcher.createTimer([this]() -> void { onIntervalTimer(); })),
      event_logger_(event_logger), random_generator_(random) {
  // Initial success rate numbers, one pair per type of SR detector.
  local_origin_sr_num_ = {-1, -1};
  external_origin_sr_num_ = {-1, -1};
}
310

            
311
101
// On destruction, every monitored host that is still flagged as ejected is
// removed from the active ejections count.
DetectorImpl::~DetectorImpl() {
  for (const auto& monitor_entry : host_monitors_) {
    const auto& monitored_host = monitor_entry.first;
    if (!monitored_host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
      continue;
    }
    ASSERT(ejections_active_helper_.value() > 0);
    ejections_active_helper_.dec();
  }
}
319

            
320
// Constructs and initializes a detector for `cluster`.
// Returns InvalidArgument when the configured max_ejection_time is smaller
// than base_ejection_time; in that case the detector is never initialized.
absl::StatusOr<std::shared_ptr<DetectorImpl>>
DetectorImpl::create(Cluster& cluster, const envoy::config::cluster::v3::OutlierDetection& config,
                     Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
                     TimeSource& time_source, EventLoggerSharedPtr event_logger,
                     Random::RandomGenerator& random) {
  std::shared_ptr<DetectorImpl> detector(
      new DetectorImpl(cluster, config, dispatcher, runtime, time_source, event_logger, random));

  const auto& detector_config = detector->config();
  if (detector_config.maxEjectionTimeMs() < detector_config.baseEjectionTimeMs()) {
    return absl::InvalidArgumentError(
        "outlier detector's max_ejection_time cannot be smaller than base_ejection_time");
  }

  detector->initialize(cluster);
  return detector;
}
336

            
337
100
// Hooks the detector up to the cluster:
// - installs a monitor on every host currently in the priority set,
// - optionally registers a health check completion callback that unejects
//   hosts once active health checking reports them healthy,
// - registers a member update callback to track added/removed hosts,
// - arms the interval timer.
void DetectorImpl::initialize(Cluster& cluster) {
  for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
    for (const HostSharedPtr& host : host_set->hosts()) {
      addHostMonitor(host);
    }
  }

  const bool uneject_on_active_hc =
      config_.successfulActiveHealthCheckUnejectHost() && cluster.healthChecker() != nullptr;
  if (uneject_on_active_hc) {
    cluster.healthChecker()->addHostCheckCompleteCb(
        [this](HostSharedPtr host, HealthTransition, HealthState current_check_result) {
          // If the host is ejected by outlier detection and active health check succeeds,
          // we should treat this host as healthy.
          if (current_check_result != HealthState::Healthy ||
              host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
              !host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
            return;
          }
          host->healthFlagClear(Host::HealthFlag::FAILED_OUTLIER_CHECK);
          unejectHost(host);
        });
  }

  member_update_cb_ = cluster.prioritySet().addMemberUpdateCb(
      [this](const HostVector& hosts_added, const HostVector& hosts_removed) {
        for (const HostSharedPtr& added : hosts_added) {
          addHostMonitor(added);
        }

        for (const HostSharedPtr& removed : hosts_removed) {
          ASSERT(host_monitors_.count(removed) == 1);
          // A host removed while ejected no longer counts as an active ejection.
          if (removed->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
            ASSERT(ejections_active_helper_.value() > 0);
            ejections_active_helper_.dec();
          }

          host_monitors_.erase(removed);
        }
      });

  armIntervalTimer();
}
376

            
377
114
// Creates a monitor for `host`, records a non-owning pointer to it in
// host_monitors_ and hands ownership of the monitor to the host.
void DetectorImpl::addHostMonitor(HostSharedPtr host) {
  ASSERT(host_monitors_.count(host) == 0);
  // Keep the monitor owned by a smart pointer until the host takes it over, so
  // it cannot leak if recording it in the map fails.
  auto monitor = std::make_unique<DetectorHostMonitorImpl>(shared_from_this(), host);
  host_monitors_[host] = monitor.get();
  host->setOutlierDetector(DetectorHostMonitorPtr{std::move(monitor)});
}
383

            
384
135814
void DetectorImpl::armIntervalTimer() {
385
135814
  interval_timer_->enableTimer(std::chrono::milliseconds(
386
135814
      runtime_.snapshot().getInteger(IntervalMsRuntime, config_.intervalMs())));
387
135814
}
388

            
389
void DetectorImpl::checkHostForUneject(HostSharedPtr host, DetectorHostMonitorImpl* monitor,
390
135789
                                       MonotonicTime now) {
391
135789
  if (!host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
392
134241
    return;
393
134241
  }
394

            
395
1548
  const std::chrono::milliseconds base_eject_time = std::chrono::milliseconds(
396
1548
      runtime_.snapshot().getInteger(BaseEjectionTimeMsRuntime, config_.baseEjectionTimeMs()));
397
1548
  const std::chrono::milliseconds max_eject_time = std::chrono::milliseconds(
398
1548
      runtime_.snapshot().getInteger(MaxEjectionTimeMsRuntime, config_.maxEjectionTimeMs()));
399
1548
  const std::chrono::milliseconds jitter = monitor->getJitter();
400
1548
  ASSERT(monitor->numEjections() > 0);
401
1548
  if ((min(base_eject_time * monitor->ejectTimeBackoff(), max_eject_time) + jitter) <=
402
1548
      (now - monitor->lastEjectionTime().value())) {
403
99
    unejectHost(host);
404
99
  }
405
1548
}
406

            
407
void DetectorImpl::checkHostForUndegrade(HostSharedPtr host, DetectorHostMonitorImpl* monitor,
408
135789
                                         MonotonicTime now) {
409
135789
  if (!config_.detectDegraded() ||
410
135789
      !host->healthFlagGet(Host::HealthFlag::DEGRADED_OUTLIER_DETECTION)) {
411
135779
    return;
412
135779
  }
413

            
414
10
  const std::chrono::milliseconds base_eject_time = std::chrono::milliseconds(
415
10
      runtime_.snapshot().getInteger(BaseEjectionTimeMsRuntime, config_.baseEjectionTimeMs()));
416
10
  const std::chrono::milliseconds max_eject_time = std::chrono::milliseconds(
417
10
      runtime_.snapshot().getInteger(MaxEjectionTimeMsRuntime, config_.maxEjectionTimeMs()));
418
10
  const std::chrono::milliseconds jitter = monitor->getJitter();
419
10
  ASSERT(monitor->numDegradations() > 0);
420
10
  if ((std::min(base_eject_time * monitor->degradeTimeBackoff(), max_eject_time) + jitter) <=
421
10
      (now - monitor->lastDegradedTime().value())) {
422
9
    host->healthFlagClear(Host::HealthFlag::DEGRADED_OUTLIER_DETECTION);
423
9
    monitor->undegrade(time_source_.monotonicTime());
424
9
    runCallbacks(host);
425

            
426
9
    if (event_logger_) {
427
9
      event_logger_->logUneject(host);
428
9
    }
429
9
  }
430
10
}
431

            
432
101
// Brings `host` back from ejection: decrements the active ejections count,
// clears the FAILED_OUTLIER_CHECK flag, resets the host monitor's consecutive
// failure counters, runs the change callbacks and logs the uneject event.
void DetectorImpl::unejectHost(HostSharedPtr host) {
  ejections_active_helper_.dec();
  host->healthFlagClear(Host::HealthFlag::FAILED_OUTLIER_CHECK);

  DetectorHostMonitorImpl* const monitor = host_monitors_[host];
  // Reset the consecutive failure counters to avoid re-ejection on very few new errors due
  // to the non-triggering counter being close to its trigger value.
  monitor->resetConsecutive5xx();
  monitor->resetConsecutiveGatewayFailure();
  monitor->resetConsecutiveLocalOriginFailure();
  monitor->uneject(time_source_.monotonicTime());
  runCallbacks(host);

  if (event_logger_) {
    event_logger_->logUneject(host);
  }
}
447

            
448
622
// Decides whether a detected ejection of the given type is actually enforced.
// Each ejection type consults its own runtime feature key, falling back to the
// matching enforcing_* value from the config.
// DEGRADED is not a valid input: it is reported via IS_ENVOY_BUG and treated
// as "not enforced".
bool DetectorImpl::enforceEjection(envoy::data::cluster::v3::OutlierEjectionType type) {
  switch (type) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case envoy::data::cluster::v3::CONSECUTIVE_5XX:
    return runtime_.snapshot().featureEnabled(EnforcingConsecutive5xxRuntime,
                                              config_.enforcingConsecutive5xx());
  case envoy::data::cluster::v3::CONSECUTIVE_GATEWAY_FAILURE:
    return runtime_.snapshot().featureEnabled(EnforcingConsecutiveGatewayFailureRuntime,
                                              config_.enforcingConsecutiveGatewayFailure());
  case envoy::data::cluster::v3::SUCCESS_RATE:
    return runtime_.snapshot().featureEnabled(EnforcingSuccessRateRuntime,
                                              config_.enforcingSuccessRate());
  case envoy::data::cluster::v3::CONSECUTIVE_LOCAL_ORIGIN_FAILURE:
    return runtime_.snapshot().featureEnabled(EnforcingConsecutiveLocalOriginFailureRuntime,
                                              config_.enforcingConsecutiveLocalOriginFailure());
  case envoy::data::cluster::v3::SUCCESS_RATE_LOCAL_ORIGIN:
    return runtime_.snapshot().featureEnabled(EnforcingLocalOriginSuccessRateRuntime,
                                              config_.enforcingLocalOriginSuccessRate());
  case envoy::data::cluster::v3::FAILURE_PERCENTAGE:
    return runtime_.snapshot().featureEnabled(EnforcingFailurePercentageRuntime,
                                              config_.enforcingFailurePercentage());
  case envoy::data::cluster::v3::FAILURE_PERCENTAGE_LOCAL_ORIGIN:
    return runtime_.snapshot().featureEnabled(EnforcingFailurePercentageLocalOriginRuntime,
                                              config_.enforcingFailurePercentageLocalOrigin());
  case envoy::data::cluster::v3::DEGRADED:
    // Degradation uses its own code path, not the ejection helpers
    IS_ENVOY_BUG("enforceEjection() should not be called for DEGRADED type");
    // Return explicitly so that a build which continues past the bug report
    // does not fall through to the corrupt-enum panic below.
    return false;
  }

  PANIC_DUE_TO_CORRUPT_ENUM;
}
479

            
480
143
// Bumps the enforced-ejection counters: the overall total first, then the
// counter specific to the ejection type. DEGRADED is not a valid input and is
// reported via IS_ENVOY_BUG (the total has already been incremented by then).
void DetectorImpl::updateEnforcedEjectionStats(envoy::data::cluster::v3::OutlierEjectionType type) {
  stats_.ejections_enforced_total_.inc();

  switch (type) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case envoy::data::cluster::v3::SUCCESS_RATE:
    stats_.ejections_enforced_success_rate_.inc();
    return;
  case envoy::data::cluster::v3::CONSECUTIVE_5XX:
    stats_.ejections_enforced_consecutive_5xx_.inc();
    return;
  case envoy::data::cluster::v3::CONSECUTIVE_GATEWAY_FAILURE:
    stats_.ejections_enforced_consecutive_gateway_failure_.inc();
    return;
  case envoy::data::cluster::v3::CONSECUTIVE_LOCAL_ORIGIN_FAILURE:
    stats_.ejections_enforced_consecutive_local_origin_failure_.inc();
    return;
  case envoy::data::cluster::v3::SUCCESS_RATE_LOCAL_ORIGIN:
    stats_.ejections_enforced_local_origin_success_rate_.inc();
    return;
  case envoy::data::cluster::v3::FAILURE_PERCENTAGE:
    stats_.ejections_enforced_failure_percentage_.inc();
    return;
  case envoy::data::cluster::v3::FAILURE_PERCENTAGE_LOCAL_ORIGIN:
    stats_.ejections_enforced_local_origin_failure_percentage_.inc();
    return;
  case envoy::data::cluster::v3::DEGRADED:
    // Degradation uses its own code path, not the ejection helpers
    IS_ENVOY_BUG("updateEnforcedEjectionStats() should not be called for DEGRADED type");
    return;
  }
}
511

            
512
639
// Bumps the detected-ejection counter that corresponds to the ejection type.
// Unlike the enforced variant, DEGRADED has its own counter here.
void DetectorImpl::updateDetectedEjectionStats(envoy::data::cluster::v3::OutlierEjectionType type) {
  switch (type) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case envoy::data::cluster::v3::SUCCESS_RATE:
    stats_.ejections_detected_success_rate_.inc();
    return;
  case envoy::data::cluster::v3::CONSECUTIVE_5XX:
    stats_.ejections_detected_consecutive_5xx_.inc();
    return;
  case envoy::data::cluster::v3::CONSECUTIVE_GATEWAY_FAILURE:
    stats_.ejections_detected_consecutive_gateway_failure_.inc();
    return;
  case envoy::data::cluster::v3::CONSECUTIVE_LOCAL_ORIGIN_FAILURE:
    stats_.ejections_detected_consecutive_local_origin_failure_.inc();
    return;
  case envoy::data::cluster::v3::SUCCESS_RATE_LOCAL_ORIGIN:
    stats_.ejections_detected_local_origin_success_rate_.inc();
    return;
  case envoy::data::cluster::v3::FAILURE_PERCENTAGE:
    stats_.ejections_detected_failure_percentage_.inc();
    return;
  case envoy::data::cluster::v3::FAILURE_PERCENTAGE_LOCAL_ORIGIN:
    stats_.ejections_detected_local_origin_failure_percentage_.inc();
    return;
  case envoy::data::cluster::v3::DEGRADED:
    stats_.ejections_detected_degradation_.inc();
    return;
  }
}
541

            
542
void DetectorImpl::ejectHost(HostSharedPtr host,
543
629
                             envoy::data::cluster::v3::OutlierEjectionType type) {
544
629
  uint64_t max_ejection_percent = std::min<uint64_t>(
545
629
      100, runtime_.snapshot().getInteger(MaxEjectionPercentRuntime, config_.maxEjectionPercent()));
546
629
  double ejected_percent = 100.0 * (ejections_active_helper_.value() + 1) / host_monitors_.size();
547
  // Note this is not currently checked per-priority level, so it is possible
548
  // for outlier detection to eject all hosts at any given priority level.
549
629
  bool should_eject = (ejected_percent <= max_ejection_percent);
550
629
  if (config_.alwaysEjectOneHost()) {
551
3
    should_eject = (ejections_active_helper_.value() == 0) || should_eject;
552
3
  }
553
629
  if (should_eject) {
554
622
    if (type == envoy::data::cluster::v3::CONSECUTIVE_5XX ||
555
622
        type == envoy::data::cluster::v3::SUCCESS_RATE) {
556
      // Deprecated counter, preserving old behaviour until it's removed.
557
346
      stats_.ejections_total_.inc();
558
346
    }
559
622
    if (enforceEjection(type)) {
560
143
      ejections_active_helper_.inc();
561
143
      updateEnforcedEjectionStats(type);
562
143
      host_monitors_[host]->eject(time_source_.monotonicTime());
563
143
      const std::chrono::milliseconds base_eject_time = std::chrono::milliseconds(
564
143
          runtime_.snapshot().getInteger(BaseEjectionTimeMsRuntime, config_.baseEjectionTimeMs()));
565
143
      const std::chrono::milliseconds max_eject_time = std::chrono::milliseconds(
566
143
          runtime_.snapshot().getInteger(MaxEjectionTimeMsRuntime, config_.maxEjectionTimeMs()));
567

            
568
      // Generate random jitter so that not all hosts uneject at the same time,
569
      // which could possibly generate a connection storm.
570

            
571
      // Retrieve max_eject_time_jitter configuration and then calculate the jitter.
572
143
      const uint64_t max_eject_time_jitter = runtime_.snapshot().getInteger(
573
143
          MaxEjectionTimeJitterMsRuntime, config_.maxEjectionTimeJitterMs());
574

            
575
143
      const std::chrono::milliseconds jitter =
576
143
          std::chrono::milliseconds(random_generator_() % (max_eject_time_jitter + 1));
577

            
578
      // Save the jitter on the current host_monitor.
579
143
      host_monitors_[host]->setJitter(jitter);
580

            
581
143
      if ((host_monitors_[host]->ejectTimeBackoff() * base_eject_time) <
582
143
          (max_eject_time + base_eject_time)) {
583
127
        host_monitors_[host]->ejectTimeBackoff()++;
584
127
      }
585

            
586
143
      runCallbacks(host);
587
143
      if (event_logger_) {
588
125
        event_logger_->logEject(host, *this, type, true);
589
125
      }
590
497
    } else {
591
479
      if (event_logger_) {
592
479
        event_logger_->logEject(host, *this, type, false);
593
479
      }
594
479
    }
595
622
  } else {
596
7
    stats_.ejections_overflow_.inc();
597
7
  }
598
629
}
599

            
600
101
// Creates all outlier detection counters and gauges in `scope`, using the
// "outlier_detection." name prefix.
DetectionStats DetectorImpl::generateStats(Stats::Scope& scope) {
  const std::string prefix = "outlier_detection.";
  return {ALL_OUTLIER_DETECTION_STATS(POOL_COUNTER_PREFIX(scope, prefix),
                                      POOL_GAUGE_PREFIX(scope, prefix))};
}
605

            
606
void DetectorImpl::notifyMainThreadConsecutiveError(
607
631
    HostSharedPtr host, envoy::data::cluster::v3::OutlierEjectionType type) {
608
  // This event will come from all threads, so we synchronize with a post to the main thread.
609
  // NOTE: Unfortunately consecutive errors are complicated from a threading perspective because
610
  //       we catch consecutive errors on worker threads and then post back to the main thread.
611
  //       Clusters can get removed, and this means there is a race condition with this
612
  //       reverse post. The way we handle this is as follows:
613
  //       1) The only strong pointer to the detector is owned by the cluster.
614
  //       2) We post a weak pointer to the main thread.
615
  //       3) If when running on the main thread the weak pointer can be converted to a strong
616
  //          pointer, the detector/cluster must still exist so we can safely fire callbacks.
617
  //          Otherwise we do nothing since the detector/cluster is already gone.
618
631
  std::weak_ptr<DetectorImpl> weak_this = shared_from_this();
619
632
  dispatcher_.post([weak_this, host, type]() -> void {
620
632
    std::shared_ptr<DetectorImpl> shared_this = weak_this.lock();
621
632
    if (shared_this) {
622
631
      shared_this->onConsecutiveErrorWorker(host, type);
623
631
    }
624
632
  });
625
631
}
626

            
627
359
// Reports a run of consecutive 5xx responses on `host`. The event is forwarded to the main
// thread tagged with the CONSECUTIVE_5XX ejection type.
void DetectorImpl::onConsecutive5xx(HostSharedPtr host) {
  const envoy::data::cluster::v3::OutlierEjectionType ejection_type =
      envoy::data::cluster::v3::CONSECUTIVE_5XX;
  notifyMainThreadConsecutiveError(host, ejection_type);
}
630

            
631
176
// Reports a run of consecutive gateway failures on `host`. The event is forwarded to the main
// thread tagged with the CONSECUTIVE_GATEWAY_FAILURE ejection type.
void DetectorImpl::onConsecutiveGatewayFailure(HostSharedPtr host) {
  const envoy::data::cluster::v3::OutlierEjectionType ejection_type =
      envoy::data::cluster::v3::CONSECUTIVE_GATEWAY_FAILURE;
  notifyMainThreadConsecutiveError(host, ejection_type);
}
634

            
635
96
// Reports a run of consecutive locally originated failures on `host`. The event is forwarded to
// the main thread tagged with the CONSECUTIVE_LOCAL_ORIGIN_FAILURE ejection type.
void DetectorImpl::onConsecutiveLocalOriginFailure(HostSharedPtr host) {
  const envoy::data::cluster::v3::OutlierEjectionType ejection_type =
      envoy::data::cluster::v3::CONSECUTIVE_LOCAL_ORIGIN_FAILURE;
  notifyMainThreadConsecutiveError(host, ejection_type);
}
639

            
640
10
// Forwards a "host degraded" notification to the dispatcher, where it is handled by
// setHostDegradedMainThread().
//
// This event will come from all threads, so we synchronize with a post to the main thread.
// Only a weak pointer to the detector is captured: if the cluster/detector is destroyed before
// the posted callback runs, the weak pointer can no longer be promoted and the callback does
// nothing.
void DetectorImpl::notifyMainThreadHostDegraded(HostSharedPtr host) {
  dispatcher_.post(
      [weak_detector = std::weak_ptr<DetectorImpl>(shared_from_this()), host]() -> void {
        if (std::shared_ptr<DetectorImpl> detector = weak_detector.lock()) {
          detector->setHostDegradedMainThread(host);
        }
      });
}
652

            
653
11
// Requests that `host` be marked as degraded. The request is forwarded to the main thread only
// when degraded detection is enabled in the config; otherwise it is silently dropped.
void DetectorImpl::setHostDegraded(HostSharedPtr host) {
  if (config_.detectDegraded()) {
    notifyMainThreadHostDegraded(host);
  }
}
660

            
661
10
// Applies a degradation request for `host`. This is the callback posted to the dispatcher by
// notifyMainThreadHostDegraded().
//
// If the host is tracked and not already flagged DEGRADED_OUTLIER_DETECTION, this:
//   1) charges the DEGRADED detection stat,
//   2) calls degrade() on the host's monitor with the current monotonic time,
//   3) stores a random jitter in [0, max_eject_time_jitter] ms on the monitor,
//   4) bumps the monitor's degrade backoff multiplier while
//      backoff * base_eject_time < max_eject_time + base_eject_time,
//   5) logs the event (when an event logger is configured) and runs the state-change callbacks.
void DetectorImpl::setHostDegradedMainThread(HostSharedPtr host) {
  // Degradation requests come in cross thread. There is a chance that the host has already been
  // removed from the set. If so, just ignore it, mirroring onConsecutiveErrorWorker(). Indexing
  // host_monitors_ with an untracked host would create an empty entry that is dereferenced below.
  const auto monitor_it = host_monitors_.find(host);
  if (monitor_it == host_monitors_.end()) {
    return;
  }
  // Nothing to do when the host is already marked as degraded.
  if (host->healthFlagGet(Host::HealthFlag::DEGRADED_OUTLIER_DETECTION)) {
    return;
  }
  auto& monitor = monitor_it->second;

  updateDetectedEjectionStats(envoy::data::cluster::v3::DEGRADED);

  // Use the degrade() method which tracks timing.
  monitor->degrade(time_source_.monotonicTime());

  const std::chrono::milliseconds base_eject_time = std::chrono::milliseconds(
      runtime_.snapshot().getInteger(BaseEjectionTimeMsRuntime, config_.baseEjectionTimeMs()));
  const std::chrono::milliseconds max_eject_time = std::chrono::milliseconds(
      runtime_.snapshot().getInteger(MaxEjectionTimeMsRuntime, config_.maxEjectionTimeMs()));

  // Generate random jitter to prevent connection storms when hosts undegrade.
  const uint64_t max_eject_time_jitter = runtime_.snapshot().getInteger(
      MaxEjectionTimeJitterMsRuntime, config_.maxEjectionTimeJitterMs());
  const std::chrono::milliseconds jitter =
      std::chrono::milliseconds(random_generator_() % (max_eject_time_jitter + 1));
  monitor->setJitter(jitter);

  // Grow the backoff multiplier only until the resulting duration reaches the configured cap.
  if ((monitor->degradeTimeBackoff() * base_eject_time) < (max_eject_time + base_eject_time)) {
    monitor->degradeTimeBackoff()++;
  }

  // Log degradation event.
  // Use DEGRADED type to distinguish from actual ejections.
  // The enforced=true since degradation is always enforced (host is deprioritized).
  if (event_logger_) {
    event_logger_->logEject(host, *this, envoy::data::cluster::v3::DEGRADED, true);
  }

  runCallbacks(host);
}
695

            
696
void DetectorImpl::onConsecutiveErrorWorker(HostSharedPtr host,
697
631
                                            envoy::data::cluster::v3::OutlierEjectionType type) {
698
  // Ejections come in cross thread. There is a chance that the host has already been removed from
699
  // the set. If so, just ignore it.
700
631
  if (host_monitors_.count(host) == 0) {
701
1
    return;
702
1
  }
703
630
  if (host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
704
9
    return;
705
9
  }
706

            
707
  // We also reset the appropriate counter here to allow the monitor to detect a bout of consecutive
708
  // error responses even if the monitor is not charged with an interleaved non-error code.
709
621
  updateDetectedEjectionStats(type);
710
621
  ejectHost(host, type);
711

            
712
  // reset counters
713
621
  switch (type) {
714
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
715
  case envoy::data::cluster::v3::SUCCESS_RATE:
716
    FALLTHRU;
717
  case envoy::data::cluster::v3::SUCCESS_RATE_LOCAL_ORIGIN:
718
    FALLTHRU;
719
  case envoy::data::cluster::v3::FAILURE_PERCENTAGE:
720
    FALLTHRU;
721
  case envoy::data::cluster::v3::FAILURE_PERCENTAGE_LOCAL_ORIGIN:
722
    FALLTHRU;
723
  case envoy::data::cluster::v3::DEGRADED:
724
    IS_ENVOY_BUG("unexpected non-consecutive error");
725
    return;
726
352
  case envoy::data::cluster::v3::CONSECUTIVE_5XX:
727
352
    stats_.ejections_consecutive_5xx_.inc(); // Deprecated
728
352
    host_monitors_[host]->resetConsecutive5xx();
729
352
    break;
730
175
  case envoy::data::cluster::v3::CONSECUTIVE_GATEWAY_FAILURE:
731
175
    host_monitors_[host]->resetConsecutiveGatewayFailure();
732
175
    break;
733
94
  case envoy::data::cluster::v3::CONSECUTIVE_LOCAL_ORIGIN_FAILURE:
734
94
    host_monitors_[host]->resetConsecutiveLocalOriginFailure();
735
94
    break;
736
621
  }
737
621
}
738

            
739
// Computes the cluster-wide success rate mean and the ejection threshold derived from it.
//
// This function is using mean and standard deviation as statistical measures for outlier
// detection. First the mean is calculated by dividing the sum of success rate data over the
// number of data points. Then variance is calculated by taking the mean of the squared
// difference of data points to the mean of the data. Then standard deviation is calculated by
// taking the square root of the variance. Then the outlier threshold is calculated as the
// difference between the mean and the product of the standard deviation and a constant factor.
//
// For example with a data set that looks like success_rate_data = {50, 100, 100, 100, 100} and
// success_rate_stdev_factor = 1.9 the math would work as follows:
// success_rate_sum = 450
// mean = 90
// variance = 400
// stdev = 20
// threshold returned = 52
//
// @param success_rate_sum sum of the success rates of all entries in valid_success_rate_hosts.
// @param valid_success_rate_hosts the data points; must be non-empty, otherwise the mean is the
//        result of a division by zero.
// @param success_rate_stdev_factor multiplier applied to the standard deviation.
// @return {mean, mean - success_rate_stdev_factor * stdev}.
DetectorImpl::EjectionPair DetectorImpl::successRateEjectionThreshold(
    double success_rate_sum, const std::vector<HostSuccessRatePair>& valid_success_rate_hosts,
    double success_rate_stdev_factor) {
  const double host_count = static_cast<double>(valid_success_rate_hosts.size());
  const double mean = success_rate_sum / host_count;

  // Iterate by reference so that no entry (and the host handle it carries) is copied.
  double squared_deviation_sum = 0;
  for (const HostSuccessRatePair& entry : valid_success_rate_hosts) {
    const double deviation = entry.success_rate_ - mean;
    squared_deviation_sum += deviation * deviation;
  }
  const double variance = squared_deviation_sum / host_count;
  const double stdev = std::sqrt(variance);

  return {mean, (mean - (success_rate_stdev_factor * stdev))};
}
768

            
769
void DetectorImpl::processSuccessRateEjections(
770
271428
    DetectorHostMonitor::SuccessRateMonitorType monitor_type) {
771
271428
  uint64_t success_rate_minimum_hosts = runtime_.snapshot().getInteger(
772
271428
      SuccessRateMinimumHostsRuntime, config_.successRateMinimumHosts());
773
271428
  uint64_t success_rate_request_volume = runtime_.snapshot().getInteger(
774
271428
      SuccessRateRequestVolumeRuntime, config_.successRateRequestVolume());
775
271428
  uint64_t failure_percentage_minimum_hosts = runtime_.snapshot().getInteger(
776
271428
      FailurePercentageMinimumHostsRuntime, config_.failurePercentageMinimumHosts());
777
271428
  uint64_t failure_percentage_request_volume = runtime_.snapshot().getInteger(
778
271428
      FailurePercentageRequestVolumeRuntime, config_.failurePercentageRequestVolume());
779

            
780
271428
  std::vector<HostSuccessRatePair> valid_success_rate_hosts;
781
271428
  std::vector<HostSuccessRatePair> valid_failure_percentage_hosts;
782
271428
  double success_rate_sum = 0;
783

            
784
  // Reset the Detector's success rate mean and stdev.
785
271428
  getSRNums(monitor_type) = {-1, -1};
786

            
787
  // Exit early if there are not enough hosts.
788
271428
  if (host_monitors_.size() < success_rate_minimum_hosts &&
789
271428
      host_monitors_.size() < failure_percentage_minimum_hosts) {
790
271376
    return;
791
271376
  }
792

            
793
  // reserve upper bound of vector size to avoid reallocation.
794
52
  valid_success_rate_hosts.reserve(host_monitors_.size());
795
52
  valid_failure_percentage_hosts.reserve(host_monitors_.size());
796

            
797
178
  for (const auto& host : host_monitors_) {
798
    // Don't do work if the host is already ejected.
799
178
    if (!host.first->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
800
163
      absl::optional<std::pair<double, uint64_t>> host_success_rate_and_volume =
801
163
          host.second->getSRMonitor(monitor_type)
802
163
              .successRateAccumulator()
803
163
              .getSuccessRateAndVolume();
804

            
805
163
      if (!host_success_rate_and_volume) {
806
118
        continue;
807
118
      }
808
45
      double success_rate = host_success_rate_and_volume.value().first;
809
45
      double request_volume = host_success_rate_and_volume.value().second;
810

            
811
45
      if (request_volume >=
812
45
          std::min(success_rate_request_volume, failure_percentage_request_volume)) {
813
29
        host.second->successRate(monitor_type, success_rate);
814
29
      }
815

            
816
45
      if (request_volume >= success_rate_request_volume) {
817
22
        valid_success_rate_hosts.emplace_back(HostSuccessRatePair(host.first, success_rate));
818
22
        success_rate_sum += success_rate;
819
22
      }
820
45
      if (request_volume >= failure_percentage_request_volume) {
821
29
        valid_failure_percentage_hosts.emplace_back(HostSuccessRatePair(host.first, success_rate));
822
29
      }
823
45
    }
824
178
  }
825

            
826
52
  if (!valid_success_rate_hosts.empty() &&
827
52
      valid_success_rate_hosts.size() >= success_rate_minimum_hosts) {
828
3
    const double success_rate_stdev_factor =
829
3
        runtime_.snapshot().getInteger(SuccessRateStdevFactorRuntime,
830
3
                                       config_.successRateStdevFactor()) /
831
3
        1000.0;
832
3
    getSRNums(monitor_type) = successRateEjectionThreshold(
833
3
        success_rate_sum, valid_success_rate_hosts, success_rate_stdev_factor);
834
3
    const double success_rate_ejection_threshold = getSRNums(monitor_type).ejection_threshold_;
835
15
    for (const auto& host_success_rate_pair : valid_success_rate_hosts) {
836
15
      if (host_success_rate_pair.success_rate_ < success_rate_ejection_threshold) {
837
3
        stats_.ejections_success_rate_.inc(); // Deprecated.
838
3
        const envoy::data::cluster::v3::OutlierEjectionType type =
839
3
            host_monitors_[host_success_rate_pair.host_]
840
3
                ->getSRMonitor(monitor_type)
841
3
                .getEjectionType();
842
3
        updateDetectedEjectionStats(type);
843
3
        ejectHost(host_success_rate_pair.host_, type);
844
3
      }
845
15
    }
846
3
  }
847

            
848
52
  if (!valid_failure_percentage_hosts.empty() &&
849
52
      valid_failure_percentage_hosts.size() >= failure_percentage_minimum_hosts) {
850
9
    const double failure_percentage_threshold = runtime_.snapshot().getInteger(
851
9
        FailurePercentageThresholdRuntime, config_.failurePercentageThreshold());
852

            
853
25
    for (const auto& host_success_rate_pair : valid_failure_percentage_hosts) {
854
25
      if ((100.0 - host_success_rate_pair.success_rate_) >= failure_percentage_threshold) {
855
        // We should eject.
856

            
857
        // The ejection type returned by the SuccessRateMonitor's getEjectionType() will be a
858
        // SUCCESS_RATE type, so we need to figure it out for ourselves.
859
5
        const envoy::data::cluster::v3::OutlierEjectionType type =
860
5
            (monitor_type == DetectorHostMonitor::SuccessRateMonitorType::ExternalOrigin)
861
5
                ? envoy::data::cluster::v3::FAILURE_PERCENTAGE
862
5
                : envoy::data::cluster::v3::FAILURE_PERCENTAGE_LOCAL_ORIGIN;
863
5
        updateDetectedEjectionStats(type);
864
5
        ejectHost(host_success_rate_pair.host_, type);
865
5
      }
866
25
    }
867
9
  }
868
52
}
869

            
870
135714
void DetectorImpl::onIntervalTimer() {
871
135714
  MonotonicTime now = time_source_.monotonicTime();
872

            
873
135789
  for (auto host : host_monitors_) {
874
135789
    checkHostForUneject(host.first, host.second, now);
875
135789
    checkHostForUndegrade(host.first, host.second, now);
876

            
877
    // Need to update the writer bucket to keep the data valid.
878
135789
    host.second->updateCurrentSuccessRateBucket();
879
    // Refresh host success rate stat for the /clusters endpoint. If there is a new valid value, it
880
    // will get updated in processSuccessRateEjections().
881
135789
    host.second->successRate(DetectorHostMonitor::SuccessRateMonitorType::LocalOrigin, -1);
882
135789
    host.second->successRate(DetectorHostMonitor::SuccessRateMonitorType::ExternalOrigin, -1);
883
135789
  }
884

            
885
135714
  processSuccessRateEjections(DetectorHostMonitor::SuccessRateMonitorType::ExternalOrigin);
886
135714
  processSuccessRateEjections(DetectorHostMonitor::SuccessRateMonitorType::LocalOrigin);
887

            
888
  // Decrement time backoff for all hosts which have not been ejected.
889
135789
  for (auto host : host_monitors_) {
890
135789
    if (!host.first->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
891
134333
      auto& monitor = host.second;
892
      // Node is healthy and was not ejected since the last check.
893
134333
      if (monitor->lastUnejectionTime().has_value() &&
894
134333
          ((now - monitor->lastUnejectionTime().value()) >=
895
157
           std::chrono::milliseconds(
896
157
               runtime_.snapshot().getInteger(IntervalMsRuntime, config_.intervalMs())))) {
897
58
        if (monitor->ejectTimeBackoff() != 0) {
898
10
          monitor->ejectTimeBackoff()--;
899
10
        }
900
58
      }
901
134333
    }
902
135789
  }
903

            
904
  // Decrement degrade backoff for all hosts which have not been degraded.
905
  // Uses the same algorithm as ejection backoff.
906
135789
  for (auto host : host_monitors_) {
907
135789
    if (!host.first->healthFlagGet(Host::HealthFlag::DEGRADED_OUTLIER_DETECTION)) {
908
135788
      auto& monitor = host.second;
909
      // Node is healthy and was not degraded since the last check.
910
135788
      if (monitor->lastUndegradedTime().has_value() &&
911
135788
          ((now - monitor->lastUndegradedTime().value()) >=
912
10
           std::chrono::milliseconds(
913
10
               runtime_.snapshot().getInteger(IntervalMsRuntime, config_.intervalMs())))) {
914
1
        if (monitor->degradeTimeBackoff() != 0) {
915
1
          monitor->degradeTimeBackoff()--;
916
1
        }
917
1
      }
918
135788
    }
919
135789
  }
920

            
921
135714
  armIntervalTimer();
922
135714
}
923

            
924
263
// Invokes every registered state-change callback, in container order, with `host`.
void DetectorImpl::runCallbacks(HostSharedPtr host) {
  std::for_each(callbacks_.begin(), callbacks_.end(),
                [&host](const ChangeStateCb& callback) { callback(host); });
}
929

            
930
void EventLoggerImpl::logEject(const HostDescriptionConstSharedPtr& host, Detector& detector,
931
3
                               envoy::data::cluster::v3::OutlierEjectionType type, bool enforced) {
932
3
  envoy::data::cluster::v3::OutlierDetectionEvent event;
933
3
  event.set_type(type);
934

            
935
3
  absl::optional<MonotonicTime> time = host->outlierDetector().lastUnejectionTime();
936
3
  setCommonEventParams(event, host, time);
937

            
938
3
  event.set_action(envoy::data::cluster::v3::EJECT);
939

            
940
3
  event.set_enforced(enforced);
941

            
942
3
  if ((type == envoy::data::cluster::v3::SUCCESS_RATE) ||
943
3
      (type == envoy::data::cluster::v3::SUCCESS_RATE_LOCAL_ORIGIN)) {
944
1
    const DetectorHostMonitor::SuccessRateMonitorType monitor_type =
945
1
        (type == envoy::data::cluster::v3::SUCCESS_RATE)
946
1
            ? DetectorHostMonitor::SuccessRateMonitorType::ExternalOrigin
947
1
            : DetectorHostMonitor::SuccessRateMonitorType::LocalOrigin;
948
1
    event.mutable_eject_success_rate_event()->set_cluster_average_success_rate(
949
1
        detector.successRateAverage(monitor_type));
950
1
    event.mutable_eject_success_rate_event()->set_cluster_success_rate_ejection_threshold(
951
1
        detector.successRateEjectionThreshold(monitor_type));
952
1
    event.mutable_eject_success_rate_event()->set_host_success_rate(
953
1
        host->outlierDetector().successRate(monitor_type));
954
2
  } else if ((type == envoy::data::cluster::v3::FAILURE_PERCENTAGE) ||
955
2
             (type == envoy::data::cluster::v3::FAILURE_PERCENTAGE_LOCAL_ORIGIN)) {
956
1
    const DetectorHostMonitor::SuccessRateMonitorType monitor_type =
957
1
        (type == envoy::data::cluster::v3::FAILURE_PERCENTAGE)
958
1
            ? DetectorHostMonitor::SuccessRateMonitorType::ExternalOrigin
959
1
            : DetectorHostMonitor::SuccessRateMonitorType::LocalOrigin;
960
1
    event.mutable_eject_failure_percentage_event()->set_host_success_rate(
961
1
        host->outlierDetector().successRate(monitor_type));
962
1
  } else {
963
1
    event.mutable_eject_consecutive_event();
964
1
  }
965

            
966
3
  std::string json;
967
3
#ifdef ENVOY_ENABLE_YAML
968
3
  json = MessageUtil::getJsonStringFromMessageOrError(event, /* pretty_print */ false,
969
3
                                                      /* always_print_primitive_fields */ true);
970
#else
971
  IS_ENVOY_BUG("attempting outlier logging with JSON support removed");
972
#endif
973
3
  file_->write(fmt::format("{}\n", json));
974
3
}
975

            
976
3
void EventLoggerImpl::logUneject(const HostDescriptionConstSharedPtr& host) {
977
3
  envoy::data::cluster::v3::OutlierDetectionEvent event;
978

            
979
3
  absl::optional<MonotonicTime> time = host->outlierDetector().lastEjectionTime();
980
3
  setCommonEventParams(event, host, time);
981

            
982
3
  event.set_action(envoy::data::cluster::v3::UNEJECT);
983

            
984
3
  std::string json;
985
3
#ifdef ENVOY_ENABLE_YAML
986
3
  json = MessageUtil::getJsonStringFromMessageOrError(event, /* pretty_print */ false,
987
3
                                                      /* always_print_primitive_fields */ true);
988
#else
989
  IS_ENVOY_BUG("attempting outlier logging with JSON support removed");
990
#endif
991
3
  file_->write(fmt::format("{}\n", json));
992
3
}
993

            
994
void EventLoggerImpl::setCommonEventParams(envoy::data::cluster::v3::OutlierDetectionEvent& event,
995
                                           const HostDescriptionConstSharedPtr& host,
996
6
                                           absl::optional<MonotonicTime> time) {
997
6
  MonotonicTime monotonic_now = time_source_.monotonicTime();
998
6
  if (time) {
999
4
    std::chrono::seconds secsFromLastAction =
4
        std::chrono::duration_cast<std::chrono::seconds>(monotonic_now - time.value());
4
    event.mutable_secs_since_last_action()->set_value(secsFromLastAction.count());
4
  }
6
  event.set_cluster_name(host->cluster().name());
6
  event.set_upstream_url(host->address()->asString());
6
  event.set_num_ejections(host->outlierDetector().numEjections());
6
  TimestampUtil::systemClockToTimestamp(time_source_.systemTime(), *event.mutable_timestamp());
6
}
271806
// Rotates the two accumulator buckets and returns the bucket that is now the current one.
//
// Right now current is being written to and backup is not. The backup's counters are zeroed and
// the two buckets are then exchanged, so the returned (new current) bucket starts out empty and
// the previous current bucket becomes the backup.
SuccessRateAccumulatorBucket* SuccessRateAccumulator::updateCurrentWriter() {
  auto& stale_bucket = *backup_success_rate_bucket_;
  stale_bucket.success_request_counter_ = 0;
  stale_bucket.total_request_counter_ = 0;

  backup_success_rate_bucket_.swap(current_success_rate_bucket_);
  return current_success_rate_bucket_.get();
}
163
// Reports the success rate and request volume recorded in the backup bucket.
//
// @return absl::nullopt when the backup bucket recorded no requests; otherwise a pair of
//         {success rate as a percentage in [0, 100] scale (successes * 100 / total), total
//         number of requests}.
absl::optional<std::pair<double, uint64_t>> SuccessRateAccumulator::getSuccessRateAndVolume() {
  // Read each counter exactly once so that the zero check, the divisor and the reported volume
  // are all the same value.
  // NOTE(review): the counters' declared type is outside this chunk; if they can be updated
  // while this runs, reading once is also what keeps the result self-consistent - confirm.
  const uint64_t total_requests = backup_success_rate_bucket_->total_request_counter_;
  if (total_requests == 0) {
    return absl::nullopt;
  }

  const uint64_t successful_requests = backup_success_rate_bucket_->success_request_counter_;
  const double success_rate = successful_requests * 100.0 / total_requests;
  return {{success_rate, total_requests}};
}
} // namespace Outlier
} // namespace Upstream
} // namespace Envoy