Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/server/overload_manager_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/server/overload_manager_impl.h"
2
3
#include <chrono>
4
5
#include "envoy/common/exception.h"
6
#include "envoy/config/overload/v3/overload.pb.h"
7
#include "envoy/config/overload/v3/overload.pb.validate.h"
8
#include "envoy/stats/scope.h"
9
10
#include "source/common/common/fmt.h"
11
#include "source/common/config/utility.h"
12
#include "source/common/event/scaled_range_timer_manager_impl.h"
13
#include "source/common/protobuf/utility.h"
14
#include "source/common/stats/symbol_table.h"
15
#include "source/server/resource_monitor_config_impl.h"
16
17
#include "absl/container/node_hash_map.h"
18
#include "absl/strings/str_cat.h"
19
20
namespace Envoy {
21
namespace Server {
22
namespace {
23
24
using TriggerPtr = std::unique_ptr<Trigger>;
25
26
class ThresholdTriggerImpl final : public Trigger {
27
public:
28
  ThresholdTriggerImpl(const envoy::config::overload::v3::ThresholdTrigger& config)
29
0
      : threshold_(config.value()), state_(OverloadActionState::inactive()) {}
30
31
0
  bool updateValue(double value) override {
32
0
    const OverloadActionState state = actionState();
33
0
    state_ =
34
0
        value >= threshold_ ? OverloadActionState::saturated() : OverloadActionState::inactive();
35
    // This is a floating point comparison, though state_ is always either
36
    // saturated or inactive so there's no risk due to floating point precision.
37
0
    return state.value() != actionState().value();
38
0
  }
39
40
0
  OverloadActionState actionState() const override { return state_; }
41
42
private:
43
  const double threshold_;
44
  OverloadActionState state_;
45
};
46
47
class ScaledTriggerImpl final : public Trigger {
48
public:
49
  ScaledTriggerImpl(const envoy::config::overload::v3::ScaledTrigger& config)
50
      : scaling_threshold_(config.scaling_threshold()),
51
        saturated_threshold_(config.saturation_threshold()),
52
0
        state_(OverloadActionState::inactive()) {
53
0
    if (scaling_threshold_ >= saturated_threshold_) {
54
0
      throwEnvoyExceptionOrPanic("scaling_threshold must be less than saturation_threshold");
55
0
    }
56
0
  }
57
58
0
  bool updateValue(double value) override {
59
0
    const OverloadActionState old_state = actionState();
60
0
    if (value <= scaling_threshold_) {
61
0
      state_ = OverloadActionState::inactive();
62
0
    } else if (value >= saturated_threshold_) {
63
0
      state_ = OverloadActionState::saturated();
64
0
    } else {
65
0
      state_ = OverloadActionState(
66
0
          UnitFloat((value - scaling_threshold_) / (saturated_threshold_ - scaling_threshold_)));
67
0
    }
68
    // All values of state_ are produced via this same code path. Even if
69
    // old_state and state_ should be approximately equal, there's no harm in
70
    // signaling for a small change if they're not float::operator== equal.
71
0
    return state_.value() != old_state.value();
72
0
  }
73
74
0
  OverloadActionState actionState() const override { return state_; }
75
76
private:
77
  const double scaling_threshold_;
78
  const double saturated_threshold_;
79
  OverloadActionState state_;
80
};
81
82
0
TriggerPtr createTriggerFromConfig(const envoy::config::overload::v3::Trigger& trigger_config) {
83
0
  TriggerPtr trigger;
84
85
0
  switch (trigger_config.trigger_oneof_case()) {
86
0
  case envoy::config::overload::v3::Trigger::TriggerOneofCase::kThreshold:
87
0
    trigger = std::make_unique<ThresholdTriggerImpl>(trigger_config.threshold());
88
0
    break;
89
0
  case envoy::config::overload::v3::Trigger::TriggerOneofCase::kScaled:
90
0
    trigger = std::make_unique<ScaledTriggerImpl>(trigger_config.scaled());
91
0
    break;
92
0
  case envoy::config::overload::v3::Trigger::TriggerOneofCase::TRIGGER_ONEOF_NOT_SET:
93
0
    throwEnvoyExceptionOrPanic(absl::StrCat("action not set for trigger ", trigger_config.name()));
94
0
  }
95
96
0
  return trigger;
97
0
}
98
99
0
Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view name_of_stat) {
100
0
  Stats::StatNameManagedStorage stat_name(name_of_stat, scope.symbolTable());
101
0
  return scope.counterFromStatName(stat_name.statName());
102
0
}
103
104
0
Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view a, absl::string_view b) {
105
0
  return makeCounter(scope, absl::StrCat("overload.", a, ".", b));
106
0
}
107
108
Stats::Gauge& makeGauge(Stats::Scope& scope, absl::string_view a, absl::string_view b,
109
0
                        Stats::Gauge::ImportMode import_mode) {
110
0
  Stats::StatNameManagedStorage stat_name(absl::StrCat("overload.", a, ".", b),
111
0
                                          scope.symbolTable());
112
0
  return scope.gaugeFromStatName(stat_name.statName(), import_mode);
113
0
}
114
115
Stats::Histogram& makeHistogram(Stats::Scope& scope, absl::string_view name,
116
10.5k
                                Stats::Histogram::Unit unit) {
117
10.5k
  Stats::StatNameManagedStorage stat_name(absl::StrCat("overload.", name), scope.symbolTable());
118
10.5k
  return scope.histogramFromStatName(stat_name.statName(), unit);
119
10.5k
}
120
121
Event::ScaledTimerType parseTimerType(
122
0
    envoy::config::overload::v3::ScaleTimersOverloadActionConfig::TimerType config_timer_type) {
123
0
  using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig;
124
125
0
  switch (config_timer_type) {
126
0
  case Config::HTTP_DOWNSTREAM_CONNECTION_IDLE:
127
0
    return Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout;
128
0
  case Config::HTTP_DOWNSTREAM_STREAM_IDLE:
129
0
    return Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout;
130
0
  case Config::TRANSPORT_SOCKET_CONNECT:
131
0
    return Event::ScaledTimerType::TransportSocketConnectTimeout;
132
0
  default:
133
0
    throwEnvoyExceptionOrPanic(fmt::format("Unknown timer type {}", config_timer_type));
134
0
  }
135
0
}
136
137
Event::ScaledTimerTypeMap
138
parseTimerMinimums(const ProtobufWkt::Any& typed_config,
139
0
                   ProtobufMessage::ValidationVisitor& validation_visitor) {
140
0
  using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig;
141
0
  const Config action_config =
142
0
      MessageUtil::anyConvertAndValidate<Config>(typed_config, validation_visitor);
143
144
0
  Event::ScaledTimerTypeMap timer_map;
145
146
0
  for (const auto& scale_timer : action_config.timer_scale_factors()) {
147
0
    const Event::ScaledTimerType timer_type = parseTimerType(scale_timer.timer());
148
149
0
    const Event::ScaledTimerMinimum minimum =
150
0
        scale_timer.has_min_timeout()
151
0
            ? Event::ScaledTimerMinimum(Event::AbsoluteMinimum(std::chrono::milliseconds(
152
0
                  DurationUtil::durationToMilliseconds(scale_timer.min_timeout()))))
153
0
            : Event::ScaledTimerMinimum(
154
0
                  Event::ScaledMinimum(UnitFloat(scale_timer.min_scale().value() / 100.0)));
155
156
0
    auto [_, inserted] = timer_map.insert(std::make_pair(timer_type, minimum));
157
0
    UNREFERENCED_PARAMETER(_);
158
0
    if (!inserted) {
159
0
      throwEnvoyExceptionOrPanic(fmt::format("Found duplicate entry for timer type {}",
160
0
                                             Config::TimerType_Name(scale_timer.timer())));
161
0
    }
162
0
  }
163
164
0
  return timer_map;
165
0
}
166
167
} // namespace
168
169
/**
170
 * Thread-local copy of the state of each configured overload action.
171
 */
