Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/server/overload_manager_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/server/overload_manager_impl.h"
2
3
#include <chrono>
4
5
#include "envoy/common/exception.h"
6
#include "envoy/config/overload/v3/overload.pb.h"
7
#include "envoy/config/overload/v3/overload.pb.validate.h"
8
#include "envoy/stats/scope.h"
9
10
#include "source/common/common/fmt.h"
11
#include "source/common/config/utility.h"
12
#include "source/common/event/scaled_range_timer_manager_impl.h"
13
#include "source/common/protobuf/utility.h"
14
#include "source/common/stats/symbol_table.h"
15
#include "source/server/resource_monitor_config_impl.h"
16
17
#include "absl/container/node_hash_map.h"
18
#include "absl/strings/str_cat.h"
19
20
namespace Envoy {
21
namespace Server {
22
namespace {
23
24
using TriggerPtr = std::unique_ptr<Trigger>;
25
26
class ThresholdTriggerImpl final : public Trigger {
27
public:
28
  ThresholdTriggerImpl(const envoy::config::overload::v3::ThresholdTrigger& config)
29
0
      : threshold_(config.value()), state_(OverloadActionState::inactive()) {}
30
31
0
  bool updateValue(double value) override {
32
0
    const OverloadActionState state = actionState();
33
0
    state_ =
34
0
        value >= threshold_ ? OverloadActionState::saturated() : OverloadActionState::inactive();
35
    // This is a floating point comparison, though state_ is always either
36
    // saturated or inactive so there's no risk due to floating point precision.
37
0
    return state.value() != actionState().value();
38
0
  }
39
40
0
  OverloadActionState actionState() const override { return state_; }
41
42
private:
43
  const double threshold_;
44
  OverloadActionState state_;
45
};
46
47
class ScaledTriggerImpl final : public Trigger {
48
public:
49
  static absl::StatusOr<std::unique_ptr<ScaledTriggerImpl>>
50
0
  create(const envoy::config::overload::v3::ScaledTrigger& config) {
51
0
    if (config.scaling_threshold() >= config.saturation_threshold()) {
52
0
      return absl::InvalidArgumentError("scaling_threshold must be less than saturation_threshold");
53
0
    }
54
0
    return std::unique_ptr<ScaledTriggerImpl>(new ScaledTriggerImpl(config));
55
0
  }
56
57
0
  bool updateValue(double value) override {
58
0
    const OverloadActionState old_state = actionState();
59
0
    if (value <= scaling_threshold_) {
60
0
      state_ = OverloadActionState::inactive();
61
0
    } else if (value >= saturated_threshold_) {
62
0
      state_ = OverloadActionState::saturated();
63
0
    } else {
64
0
      state_ = OverloadActionState(
65
0
          UnitFloat((value - scaling_threshold_) / (saturated_threshold_ - scaling_threshold_)));
66
0
    }
67
    // All values of state_ are produced via this same code path. Even if
68
    // old_state and state_ should be approximately equal, there's no harm in
69
    // signaling for a small change if they're not float::operator== equal.
70
0
    return state_.value() != old_state.value();
71
0
  }
72
73
0
  OverloadActionState actionState() const override { return state_; }
74
75
private:
76
  ScaledTriggerImpl(const envoy::config::overload::v3::ScaledTrigger& config)
77
      : scaling_threshold_(config.scaling_threshold()),
78
        saturated_threshold_(config.saturation_threshold()),
79
0
        state_(OverloadActionState::inactive()) {}
80
81
  const double scaling_threshold_;
82
  const double saturated_threshold_;
83
  OverloadActionState state_;
84
};
85
86
/**
 * Builds the trigger implementation selected by the oneof in the trigger config.
 * @param trigger_config the Trigger proto to translate.
 * @return the created trigger, or an error status when the scaled trigger config is rejected
 *         or no oneof member is set.
 */
absl::StatusOr<TriggerPtr>
createTriggerFromConfig(const envoy::config::overload::v3::Trigger& trigger_config) {
  using OneofCase = envoy::config::overload::v3::Trigger::TriggerOneofCase;

  TriggerPtr created;

  switch (trigger_config.trigger_oneof_case()) {
  case OneofCase::kThreshold:
    created = std::make_unique<ThresholdTriggerImpl>(trigger_config.threshold());
    break;
  case OneofCase::kScaled: {
    auto scaled_or_error = ScaledTriggerImpl::create(trigger_config.scaled());
    if (!scaled_or_error.ok()) {
      return scaled_or_error.status();
    }
    created = std::move(scaled_or_error.value());
    break;
  }
  case OneofCase::TRIGGER_ONEOF_NOT_SET:
    return absl::InvalidArgumentError(
        absl::StrCat("action not set for trigger ", trigger_config.name()));
  }

  return created;
}
107
108
0
// Looks up the counter called `name_of_stat` in `scope`, going through a temporary stat name
// built against the scope's symbol table.
Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view name_of_stat) {
  Stats::StatNameManagedStorage storage(name_of_stat, scope.symbolTable());
  Stats::Counter& counter = scope.counterFromStatName(storage.statName());
  return counter;
}
112
113
0
// Convenience overload: resolves the counter named "overload.<a>.<b>" in `scope`.
Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view a, absl::string_view b) {
  const std::string full_name = absl::StrCat("overload.", a, ".", b);
  return makeCounter(scope, full_name);
}
116
117
// Resolves the gauge named "overload.<a>.<b>" in `scope` using the given import mode.
Stats::Gauge& makeGauge(Stats::Scope& scope, absl::string_view a, absl::string_view b,
                        Stats::Gauge::ImportMode import_mode) {
  const std::string full_name = absl::StrCat("overload.", a, ".", b);
  Stats::StatNameManagedStorage storage(full_name, scope.symbolTable());
  return scope.gaugeFromStatName(storage.statName(), import_mode);
}
123
124
// Resolves the histogram named "overload.<name>" in `scope`, recorded in the given unit.
Stats::Histogram& makeHistogram(Stats::Scope& scope, absl::string_view name,
                                Stats::Histogram::Unit unit) {
  const std::string full_name = absl::StrCat("overload.", name);
  Stats::StatNameManagedStorage storage(full_name, scope.symbolTable());
  return scope.histogramFromStatName(storage.statName(), unit);
}
129
130
/**
 * Translates the timer type enum from the overload config into the event-layer timer type.
 * @param config_timer_type the enum value taken from the ScaleTimersOverloadActionConfig.
 * @return the matching Event::ScaledTimerType, or InvalidArgument for any other value.
 */
absl::StatusOr<Event::ScaledTimerType> parseTimerType(
    envoy::config::overload::v3::ScaleTimersOverloadActionConfig::TimerType config_timer_type) {
  using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig;

  if (config_timer_type == Config::HTTP_DOWNSTREAM_CONNECTION_IDLE) {
    return Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout;
  }
  if (config_timer_type == Config::HTTP_DOWNSTREAM_STREAM_IDLE) {
    return Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout;
  }
  if (config_timer_type == Config::TRANSPORT_SOCKET_CONNECT) {
    return Event::ScaledTimerType::TransportSocketConnectTimeout;
  }

  const int raw_value = static_cast<int>(config_timer_type);
  return absl::InvalidArgumentError(fmt::format("Unknown timer type {}", raw_value));
}
146
147
/**
 * Unpacks a ScaleTimersOverloadActionConfig and turns its timer_scale_factors entries into a
 * map from timer type to the minimum that timer may be scaled down to.
 * @param typed_config the Any holding the action config.
 * @param validation_visitor visitor handed to the proto conversion/validation helper.
 * @return the populated map, or InvalidArgument for an unknown or repeated timer type.
 */
absl::StatusOr<Event::ScaledTimerTypeMap>
parseTimerMinimums(const ProtobufWkt::Any& typed_config,
                   ProtobufMessage::ValidationVisitor& validation_visitor) {
  using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig;
  const Config action_config =
      MessageUtil::anyConvertAndValidate<Config>(typed_config, validation_visitor);

  // An entry carries either an absolute minimum timeout or a minimum expressed as a
  // percentage of the configured timeout; the percentage is converted to a unit fraction.
  const auto minimum_for = [](const auto& entry) -> Event::ScaledTimerMinimum {
    if (entry.has_min_timeout()) {
      return Event::ScaledTimerMinimum(Event::AbsoluteMinimum(
          std::chrono::milliseconds(DurationUtil::durationToMilliseconds(entry.min_timeout()))));
    }
    return Event::ScaledTimerMinimum(
        Event::ScaledMinimum(UnitFloat(entry.min_scale().value() / 100.0)));
  };

  Event::ScaledTimerTypeMap minimums;
  for (const auto& entry : action_config.timer_scale_factors()) {
    auto type_or_error = parseTimerType(entry.timer());
    if (!type_or_error.ok()) {
      return type_or_error.status();
    }
    const Event::ScaledTimerType& timer_type = *type_or_error;
    const Event::ScaledTimerMinimum minimum = minimum_for(entry);

    const bool newly_added = minimums.insert(std::make_pair(timer_type, minimum)).second;
    if (!newly_added) {
      return absl::InvalidArgumentError(fmt::format("Found duplicate entry for timer type {}",
                                                    Config::TimerType_Name(entry.timer())));
    }
  }

  return minimums;
}
178
179
} // namespace
180
181
/**
182
 * Thread-local copy of the state of each configured overload action.
183
 */