172
class ThreadLocalOverloadStateImpl : public ThreadLocalOverloadState {
173
public:
174
  explicit ThreadLocalOverloadStateImpl(
175
      const NamedOverloadActionSymbolTable& action_symbol_table,
176
      std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>&
177
          proactive_resources)
178
      : action_symbol_table_(action_symbol_table),
179
        actions_(action_symbol_table.size(), OverloadActionState(UnitFloat::min())),
180
5.29k
        proactive_resources_(proactive_resources) {}
181
182
7.25k
  const OverloadActionState& getState(const std::string& action) override {
183
7.25k
    if (const auto symbol = action_symbol_table_.lookup(action); symbol != absl::nullopt) {
184
0
      return actions_[symbol->index()];
185
0
    }
186
7.25k
    return always_inactive_;
187
7.25k
  }
188
189
0
  void setState(NamedOverloadActionSymbolTable::Symbol action, OverloadActionState state) {
190
0
    actions_[action.index()] = state;
191
0
  }
192
193
  bool tryAllocateResource(OverloadProactiveResourceName resource_name,
194
0
                           int64_t increment) override {
195
0
    const auto proactive_resource = proactive_resources_->find(resource_name);
196
0
    if (proactive_resource == proactive_resources_->end()) {
197
0
      ENVOY_LOG_MISC(warn, "Failed to allocate resource usage, resource monitor is not configured");
198
0
      return false;
199
0
    }
200
201
0
    return proactive_resource->second.tryAllocateResource(increment);
202
0
  }
203
204
  bool tryDeallocateResource(OverloadProactiveResourceName resource_name,
205
0
                             int64_t decrement) override {
206
0
    const auto proactive_resource = proactive_resources_->find(resource_name);
207
0
    if (proactive_resource == proactive_resources_->end()) {
208
0
      ENVOY_LOG_MISC(warn,
209
0
                     "Failed to deallocate resource usage, resource monitor is not configured");
210
0
      return false;
211
0
    }
212
213
0
    return proactive_resource->second.tryDeallocateResource(decrement);
214
0
  }
215
216
5.31k
  bool isResourceMonitorEnabled(OverloadProactiveResourceName resource_name) override {
217
5.31k
    const auto proactive_resource = proactive_resources_->find(resource_name);
218
5.31k
    return proactive_resource != proactive_resources_->end();
219
5.31k
  }
220
221
  ProactiveResourceMonitorOptRef
222
0
  getProactiveResourceMonitorForTest(OverloadProactiveResourceName resource_name) override {
223
0
    const auto proactive_resource = proactive_resources_->find(resource_name);
224
0
    if (proactive_resource == proactive_resources_->end()) {
225
0
      ENVOY_LOG_MISC(warn, "Failed to get resource usage, resource monitor is not configured");
226
0
      return makeOptRefFromPtr<ProactiveResourceMonitor>(nullptr);
227
0
    }
228
0
    return proactive_resource->second.getProactiveResourceMonitorForTest();
229
0
  }
230
231
private:
232
  static const OverloadActionState always_inactive_;
233
  const NamedOverloadActionSymbolTable& action_symbol_table_;
234
  std::vector<OverloadActionState> actions_;
235
  std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>
236
      proactive_resources_;
237
};
238
239
const OverloadActionState ThreadLocalOverloadStateImpl::always_inactive_{UnitFloat::min()};
240
241
NamedOverloadActionSymbolTable::Symbol
242
26.6k
NamedOverloadActionSymbolTable::get(absl::string_view string) {
243
26.6k
  if (auto it = table_.find(string); it != table_.end()) {
244
0
    return Symbol(it->second);
245
0
  }
246
247
26.6k
  size_t index = table_.size();
248
249
26.6k
  names_.emplace_back(string);
250
26.6k
  table_.emplace(std::make_pair(string, index));
251
252
26.6k
  return Symbol(index);
253
26.6k
}
254
255
absl::optional<NamedOverloadActionSymbolTable::Symbol>
256
7.25k
NamedOverloadActionSymbolTable::lookup(absl::string_view string) const {
257
7.25k
  if (auto it = table_.find(string); it != table_.end()) {
258
0
    return Symbol(it->second);
259
0
  }
260
7.25k
  return absl::nullopt;
261
7.25k
}
262
263
0
const absl::string_view NamedOverloadActionSymbolTable::name(Symbol symbol) const {
264
0
  return names_.at(symbol.index());
265
0
}
266
267
OverloadAction::OverloadAction(const envoy::config::overload::v3::OverloadAction& config,
268
                               Stats::Scope& stats_scope)
269
    : state_(OverloadActionState::inactive()),
270
      active_gauge_(
271
          makeGauge(stats_scope, config.name(), "active", Stats::Gauge::ImportMode::NeverImport)),
272
      scale_percent_gauge_(makeGauge(stats_scope, config.name(), "scale_percent",
273
0
                                     Stats::Gauge::ImportMode::NeverImport)) {
274
0
  for (const auto& trigger_config : config.triggers()) {
275
0
    if (!triggers_.try_emplace(trigger_config.name(), createTriggerFromConfig(trigger_config))
276
0
             .second) {
277
0
      throwEnvoyExceptionOrPanic(
278
0
          absl::StrCat("Duplicate trigger resource for overload action ", config.name()));
279
0
    }
280
0
  }
281
282
0
  active_gauge_.set(0);
283
0
  scale_percent_gauge_.set(0);
284
0
}
285
286
0
bool OverloadAction::updateResourcePressure(const std::string& name, double pressure) {
287
0
  const OverloadActionState old_state = getState();
288
289
0
  auto it = triggers_.find(name);
290
0
  ASSERT(it != triggers_.end());
291
0
  if (!it->second->updateValue(pressure)) {
292
0
    return false;
293
0
  }
294
0
  const auto trigger_new_state = it->second->actionState();
295
0
  active_gauge_.set(trigger_new_state.isSaturated() ? 1 : 0);
296
0
  scale_percent_gauge_.set(trigger_new_state.value().value() * 100);
297
298
0
  {
299
    // Compute the new state as the maximum over all trigger states.
300
0
    OverloadActionState new_state = OverloadActionState::inactive();
301
0
    for (auto& trigger : triggers_) {
302
0
      const auto trigger_state = trigger.second->actionState();
303
0
      if (trigger_state.value() > new_state.value()) {
304
0
        new_state = trigger_state;
305
0
      }
306
0
    }
307
0
    state_ = new_state;
308
0
  }
309
310
0
  return state_.value() != old_state.value();
311
0
}
312
313
0
OverloadActionState OverloadAction::getState() const { return state_; }
314
315
LoadShedPointImpl::LoadShedPointImpl(const envoy::config::overload::v3::LoadShedPoint& config,
316
                                     Stats::Scope& stats_scope,
317
                                     Random::RandomGenerator& random_generator)