184
class ThreadLocalOverloadStateImpl : public ThreadLocalOverloadState {
185
public:
186
  explicit ThreadLocalOverloadStateImpl(
187
      const NamedOverloadActionSymbolTable& action_symbol_table,
188
      std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>&
189
          proactive_resources)
190
      : action_symbol_table_(action_symbol_table),
191
        actions_(action_symbol_table.size(), OverloadActionState(UnitFloat::min())),
192
3.95k
        proactive_resources_(proactive_resources) {}
193
194
5.85k
  const OverloadActionState& getState(const std::string& action) override {
195
5.85k
    if (const auto symbol = action_symbol_table_.lookup(action); symbol != absl::nullopt) {
196
0
      return actions_[symbol->index()];
197
0
    }
198
5.85k
    return always_inactive_;
199
5.85k
  }
200
201
0
  void setState(NamedOverloadActionSymbolTable::Symbol action, OverloadActionState state) {
202
0
    actions_[action.index()] = state;
203
0
  }
204
205
  bool tryAllocateResource(OverloadProactiveResourceName resource_name,
206
0
                           int64_t increment) override {
207
0
    const auto proactive_resource = proactive_resources_->find(resource_name);
208
0
    if (proactive_resource == proactive_resources_->end()) {
209
0
      ENVOY_LOG_MISC(warn, "Failed to allocate resource usage, resource monitor is not configured");
210
0
      return false;
211
0
    }
212
213
0
    return proactive_resource->second.tryAllocateResource(increment);
214
0
  }
215
216
  bool tryDeallocateResource(OverloadProactiveResourceName resource_name,
217
0
                             int64_t decrement) override {
218
0
    const auto proactive_resource = proactive_resources_->find(resource_name);
219
0
    if (proactive_resource == proactive_resources_->end()) {
220
0
      ENVOY_LOG_MISC(warn,
221
0
                     "Failed to deallocate resource usage, resource monitor is not configured");
222
0
      return false;
223
0
    }
224
225
0
    return proactive_resource->second.tryDeallocateResource(decrement);
226
0
  }
227
228
3.96k
  bool isResourceMonitorEnabled(OverloadProactiveResourceName resource_name) override {
229
3.96k
    const auto proactive_resource = proactive_resources_->find(resource_name);
230
3.96k
    return proactive_resource != proactive_resources_->end();
231
3.96k
  }
232
233
  ProactiveResourceMonitorOptRef
234
0
  getProactiveResourceMonitorForTest(OverloadProactiveResourceName resource_name) override {
235
0
    const auto proactive_resource = proactive_resources_->find(resource_name);
236
0
    if (proactive_resource == proactive_resources_->end()) {
237
0
      ENVOY_LOG_MISC(warn, "Failed to get resource usage, resource monitor is not configured");
238
0
      return makeOptRefFromPtr<ProactiveResourceMonitor>(nullptr);
239
0
    }
240
0
    return proactive_resource->second.getProactiveResourceMonitorForTest();
241
0
  }
242
243
private:
244
  static const OverloadActionState always_inactive_;
245
  const NamedOverloadActionSymbolTable& action_symbol_table_;
246
  std::vector<OverloadActionState> actions_;
247
  std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>
248
      proactive_resources_;
249
};
250
251
const OverloadActionState ThreadLocalOverloadStateImpl::always_inactive_{UnitFloat::min()};
252
253
// Returns the symbol for `string`, registering the name under the next free index when it
// has not been seen before.
NamedOverloadActionSymbolTable::Symbol
NamedOverloadActionSymbolTable::get(absl::string_view string) {
  const auto existing = table_.find(string);
  if (existing != table_.end()) {
    return Symbol(existing->second);
  }

  // New names take the current table size as their index; names_ is appended in step.
  const size_t next_index = table_.size();
  names_.emplace_back(string);
  table_.emplace(std::make_pair(string, next_index));
  return Symbol(next_index);
}
266
267
// Read-only counterpart of get(): yields the symbol for `string` if one was registered,
// nullopt otherwise.
absl::optional<NamedOverloadActionSymbolTable::Symbol>
NamedOverloadActionSymbolTable::lookup(absl::string_view string) const {
  const auto found = table_.find(string);
  if (found == table_.end()) {
    return absl::nullopt;
  }
  return Symbol(found->second);
}
274
275
0
// Returns the name stored at the symbol's index; the lookup uses bounds-checked at().
const absl::string_view NamedOverloadActionSymbolTable::name(Symbol symbol) const {
  const auto position = symbol.index();
  return names_.at(position);
}
278
279
// Factory wrapper: constructs the action and surfaces whatever status the constructor wrote
// into its out-parameter, returning the object only on success.
absl::StatusOr<std::unique_ptr<OverloadAction>>
OverloadAction::create(const envoy::config::overload::v3::OverloadAction& config,
                       Stats::Scope& stats_scope) {
  absl::Status status = absl::OkStatus();
  std::unique_ptr<OverloadAction> action(new OverloadAction(config, stats_scope, status));
  if (!status.ok()) {
    return status;
  }
  return action;
}
288
289
// Builds one trigger per entry in config.triggers(), keyed by trigger name. On a trigger
// creation error or a repeated trigger name, the error is written to creation_status and
// construction stops before the gauges are reset.
OverloadAction::OverloadAction(const envoy::config::overload::v3::OverloadAction& config,
                               Stats::Scope& stats_scope, absl::Status& creation_status)
    : state_(OverloadActionState::inactive()),
      active_gauge_(
          makeGauge(stats_scope, config.name(), "active", Stats::Gauge::ImportMode::NeverImport)),
      scale_percent_gauge_(makeGauge(stats_scope, config.name(), "scale_percent",
                                     Stats::Gauge::ImportMode::NeverImport)) {
  for (const auto& trigger_proto : config.triggers()) {
    absl::StatusOr<TriggerPtr> built = createTriggerFromConfig(trigger_proto);
    SET_AND_RETURN_IF_NOT_OK(built.status(), creation_status);

    const bool is_new = triggers_.try_emplace(trigger_proto.name(), std::move(*built)).second;
    if (!is_new) {
      creation_status = absl::InvalidArgumentError(
          absl::StrCat("Duplicate trigger resource for overload action ", config.name()));
      return;
    }
  }

  // Both gauges start out at zero.
  active_gauge_.set(0);
  scale_percent_gauge_.set(0);
}
309
310
0
bool OverloadAction::updateResourcePressure(const std::string& name, double pressure) {
311
0
  const OverloadActionState old_state = getState();
312
313
0
  auto it = triggers_.find(name);
314
0
  ASSERT(it != triggers_.end());
315
0
  if (!it->second->updateValue(pressure)) {
316
0
    return false;
317
0
  }
318
0
  const auto trigger_new_state = it->second->actionState();
319
0
  active_gauge_.set(trigger_new_state.isSaturated() ? 1 : 0);
320
0
  scale_percent_gauge_.set(trigger_new_state.value().value() * 100);
321
322
0
  {
323
    // Compute the new state as the maximum over all trigger states.
324
0
    OverloadActionState new_state = OverloadActionState::inactive();
325
0
    for (auto& trigger : triggers_) {
326
0
      const auto trigger_state = trigger.second->actionState();
327
0
      if (trigger_state.value() > new_state.value()) {
328
0
        new_state = trigger_state;
329
0
      }
330
0
    }
331
0
    state_ = new_state;
332
0
  }
333
334
0
  return state_.value() != old_state.value();
335
0
}
336
337
0
// Returns a copy of the action state as last computed by updateResourcePressure().
OverloadActionState OverloadAction::getState() const {
  return state_;
}
338
339
// Factory wrapper: constructs the load shed point and returns it only when the constructor
// left the status out-parameter OK; otherwise the error status is returned.
absl::StatusOr<std::unique_ptr<LoadShedPointImpl>>
LoadShedPointImpl::create(const envoy::config::overload::v3::LoadShedPoint& config,
                          Stats::Scope& stats_scope, Random::RandomGenerator& random_generator) {
  absl::Status status = absl::OkStatus();
  std::unique_ptr<LoadShedPointImpl> point(
      new LoadShedPointImpl(config, stats_scope, random_generator, status));
  if (!status.ok()) {
    return status;
  }
  return point;
}
348
// Builds one trigger per entry in config.triggers(), keyed by trigger name. A trigger
// creation error or a repeated trigger name is reported through creation_status and stops
// construction at that point.
LoadShedPointImpl::LoadShedPointImpl(const envoy::config::overload::v3::LoadShedPoint& config,
                                     Stats::Scope& stats_scope,
                                     Random::RandomGenerator& random_generator,
                                     absl::Status& creation_status)
    : scale_percent_(makeGauge(stats_scope, config.name(), "scale_percent",
                               Stats::Gauge::ImportMode::NeverImport)),
      shed_load_counter_(makeCounter(stats_scope, config.name(), "shed_load_count")),
      random_generator_(random_generator) {
  for (const auto& trigger_config : config.triggers()) {
    auto trigger_or_error = createTriggerFromConfig(trigger_config);
    SET_AND_RETURN_IF_NOT_OK(trigger_or_error.status(), creation_status);
    if (!triggers_.try_emplace(trigger_config.name(), std::move(*trigger_or_error)).second) {
      creation_status = absl::InvalidArgumentError(
          absl::StrCat("Duplicate trigger resource for LoadShedPoint ", config.name()));
      return;
    }
  }
}
366
367
void LoadShedPointImpl::updateResource(absl::string_view resource_name,
368
0
                                       double resource_utilization) {
369
0
  auto it = triggers_.find(resource_name);
370
0
  if (it == triggers_.end()) {
371
0
    return;
372
0
  }
373
374
0
  it->second->updateValue(resource_utilization);
375
0
  updateProbabilityShedLoad();
376
0
}
377
378
0
void LoadShedPointImpl::updateProbabilityShedLoad() {
379
0
  float max_unit_float = 0.0f;
380
0
  for (const auto& trigger : triggers_) {
381
0
    max_unit_float = std::max(trigger.second->actionState().value().value(), max_unit_float);
382
0
  }
383
384
0
  probability_shed_load_.store(max_unit_float);
385
386
  // Update stats.
387
0
  scale_percent_.set(100 * max_unit_float);
388
0
}
389
390
0
bool LoadShedPointImpl::shouldShedLoad() {
391
0
  float unit_float_probability_shed_load = probability_shed_load_.load();
392
  // This should be ok as we're using unit float which saturates at 1.0f.
393
0
  if (unit_float_probability_shed_load == 1.0f) {
394
0
    shed_load_counter_.inc();
395
0
    return true;
396
0
  }
397
398
0
  if (random_generator_.bernoulli(UnitFloat(unit_float_probability_shed_load))) {
399
0
    shed_load_counter_.inc();
400
0
    return true;
401
0
  }
402
0
  return false;
403
0
}
404
405
// Factory wrapper: constructs the manager and returns it only when the constructor left the
// status out-parameter OK; otherwise the error status is returned.
absl::StatusOr<std::unique_ptr<OverloadManagerImpl>>
OverloadManagerImpl::create(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
                            ThreadLocal::SlotAllocator& slot_allocator,
                            const envoy::config::overload::v3::OverloadManager& config,
                            ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api,
                            const Server::Options& options) {
  absl::Status status = absl::OkStatus();
  std::unique_ptr<OverloadManagerImpl> manager(new OverloadManagerImpl(
      dispatcher, stats_scope, slot_allocator, config, validation_visitor, api, options, status));
  if (!status.ok()) {
    return status;
  }
  return manager;
}
418
// Translates the OverloadManager config into resource monitors, overload actions and load
// shed points, in that order. The first configuration error is written to creation_status
// and construction stops there.
OverloadManagerImpl::OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
                                         ThreadLocal::SlotAllocator& slot_allocator,
                                         const envoy::config::overload::v3::OverloadManager& config,
                                         ProtobufMessage::ValidationVisitor& validation_visitor,
                                         Api::Api& api, const Server::Options& options,
                                         absl::Status& creation_status)
    : dispatcher_(dispatcher), time_source_(api.timeSource()), tls_(slot_allocator),
      refresh_interval_(
          std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))),
      refresh_interval_delays_(makeHistogram(stats_scope, "refresh_interval_delay",
                                             Stats::Histogram::Unit::Milliseconds)),
      proactive_resources_(
          std::make_unique<
              absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>()) {
  Configuration::ResourceMonitorFactoryContextImpl context(dispatcher, options, api,
                                                           validation_visitor);
  const auto& proactive_names =
      OverloadProactiveResources::get().proactive_action_name_to_resource_;

  // The configuration API makes no distinction between proactive and regular resource
  // monitors, but internally they live in two separate collections. Proactive resources are
  // not subject to periodic flushes; they are updated on demand through
  // `tryAllocateResource/tryDeallocateResource` on the thread local overload state.
  for (const auto& resource : config.resource_monitors()) {
    const auto& name = resource.name();
    // A name present in the proactive table marks the monitor as proactive.
    const auto proactive_it = proactive_names.find(name);
    ENVOY_LOG(debug, "Evaluating resource {}", name);

    bool inserted = false;
    if (proactive_it != proactive_names.end()) {
      ENVOY_LOG(debug, "Adding proactive resource monitor for {}", name);
      auto& factory =
          Config::Utility::getAndCheckFactory<Configuration::ProactiveResourceMonitorFactory>(
              resource);
      auto monitor_config =
          Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);
      auto monitor = factory.createProactiveResourceMonitor(*monitor_config, context);
      inserted =
          proactive_resources_
              ->try_emplace(proactive_it->second, name, std::move(monitor), stats_scope)
              .second;
    } else {
      ENVOY_LOG(debug, "Adding resource monitor for {}", name);
      auto& factory =
          Config::Utility::getAndCheckFactory<Configuration::ResourceMonitorFactory>(resource);
      auto monitor_config =
          Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);
      auto monitor = factory.createResourceMonitor(*monitor_config, context);
      inserted = resources_.try_emplace(name, name, std::move(monitor), *this, stats_scope).second;
    }

    if (!inserted) {
      creation_status =
          absl::InvalidArgumentError(absl::StrCat("Duplicate resource monitor ", name));
      return;
    }
  }

  for (const auto& action : config.actions()) {
    const auto& name = action.name();
    const auto symbol = action_symbol_table_.get(name);
    ENVOY_LOG(debug, "Adding overload action {}", name);

    // Only well known overload actions are accepted.
    const auto& well_known_actions = OverloadActionNames::get().WellKnownActions;
    const bool is_well_known = std::find(well_known_actions.begin(), well_known_actions.end(),
                                         name) != well_known_actions.end();
    if (!is_well_known) {
      creation_status =
          absl::InvalidArgumentError(absl::StrCat("Unknown Overload Manager Action ", name));
      return;
    }

    // TODO: use in place construction once https://github.com/abseil/abseil-cpp/issues/388 is
    // addressed.
    // In place construction cannot be used for now: the OverloadAction constructor may fail,
    // leaving the actions_ map in an inconsistent state that results in an invalid free on
    // destruction.
    auto action_or_error = OverloadAction::create(action, stats_scope);
    SET_AND_RETURN_IF_NOT_OK(action_or_error.status(), creation_status);
    const bool action_is_new = actions_.try_emplace(symbol, std::move(*action_or_error)).second;
    if (!action_is_new) {
      creation_status =
          absl::InvalidArgumentError(absl::StrCat("Duplicate overload action ", name));
      return;
    }

    // Per-action extras: timer minimums for ReduceTimeouts, a buffer factory requirement for
    // ResetStreams, and no typed_config for anything else.
    if (name == OverloadActionNames::get().ReduceTimeouts) {
      auto timer_or_error = parseTimerMinimums(action.typed_config(), validation_visitor);
      SET_AND_RETURN_IF_NOT_OK(timer_or_error.status(), creation_status);
      timer_minimums_ =
          std::make_shared<const Event::ScaledTimerTypeMap>(std::move(*timer_or_error));
    } else if (name == OverloadActionNames::get().ResetStreams) {
      if (!config.has_buffer_factory_config()) {
        creation_status = absl::InvalidArgumentError(
            fmt::format("Overload action \"{}\" requires buffer_factory_config.", name));
        return;
      }
      makeCounter(api.rootScope(), OverloadActionStatsNames::get().ResetStreamsCount);
    } else if (action.has_typed_config()) {
      creation_status = absl::InvalidArgumentError(fmt::format(
          "Overload action \"{}\" has an unexpected value for the typed_config field", name));
      return;
    }

    for (const auto& trigger : action.triggers()) {
      const std::string& resource = trigger.name();
      const auto proactive_it = proactive_names.find(resource);

      // A trigger must refer to a configured regular resource or to a proactive resource name.
      const bool resource_known =
          resources_.find(resource) != resources_.end() || proactive_it != proactive_names.end();
      if (!resource_known) {
        creation_status = absl::InvalidArgumentError(
            fmt::format("Unknown trigger resource {} for overload action {}", resource, name));
        return;
      }
      resource_to_actions_.insert(std::make_pair(resource, symbol));
    }
  }

  // Validate the trigger resources for load shed points, then create each point.
  for (const auto& point : config.loadshed_points()) {
    for (const auto& trigger : point.triggers()) {
      if (!resources_.contains(trigger.name())) {
        creation_status = absl::InvalidArgumentError(fmt::format(
            "Unknown trigger resource {} for loadshed point {}", trigger.name(), point.name()));
        return;
      }
    }

    auto load_shed_or_error =
        LoadShedPointImpl::create(point, api.rootScope(), api.randomGenerator());
    SET_AND_RETURN_IF_NOT_OK(load_shed_or_error.status(), creation_status);

    const bool point_is_new =
        loadshed_points_.try_emplace(point.name(), std::move(*load_shed_or_error)).second;
    if (!point_is_new) {
      creation_status =
          absl::InvalidArgumentError(absl::StrCat("Duplicate loadshed point ", point.name()));
      return;
    }
  }
}
559
560
1.97k
void OverloadManagerImpl::start() {
561
1.97k
  ASSERT(!started_);
562
1.97k
  started_ = true;
563
564
3.95k
  tls_.set([this](Event::Dispatcher&) {
565
3.95k
    return std::make_shared<ThreadLocalOverloadStateImpl>(action_symbol_table_,
566
3.95k
                                                          proactive_resources_);
567
3.95k
  });
568
569
1.97k
  if (resources_.empty()) {
570
1.97k
    return;
571
1.97k
  }
572
573
0
  timer_ = dispatcher_.createTimer([this]() -> void {
574
    // Guarantee that all resource updates get flushed after no more than one refresh_interval_.
575
0
    flushResourceUpdates();
576
577
    // Start a new flush epoch. If all resource updates complete before this callback runs, the last
578
    // resource update will call flushResourceUpdates to flush the whole batch early.
579
0
    ++flush_epoch_;
580
0
    flush_awaiting_updates_ = resources_.size();
581
582
0
    for (auto& resource : resources_) {
583
0
      resource.second.update(flush_epoch_);
584
0
    }
585
586
    // Record delay.
587
0
    auto now = time_source_.monotonicTime();
588
0
    std::chrono::milliseconds delay =
589
0
        std::chrono::duration_cast<std::chrono::milliseconds>(now - time_resources_last_measured_);
590
0
    refresh_interval_delays_.recordValue(delay.count());
591
0
    time_resources_last_measured_ = now;
592
593
0
    timer_->enableTimer(refresh_interval_);
594
0
  });
595
596
0
  time_resources_last_measured_ = time_source_.monotonicTime();
597
0
  timer_->enableTimer(refresh_interval_);
598
0
}
599
600
4.47k
void OverloadManagerImpl::stop() {
601
  // Disable any pending timeouts.
602
4.47k
  if (timer_) {
603
0
    timer_->disableTimer();
604
0
  }
605
606
  // Clear the resource map to block on any pending updates.
607
4.47k
  resources_.clear();
608
609
  // TODO(nezdolik): wrap proactive monitors into atomic? and clear it here
610
4.47k
}
611
612
bool OverloadManagerImpl::registerForAction(const std::string& action,
613
                                            Event::Dispatcher& dispatcher,
614
22.2k
                                            OverloadActionCb callback) {
615
22.2k
  ASSERT(!started_);
616
22.2k
  const auto symbol = action_symbol_table_.get(action);
617
618
22.2k
  if (actions_.find(symbol) == actions_.end()) {
619
22.2k
    ENVOY_LOG(debug, "No overload action is configured for {}.", action);
620
22.2k
    return false;
621
22.2k
  }
622
623
0
  action_to_callbacks_.emplace(std::piecewise_construct, std::forward_as_tuple(symbol),
624
0
                               std::forward_as_tuple(dispatcher, callback));
625
0
  return true;
626
22.2k
}
627
628
6.89k
// Dereferences the thread-local slot that start() populates.
ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() {
  return *tls_;
}
629
4.45k
// Returns a factory that, for a given dispatcher, creates a scaled range timer manager and
// hooks it up to the ReduceTimeouts overload action.
Event::ScaledRangeTimerManagerFactory OverloadManagerImpl::scaledTimerFactory() {
  return [this](Event::Dispatcher& dispatcher) {
    auto timer_manager = createScaledRangeTimerManager(dispatcher, timer_minimums_);
    // The callback keeps a raw pointer to the manager being returned.
    auto* const raw_manager = timer_manager.get();
    registerForAction(OverloadActionNames::get().ReduceTimeouts, dispatcher,
                      [manager = raw_manager](OverloadActionState scale_state) {
                        // Action state runs from 0 (no overload) to 1 (maximal overload),
                        // whereas the timer scale factor is 1 for no scaling and 0 for
                        // maximal scaling, so the inverted value (1-value) is passed on.
                        manager->setScaleFactor(scale_state.value().invert());
                      });
    return timer_manager;
  };
}
643
644
10.8k
// Returns the load shed point registered under `point_name`, or nullptr if there is none.
LoadShedPoint* OverloadManagerImpl::getLoadShedPoint(absl::string_view point_name) {
  const auto found = loadshed_points_.find(point_name);
  if (found == loadshed_points_.end()) {
    return nullptr;
  }
  return found->second.get();
}
650
651
// Creates the concrete scaled range timer manager for `dispatcher` with the given minimums.
Event::ScaledRangeTimerManagerPtr OverloadManagerImpl::createScaledRangeTimerManager(
    Event::Dispatcher& dispatcher,
    const Event::ScaledTimerTypeMapConstSharedPtr& timer_minimums) const {
  auto manager = std::make_unique<Event::ScaledRangeTimerManagerImpl>(dispatcher, timer_minimums);
  return manager;
}
656
657
void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure,
658
0
                                                 FlushEpochId flush_epoch) {
659
0
  auto [start, end] = resource_to_actions_.equal_range(resource);
660
661
0
  std::for_each(start, end, [&](ResourceToActionMap::value_type& entry) {
662
0
    const NamedOverloadActionSymbolTable::Symbol action = entry.second;
663
0
    auto action_it = actions_.find(action);
664
0
    ASSERT(action_it != actions_.end());
665
0
    const OverloadActionState old_state = action_it->second->getState();
666
0
    if (action_it->second->updateResourcePressure(resource, pressure)) {
667
0
      const auto state = action_it->second->getState();
668
669
0
      if (old_state.isSaturated() != state.isSaturated()) {
670
0
        ENVOY_LOG(debug, "Overload action {} became {}", action_symbol_table_.name(action),
671
0
                  (state.isSaturated() ? "saturated" : "scaling"));
672
0
      }
673
674
      // Record the updated value to be sent to workers on the next thread-local-state flush, along
675
      // with any update callbacks. This might overwrite a previous action state change caused by a
676
      // pressure update for a different resource that hasn't been flushed yet. That's okay because
677
      // the state recorded here includes the information from all previous resource updates. So
678
      // even if resource 1 causes an action to have value A, and a later update to resource 2
679
      // causes the action to have value B, B would have been the result for whichever order the
680
      // updates to resources 1 and 2 came in.
681
0
      state_updates_to_flush_.insert_or_assign(action, state);
682
0
      auto [callbacks_start, callbacks_end] = action_to_callbacks_.equal_range(action);
683
0
      std::for_each(callbacks_start, callbacks_end, [&](ActionToCallbackMap::value_type& cb_entry) {
684
0
        callbacks_to_flush_.insert_or_assign(&cb_entry.second, state);
685
0
      });
686
0
    }
687
0
  });
688
689
0
  for (auto& loadshed_point : loadshed_points_) {
690
0
    loadshed_point.second->updateResource(resource, pressure);
691
0
  }
692
693
  // Eagerly flush updates if this is the last call to updateResourcePressure expected for the
694
  // current epoch. This assert is always valid because flush_awaiting_updates_ is initialized
695
  // before each batch of updates, and even if a resource monitor performs a double update, or a
696
  // previous update callback is late, the logic in OverloadManager::Resource::update() will prevent
697
  // unexpected calls to this function.
698
0
  ASSERT(flush_awaiting_updates_ > 0);
699
0
  --flush_awaiting_updates_;
700
0
  if (flush_epoch == flush_epoch_ && flush_awaiting_updates_ == 0) {
701
0
    flushResourceUpdates();
702
0
  }
703
0
}
704
705
0
void OverloadManagerImpl::flushResourceUpdates() {
706
0
  if (!state_updates_to_flush_.empty()) {
707
0
    auto shared_updates = std::make_shared<
708
0
        absl::flat_hash_map<NamedOverloadActionSymbolTable::Symbol, OverloadActionState>>();
709
0
    std::swap(*shared_updates, state_updates_to_flush_);
710
711
0
    tls_.runOnAllThreads(
712
0
        [updates = std::move(shared_updates)](OptRef<ThreadLocalOverloadStateImpl> overload_state) {
713
0
          for (const auto& [action, state] : *updates) {
714
0
            overload_state->setState(action, state);
715
0
          }
716
0
        });
717
0
  }
718
719
0
  for (const auto& [cb, state] : callbacks_to_flush_) {
720
0
    cb->dispatcher_.post([cb = cb, state = state]() { cb->callback_(state); });
721
0
  }
722
0
  callbacks_to_flush_.clear();
723
0
}
724
725
// Constructs a Resource wrapper for one monitored resource.
//   name        - resource name; stored in name_ and passed to makeGauge/makeCounter.
//   monitor     - resource monitor; ownership is moved into monitor_.
//   manager     - owning overload manager, kept by reference in manager_.
//   stats_scope - scope passed to makeGauge/makeCounter to create the "pressure" gauge
//                 (ImportMode::NeverImport) and the "failed_updates" / "skipped_updates" counters.
OverloadManagerImpl::Resource::Resource(const std::string& name, ResourceMonitorPtr monitor,
726
                                        OverloadManagerImpl& manager, Stats::Scope& stats_scope)