318
    : scale_percent_(makeGauge(stats_scope, config.name(), "scale_percent",
319
                               Stats::Gauge::ImportMode::NeverImport)),
320
0
      random_generator_(random_generator) {
321
0
  for (const auto& trigger_config : config.triggers()) {
322
0
    if (!triggers_.try_emplace(trigger_config.name(), createTriggerFromConfig(trigger_config))
323
0
             .second) {
324
0
      throwEnvoyExceptionOrPanic(
325
0
          absl::StrCat("Duplicate trigger resource for LoadShedPoint ", config.name()));
326
0
    }
327
0
  }
328
0
};
329
330
void LoadShedPointImpl::updateResource(absl::string_view resource_name,
331
0
                                       double resource_utilization) {
332
0
  auto it = triggers_.find(resource_name);
333
0
  if (it == triggers_.end()) {
334
0
    return;
335
0
  }
336
337
0
  it->second->updateValue(resource_utilization);
338
0
  updateProbabilityShedLoad();
339
0
}
340
341
0
void LoadShedPointImpl::updateProbabilityShedLoad() {
342
0
  float max_unit_float = 0.0f;
343
0
  for (const auto& trigger : triggers_) {
344
0
    max_unit_float = std::max(trigger.second->actionState().value().value(), max_unit_float);
345
0
  }
346
347
0
  probability_shed_load_.store(max_unit_float);
348
349
  // Update stats.
350
0
  scale_percent_.set(100 * max_unit_float);
351
0
}
352
353
0
bool LoadShedPointImpl::shouldShedLoad() {
354
0
  float unit_float_probability_shed_load = probability_shed_load_.load();
355
  // This should be ok as we're using unit float which saturates at 1.0f.
356
0
  if (unit_float_probability_shed_load == 1.0f) {
357
0
    return true;
358
0
  }
359
360
0
  return random_generator_.bernoulli(UnitFloat(unit_float_probability_shed_load));
361
0
}
362
363
OverloadManagerImpl::OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
364
                                         ThreadLocal::SlotAllocator& slot_allocator,
365
                                         const envoy::config::overload::v3::OverloadManager& config,
366
                                         ProtobufMessage::ValidationVisitor& validation_visitor,
367
                                         Api::Api& api, const Server::Options& options)
368
    : dispatcher_(dispatcher), time_source_(api.timeSource()), tls_(slot_allocator),
369
      refresh_interval_(
370
          std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))),
371
      refresh_interval_delays_(makeHistogram(stats_scope, "refresh_interval_delay",
372
                                             Stats::Histogram::Unit::Milliseconds)),
373
      proactive_resources_(
374
          std::make_unique<
375
10.5k
              absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>()) {
376
10.5k
  Configuration::ResourceMonitorFactoryContextImpl context(dispatcher, options, api,
377
10.5k
                                                           validation_visitor);
378
  // We should hide impl details from users, for them there should be no distinction between
379
  // proactive and regular resource monitors in configuration API. But internally we will maintain
380
  // two distinct collections of proactive and regular resources. Proactive resources are not
381
  // subject to periodic flushes and can be recalculated/updated on demand by invoking
382
  // `tryAllocateResource/tryDeallocateResource` via thread local overload state.
383
10.5k
  for (const auto& resource : config.resource_monitors()) {
384
90
    const auto& name = resource.name();
385
    // Check if it is a proactive resource.
386
90
    auto proactive_resource_it =
387
90
        OverloadProactiveResources::get().proactive_action_name_to_resource_.find(name);
388
90
    ENVOY_LOG(debug, "Evaluating resource {}", name);
389
90
    bool result = false;
390
90
    if (proactive_resource_it !=
391
90
        OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) {
392
0
      ENVOY_LOG(debug, "Adding proactive resource monitor for {}", name);
393
0
      auto& factory =
394
0
          Config::Utility::getAndCheckFactory<Configuration::ProactiveResourceMonitorFactory>(
395
0
              resource);
396
0
      auto config =
397
0
          Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);
398
0
      auto monitor = factory.createProactiveResourceMonitor(*config, context);
399
0
      result =
400
0
          proactive_resources_
401
0
              ->try_emplace(proactive_resource_it->second, name, std::move(monitor), stats_scope)
402
0
              .second;
403
90
    } else {
404
90
      ENVOY_LOG(debug, "Adding resource monitor for {}", name);
405
90
      auto& factory =
406
90
          Config::Utility::getAndCheckFactory<Configuration::ResourceMonitorFactory>(resource);
407
90
      auto config =
408
90
          Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);
409
90
      auto monitor = factory.createResourceMonitor(*config, context);
410
90
      result = resources_.try_emplace(name, name, std::move(monitor), *this, stats_scope).second;
411
90
    }
412
90
    if (!result) {
413
0
      throwEnvoyExceptionOrPanic(absl::StrCat("Duplicate resource monitor ", name));
414
0
    }
415
90
  }