727
    : name_(name), monitor_(std::move(monitor)), manager_(manager),
728
      pressure_gauge_(
729
          makeGauge(stats_scope, name, "pressure", Stats::Gauge::ImportMode::NeverImport)),
730
      failed_updates_counter_(makeCounter(stats_scope, name, "failed_updates")),
731
0
      skipped_updates_counter_(makeCounter(stats_scope, name, "skipped_updates")) {}
732
733
0
// Asks the monitor for a fresh usage reading tagged with `flush_epoch`. If a previous request is
// still pending, no new request is issued; the skip is logged and counted instead.
void OverloadManagerImpl::Resource::update(FlushEpochId flush_epoch) {
  if (pending_update_) {
    ENVOY_LOG(debug, "Skipping update for resource {} which has pending update", name_);
    skipped_updates_counter_.inc();
    return;
  }

  // Mark the request as outstanding before invoking the monitor; onSuccess()/onFailure() clear
  // the flag again.
  pending_update_ = true;
  flush_epoch_ = flush_epoch;
  monitor_->updateResourceUsage(*this);
}
743
744
0
void OverloadManagerImpl::Resource::onSuccess(const ResourceUsage& usage) {
745
0
  pending_update_ = false;
746
0
  manager_.updateResourcePressure(name_, usage.resource_pressure_, flush_epoch_);
747
0
  pressure_gauge_.set(usage.resource_pressure_ * 100); // convert to percent
748
0
}
749
750
0
void OverloadManagerImpl::Resource::onFailure(const EnvoyException& error) {
751
0
  pending_update_ = false;
752
0
  ENVOY_LOG(info, "Failed to update resource {}: {}", name_, error.what());
753
0
  failed_updates_counter_.inc();
754
0
}
755
756
} // namespace Server
757
} // namespace Envoy