416
417
10.5k
  for (const auto& action : config.actions()) {
418
0
    const auto& name = action.name();
419
0
    const auto symbol = action_symbol_table_.get(name);
420
0
    ENVOY_LOG(debug, "Adding overload action {}", name);
421
422
    // Validate that this is a well known overload action.
423
0
    if (Runtime::runtimeFeatureEnabled(
424
0
            "envoy.reloadable_features.overload_manager_error_unknown_action")) {
425
0
      auto& well_known_actions = OverloadActionNames::get().WellKnownActions;
426
0
      if (std::find(well_known_actions.begin(), well_known_actions.end(), name) ==
427
0
          well_known_actions.end()) {
428
0
        throwEnvoyExceptionOrPanic(absl::StrCat("Unknown Overload Manager Action ", name));
429
0
      }
430
0
    }
431
432
    // TODO: use in place construction once https://github.com/abseil/abseil-cpp/issues/388 is
433
    // addressed
434
    // We cannot currently use in place construction as the OverloadAction constructor may throw,
435
    // causing an inconsistent internal state of the actions_ map, which on destruction results in
436
    // an invalid free.
437
0
    auto result = actions_.try_emplace(symbol, OverloadAction(action, stats_scope));
438
0
    if (!result.second) {
439
0
      throwEnvoyExceptionOrPanic(absl::StrCat("Duplicate overload action ", name));
440
0
    }
441
442
0
    if (name == OverloadActionNames::get().ReduceTimeouts) {
443
0
      timer_minimums_ = std::make_shared<const Event::ScaledTimerTypeMap>(
444
0
          parseTimerMinimums(action.typed_config(), validation_visitor));
445
0
    } else if (name == OverloadActionNames::get().ResetStreams) {
446
0
      if (!config.has_buffer_factory_config()) {
447
0
        throwEnvoyExceptionOrPanic(
448
0
            fmt::format("Overload action \"{}\" requires buffer_factory_config.", name));
449
0
      }
450
0
      makeCounter(api.rootScope(), OverloadActionStatsNames::get().ResetStreamsCount);
451
0
    } else if (action.has_typed_config()) {
452
0
      throwEnvoyExceptionOrPanic(fmt::format(
453
0
          "Overload action \"{}\" has an unexpected value for the typed_config field", name));
454
0
    }
455
456
0
    for (const auto& trigger : action.triggers()) {
457
0
      const std::string& resource = trigger.name();
458
0
      auto proactive_resource_it =
459
0
          OverloadProactiveResources::get().proactive_action_name_to_resource_.find(resource);
460
461
0
      if (resources_.find(resource) == resources_.end() &&
462
0
          proactive_resource_it ==
463
0
              OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) {
464
0
        throwEnvoyExceptionOrPanic(
465
0
            fmt::format("Unknown trigger resource {} for overload action {}", resource, name));
466
0
      }
467
0
      resource_to_actions_.insert(std::make_pair(resource, symbol));
468
0
    }
469
0
  }
470
471
  // Validate the trigger resources for Load shedPoints.
472
10.5k
  for (const auto& point : config.loadshed_points()) {
473
0
    for (const auto& trigger : point.triggers()) {
474
0
      if (!resources_.contains(trigger.name())) {
475
0
        throwEnvoyExceptionOrPanic(fmt::format("Unknown trigger resource {} for loadshed point {}",
476
0
                                               trigger.name(), point.name()));
477
0
      }
478
0
    }
479
480
0
    const auto result = loadshed_points_.try_emplace(
481
0
        point.name(),
482
0
        std::make_unique<LoadShedPointImpl>(point, api.rootScope(), api.randomGenerator()));
483
484
0
    if (!result.second) {
485
0
      throwEnvoyExceptionOrPanic(absl::StrCat("Duplicate loadshed point ", point.name()));
486
0
    }
487
0
  }
488
10.5k
}
489
490
2.64k
void OverloadManagerImpl::start() {
491
2.64k
  ASSERT(!started_);
492
2.64k
  started_ = true;
493
494
5.29k
  tls_.set([this](Event::Dispatcher&) {
495
5.29k
    return std::make_shared<ThreadLocalOverloadStateImpl>(action_symbol_table_,
496
5.29k
                                                          proactive_resources_);
497
5.29k
  });
498
499
2.64k
  if (resources_.empty()) {
500
2.64k
    return;
501
2.64k
  }
502
503
0
  timer_ = dispatcher_.createTimer([this]() -> void {
504
    // Guarantee that all resource updates get flushed after no more than one refresh_interval_.
505
0
    flushResourceUpdates();
506
507
    // Start a new flush epoch. If all resource updates complete before this callback runs, the last
508
    // resource update will call flushResourceUpdates to flush the whole batch early.
509
0
    ++flush_epoch_;
510
0
    flush_awaiting_updates_ = resources_.size();
511
512
0
    for (auto& resource : resources_) {
513
0
      resource.second.update(flush_epoch_);
514
0
    }
515
516
    // Record delay.
517
0
    auto now = time_source_.monotonicTime();
518
0
    std::chrono::milliseconds delay =
519
0
        std::chrono::duration_cast<std::chrono::milliseconds>(now - time_resources_last_measured_);
520
0
    refresh_interval_delays_.recordValue(delay.count());
521
0
    time_resources_last_measured_ = now;
522
523
0
    timer_->enableTimer(refresh_interval_);
524
0
  });
525
526
0
  time_resources_last_measured_ = time_source_.monotonicTime();
527
0
  timer_->enableTimer(refresh_interval_);
528
0
}
529
530
5.34k
void OverloadManagerImpl::stop() {
531
  // Disable any pending timeouts.
532
5.34k
  if (timer_) {
533
0
    timer_->disableTimer();
534
0
  }
535
536
  // Clear the resource map to block on any pending updates.
537
5.34k
  resources_.clear();
538
539
  // TODO(nezdolik): wrap proactive monitors into atomic? and clear it here
540
5.34k
}
541
542
bool OverloadManagerImpl::registerForAction(const std::string& action,
543
                                            Event::Dispatcher& dispatcher,
544
26.6k
                                            OverloadActionCb callback) {
545
26.6k
  ASSERT(!started_);
546
26.6k
  const auto symbol = action_symbol_table_.get(action);
547
548
26.6k
  if (actions_.find(symbol) == actions_.end()) {
549
26.6k
    ENVOY_LOG(debug, "No overload action is configured for {}.", action);
550
26.6k
    return false;
551
26.6k
  }
552
553
0
  action_to_callbacks_.emplace(std::piecewise_construct, std::forward_as_tuple(symbol),
554
0
                               std::forward_as_tuple(dispatcher, callback));
555
0
  return true;
556
26.6k
}
557
558
8.93k
ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() { return *tls_; }
559
5.33k
Event::ScaledRangeTimerManagerFactory OverloadManagerImpl::scaledTimerFactory() {
560
5.33k
  return [this](Event::Dispatcher& dispatcher) {
561
5.33k
    auto manager = createScaledRangeTimerManager(dispatcher, timer_minimums_);
562
5.33k
    registerForAction(OverloadActionNames::get().ReduceTimeouts, dispatcher,
563
5.33k
                      [manager = manager.get()](OverloadActionState scale_state) {
564
0
                        manager->setScaleFactor(
565
                            // The action state is 0 for no overload up to 1 for maximal overload,
566
                            // but the scale factor for timers is 1 for no scaling and 0 for maximal
567
                            // scaling, so invert the value to pass in (1-value).
568
0
                            scale_state.value().invert());
569
0
                      });
570
5.33k
    return manager;
571
5.33k
  };
572
5.33k
}
573
574
8.09k
LoadShedPoint* OverloadManagerImpl::getLoadShedPoint(absl::string_view point_name) {
575
8.09k
  if (auto it = loadshed_points_.find(point_name); it != loadshed_points_.end()) {
576
0
    return it->second.get();
577
0
  }
578
8.09k
  return nullptr;
579
8.09k
}
580
581
Event::ScaledRangeTimerManagerPtr OverloadManagerImpl::createScaledRangeTimerManager(
582
    Event::Dispatcher& dispatcher,
583
5.33k
    const Event::ScaledTimerTypeMapConstSharedPtr& timer_minimums) const {
584
5.33k
  return std::make_unique<Event::ScaledRangeTimerManagerImpl>(dispatcher, timer_minimums);
585
5.33k
}
586
587
void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure,
588
0
                                                 FlushEpochId flush_epoch) {
589
0
  auto [start, end] = resource_to_actions_.equal_range(resource);
590
591
0
  std::for_each(start, end, [&](ResourceToActionMap::value_type& entry) {
592
0
    const NamedOverloadActionSymbolTable::Symbol action = entry.second;
593
0
    auto action_it = actions_.find(action);
594
0
    ASSERT(action_it != actions_.end());
595
0
    const OverloadActionState old_state = action_it->second.getState();
596
0
    if (action_it->second.updateResourcePressure(resource, pressure)) {
597
0
      const auto state = action_it->second.getState();
598
599
0
      if (old_state.isSaturated() != state.isSaturated()) {
600
0
        ENVOY_LOG(debug, "Overload action {} became {}", action_symbol_table_.name(action),
601
0
                  (state.isSaturated() ? "saturated" : "scaling"));
602
0
      }
603
604
      // Record the updated value to be sent to workers on the next thread-local-state flush, along
605
      // with any update callbacks. This might overwrite a previous action state change caused by a
606
      // pressure update for a different resource that hasn't been flushed yet. That's okay because
607
      // the state recorded here includes the information from all previous resource updates. So
608
      // even if resource 1 causes an action to have value A, and a later update to resource 2
609
      // causes the action to have value B, B would have been the result for whichever order the
610
      // updates to resources 1 and 2 came in.
611
0
      state_updates_to_flush_.insert_or_assign(action, state);
612
0
      auto [callbacks_start, callbacks_end] = action_to_callbacks_.equal_range(action);
613
0
      std::for_each(callbacks_start, callbacks_end, [&](ActionToCallbackMap::value_type& cb_entry) {
614
0
        callbacks_to_flush_.insert_or_assign(&cb_entry.second, state);
615
0
      });
616
0
    }
617
0
  });
618
619
0
  for (auto& loadshed_point : loadshed_points_) {
620
0
    loadshed_point.second->updateResource(resource, pressure);
621
0
  }
622
623
  // Eagerly flush updates if this is the last call to updateResourcePressure expected for the
624
  // current epoch. This assert is always valid because flush_awaiting_updates_ is initialized
625
  // before each batch of updates, and even if a resource monitor performs a double update, or a
626
  // previous update callback is late, the logic in OverloadManager::Resource::update() will prevent
627
  // unexpected calls to this function.
628
0
  ASSERT(flush_awaiting_updates_ > 0);
629
0
  --flush_awaiting_updates_;
630
0
  if (flush_epoch == flush_epoch_ && flush_awaiting_updates_ == 0) {
631
0
    flushResourceUpdates();
632
0
  }
633
0
}
634
635
0
void OverloadManagerImpl::flushResourceUpdates() {
636
0
  if (!state_updates_to_flush_.empty()) {
637
0
    auto shared_updates = std::make_shared<
638
0
        absl::flat_hash_map<NamedOverloadActionSymbolTable::Symbol, OverloadActionState>>();
639
0
    std::swap(*shared_updates, state_updates_to_flush_);
640
641
0
    tls_.runOnAllThreads(
642
0
        [updates = std::move(shared_updates)](OptRef<ThreadLocalOverloadStateImpl> overload_state) {
643
0
          for (const auto& [action, state] : *updates) {
644
0
            overload_state->setState(action, state);
645
0
          }
646
0
        });
647
0
  }
648
649
0
  for (const auto& [cb, state] : callbacks_to_flush_) {
650
0
    cb->dispatcher_.post([cb = cb, state = state]() { cb->callback_(state); });
651
0
  }
652
0
  callbacks_to_flush_.clear();
653
0
}
654
655
OverloadManagerImpl::Resource::Resource(const std::string& name, ResourceMonitorPtr monitor,
656
                                        OverloadManagerImpl& manager, Stats::Scope& stats_scope)
657
    : name_(name), monitor_(std::move(monitor)), manager_(manager),
658
      pressure_gauge_(
659
          makeGauge(stats_scope, name, "pressure", Stats::Gauge::ImportMode::NeverImport)),
660
      failed_updates_counter_(makeCounter(stats_scope, name, "failed_updates")),
661
0
      skipped_updates_counter_(makeCounter(stats_scope, name, "skipped_updates")) {}
662
663
0
void OverloadManagerImpl::Resource::update(FlushEpochId flush_epoch) {
664
0
  if (!pending_update_) {
665
0
    pending_update_ = true;
666
0
    flush_epoch_ = flush_epoch;
667
0
    monitor_->updateResourceUsage(*this);
668
0
    return;
669
0
  }
670
0
  ENVOY_LOG(debug, "Skipping update for resource {} which has pending update", name_);
671
0
  skipped_updates_counter_.inc();
672
0
}
673
674
0
void OverloadManagerImpl::Resource::onSuccess(const ResourceUsage& usage) {
675
0
  pending_update_ = false;
676
0
  manager_.updateResourcePressure(name_, usage.resource_pressure_, flush_epoch_);
677
0
  pressure_gauge_.set(usage.resource_pressure_ * 100); // convert to percent
678
0
}
679
680
0
void OverloadManagerImpl::Resource::onFailure(const EnvoyException& error) {
681
0
  pending_update_ = false;
682
0
  ENVOY_LOG(info, "Failed to update resource {}: {}", name_, error.what());
683
0
  failed_updates_counter_.inc();
684
0
}
685
686
} // namespace Server
687
} // namespace Envoy