1
#include "source/server/overload_manager_impl.h"
2

            
3
#include <chrono>
4

            
5
#include "envoy/common/exception.h"
6
#include "envoy/config/overload/v3/overload.pb.h"
7
#include "envoy/config/overload/v3/overload.pb.validate.h"
8
#include "envoy/stats/scope.h"
9

            
10
#include "source/common/common/fmt.h"
11
#include "source/common/config/utility.h"
12
#include "source/common/event/scaled_range_timer_manager_impl.h"
13
#include "source/common/protobuf/utility.h"
14
#include "source/common/stats/symbol_table.h"
15
#include "source/server/resource_monitor_config_impl.h"
16

            
17
#include "absl/container/node_hash_map.h"
18
#include "absl/strings/str_cat.h"
19

            
20
namespace Envoy {
21
namespace Server {
22
namespace {
23

            
24
// Owning handle to a Trigger implementation created from configuration.
using TriggerPtr = std::unique_ptr<Trigger>;
25

            
26
class ThresholdTriggerImpl final : public Trigger {
27
public:
28
  ThresholdTriggerImpl(const envoy::config::overload::v3::ThresholdTrigger& config)
29
110
      : threshold_(config.value()), state_(OverloadActionState::inactive()) {}
30

            
31
26894
  bool updateValue(double value) override {
32
26894
    const OverloadActionState state = actionState();
33
26894
    state_ =
34
26894
        value >= threshold_ ? OverloadActionState::saturated() : OverloadActionState::inactive();
35
    // This is a floating point comparison, though state_ is always either
36
    // saturated or inactive so there's no risk due to floating point precision.
37
26894
    return state.value() != actionState().value();
38
26894
  }
39

            
40
55234
  OverloadActionState actionState() const override { return state_; }
41

            
42
private:
43
  const double threshold_;
44
  OverloadActionState state_;
45
};
46

            
47
class ScaledTriggerImpl final : public Trigger {
48
public:
49
  static absl::StatusOr<std::unique_ptr<ScaledTriggerImpl>>
50
92
  create(const envoy::config::overload::v3::ScaledTrigger& config) {
51
92
    if (config.scaling_threshold() >= config.saturation_threshold()) {
52
2
      return absl::InvalidArgumentError("scaling_threshold must be less than saturation_threshold");
53
2
    }
54
90
    return std::unique_ptr<ScaledTriggerImpl>(new ScaledTriggerImpl(config));
55
92
  }
56

            
57
98886
  bool updateValue(double value) override {
58
98886
    const OverloadActionState old_state = actionState();
59
98886
    if (value <= scaling_threshold_) {
60
854
      state_ = OverloadActionState::inactive();
61
98229
    } else if (value >= saturated_threshold_) {
62
31067
      state_ = OverloadActionState::saturated();
63
66965
    } else {
64
66965
      state_ = OverloadActionState(
65
66965
          UnitFloat((value - scaling_threshold_) / (saturated_threshold_ - scaling_threshold_)));
66
66965
    }
67
    // All values of state_ are produced via this same code path. Even if
68
    // old_state and state_ should be approximately equal, there's no harm in
69
    // signaling for a small change if they're not float::operator== equal.
70
98886
    return state_.value() != old_state.value();
71
98886
  }
72

            
73
99203
  OverloadActionState actionState() const override { return state_; }
74

            
75
private:
76
  ScaledTriggerImpl(const envoy::config::overload::v3::ScaledTrigger& config)
77
90
      : scaling_threshold_(config.scaling_threshold()),
78
90
        saturated_threshold_(config.saturation_threshold()),
79
90
        state_(OverloadActionState::inactive()) {}
80

            
81
  const double scaling_threshold_;
82
  const double saturated_threshold_;
83
  OverloadActionState state_;
84
};
85

            
86
/**
 * Builds the Trigger implementation selected by the oneof in the trigger configuration.
 * @param trigger_config the trigger configuration.
 * @return the trigger, or an error status if the oneof is unset or the scaled trigger
 *         configuration is rejected.
 */
absl::StatusOr<TriggerPtr>
createTriggerFromConfig(const envoy::config::overload::v3::Trigger& trigger_config) {
  using OneofCase = envoy::config::overload::v3::Trigger::TriggerOneofCase;

  TriggerPtr result;
  switch (trigger_config.trigger_oneof_case()) {
  case OneofCase::kThreshold:
    result = std::make_unique<ThresholdTriggerImpl>(trigger_config.threshold());
    break;
  case OneofCase::kScaled: {
    auto scaled_or_error = ScaledTriggerImpl::create(trigger_config.scaled());
    RETURN_IF_NOT_OK(scaled_or_error.status());
    result = std::move(scaled_or_error.value());
    break;
  }
  case OneofCase::TRIGGER_ONEOF_NOT_SET:
    return absl::InvalidArgumentError(
        absl::StrCat("action not set for trigger ", trigger_config.name()));
  }

  return result;
}
107

            
108
448
/**
 * Looks up the counter with the given full name in the scope.
 * @param scope the stats scope to query.
 * @param name_of_stat the complete stat name.
 * @return the counter returned by the scope for that name.
 */
Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view name_of_stat) {
  // The managed storage is a function-local used only to build the StatName for the lookup.
  Stats::StatNameManagedStorage storage(name_of_stat, scope.symbolTable());
  Stats::Counter& counter = scope.counterFromStatName(storage.statName());
  return counter;
}
112

            
113
440
/**
 * Looks up the counter named "overload.<a>.<b>" in the scope.
 * @param scope the stats scope to query.
 * @param a the middle segment of the stat name.
 * @param b the last segment of the stat name.
 * @return the counter returned by the scope for the composed name.
 */
Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view a, absl::string_view b) {
  const std::string full_name = absl::StrCat("overload.", a, ".", b);
  return makeCounter(scope, full_name);
}
116

            
117
/**
 * Looks up the gauge named "overload.<a>.<b>" in the scope.
 * @param scope the stats scope to query.
 * @param a the middle segment of the stat name.
 * @param b the last segment of the stat name.
 * @param import_mode the import mode forwarded to the scope.
 * @return the gauge returned by the scope for the composed name.
 */
Stats::Gauge& makeGauge(Stats::Scope& scope, absl::string_view a, absl::string_view b,
                        Stats::Gauge::ImportMode import_mode) {
  const std::string full_name = absl::StrCat("overload.", a, ".", b);
  Stats::StatNameManagedStorage storage(full_name, scope.symbolTable());
  return scope.gaugeFromStatName(storage.statName(), import_mode);
}
123

            
124
/**
 * Looks up the histogram named "overload.<name>" in the scope.
 * @param scope the stats scope to query.
 * @param name the stat name suffix.
 * @param unit the histogram unit forwarded to the scope.
 * @return the histogram returned by the scope for the composed name.
 */
Stats::Histogram& makeHistogram(Stats::Scope& scope, absl::string_view name,
                                Stats::Histogram::Unit unit) {
  const std::string full_name = absl::StrCat("overload.", name);
  Stats::StatNameManagedStorage storage(full_name, scope.symbolTable());
  return scope.histogramFromStatName(storage.statName(), unit);
}
129

            
130
/**
 * Maps a timer type from the scale-timers action configuration to the event-layer timer type.
 * @param config_timer_type the timer type from configuration.
 * @return the matching Event::ScaledTimerType, or InvalidArgument for an unrecognized value.
 */
absl::StatusOr<Event::ScaledTimerType> parseTimerType(
    envoy::config::overload::v3::ScaleTimersOverloadActionConfig::TimerType config_timer_type) {
  using TimerConfig = envoy::config::overload::v3::ScaleTimersOverloadActionConfig;

  switch (config_timer_type) {
  case TimerConfig::HTTP_DOWNSTREAM_CONNECTION_IDLE:
    return Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout;
  case TimerConfig::HTTP_DOWNSTREAM_STREAM_IDLE:
    return Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout;
  case TimerConfig::TRANSPORT_SOCKET_CONNECT:
    return Event::ScaledTimerType::TransportSocketConnectTimeout;
  case TimerConfig::HTTP_DOWNSTREAM_CONNECTION_MAX:
    return Event::ScaledTimerType::HttpDownstreamMaxConnectionTimeout;
  case TimerConfig::HTTP_DOWNSTREAM_STREAM_FLUSH:
    return Event::ScaledTimerType::HttpDownstreamStreamFlush;
  default:
    break;
  }

  // Any value not matched above is rejected.
  return absl::InvalidArgumentError(
      fmt::format("Unknown timer type {}", static_cast<int>(config_timer_type)));
}
150

            
151
/**
 * Converts the typed config of the scale-timers action into a map from timer type to the minimum
 * that applies to that timer.
 * @param typed_config the packed ScaleTimersOverloadActionConfig.
 * @param validation_visitor visitor passed to the conversion/validation helper.
 * @return the timer map, or InvalidArgument for an unknown or duplicated timer type.
 */
absl::StatusOr<Event::ScaledTimerTypeMap>
parseTimerMinimums(const Protobuf::Any& typed_config,
                   ProtobufMessage::ValidationVisitor& validation_visitor) {
  using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig;
  const Config action_config =
      MessageUtil::anyConvertAndValidate<Config>(typed_config, validation_visitor);

  Event::ScaledTimerTypeMap minimums;
  for (const auto& entry : action_config.timer_scale_factors()) {
    auto type_or_error = parseTimerType(entry.timer());
    RETURN_IF_NOT_OK(type_or_error.status());
    const Event::ScaledTimerType& timer_type = *type_or_error;

    // An explicit min_timeout yields an absolute minimum; otherwise min_scale (a percentage) is
    // converted to a unit fraction and used as a scaled minimum.
    const Event::ScaledTimerMinimum minimum = [&entry]() {
      if (entry.has_min_timeout()) {
        return Event::ScaledTimerMinimum(Event::AbsoluteMinimum(std::chrono::milliseconds(
            DurationUtil::durationToMilliseconds(entry.min_timeout()))));
      }
      return Event::ScaledTimerMinimum(
          Event::ScaledMinimum(UnitFloat(entry.min_scale().value() / 100.0)));
    }();

    const bool inserted = minimums.insert(std::make_pair(timer_type, minimum)).second;
    if (!inserted) {
      return absl::InvalidArgumentError(fmt::format("Found duplicate entry for timer type {}",
                                                    Config::TimerType_Name(entry.timer())));
    }
  }

  return minimums;
}
182

            
183
} // namespace
184

            
185
/**
186
 * Thread-local copy of the state of each configured overload action.
187
 */
188
class ThreadLocalOverloadStateImpl : public ThreadLocalOverloadState {
189
public:
190
  explicit ThreadLocalOverloadStateImpl(
191
      const NamedOverloadActionSymbolTable& action_symbol_table,
192
      std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>&
193
          proactive_resources)
194
21258
      : action_symbol_table_(action_symbol_table),
195
21258
        actions_(action_symbol_table.size(), OverloadActionState(UnitFloat::min())),
196
21258
        proactive_resources_(proactive_resources) {}
197

            
198
43469
  const OverloadActionState& getState(const std::string& action) override {
199
43469
    if (const auto symbol = action_symbol_table_.lookup(action); symbol != absl::nullopt) {
200
83
      return actions_[symbol->index()];
201
83
    }
202
43386
    return always_inactive_;
203
43469
  }
204

            
205
406
  void setState(NamedOverloadActionSymbolTable::Symbol action, OverloadActionState state) {
206
406
    actions_[action.index()] = state;
207
406
  }
208

            
209
  bool tryAllocateResource(OverloadProactiveResourceName resource_name,
210
16
                           int64_t increment) override {
211
16
    const auto proactive_resource = proactive_resources_->find(resource_name);
212
16
    if (proactive_resource == proactive_resources_->end()) {
213
      ENVOY_LOG_MISC(warn, "Failed to allocate resource usage, resource monitor is not configured");
214
      return false;
215
    }
216

            
217
16
    return proactive_resource->second.tryAllocateResource(increment);
218
16
  }
219

            
220
  bool tryDeallocateResource(OverloadProactiveResourceName resource_name,
221
19
                             int64_t decrement) override {
222
19
    const auto proactive_resource = proactive_resources_->find(resource_name);
223
19
    if (proactive_resource == proactive_resources_->end()) {
224
      ENVOY_LOG_MISC(warn,
225
                     "Failed to deallocate resource usage, resource monitor is not configured");
226
      return false;
227
    }
228

            
229
19
    return proactive_resource->second.tryDeallocateResource(decrement);
230
19
  }
231

            
232
20805
  bool isResourceMonitorEnabled(OverloadProactiveResourceName resource_name) override {
233
20805
    const auto proactive_resource = proactive_resources_->find(resource_name);
234
20805
    return proactive_resource != proactive_resources_->end();
235
20805
  }
236

            
237
  ProactiveResourceMonitorOptRef
238
3
  getProactiveResourceMonitorForTest(OverloadProactiveResourceName resource_name) override {
239
3
    const auto proactive_resource = proactive_resources_->find(resource_name);
240
3
    if (proactive_resource == proactive_resources_->end()) {
241
      ENVOY_LOG_MISC(warn, "Failed to get resource usage, resource monitor is not configured");
242
      return makeOptRefFromPtr<ProactiveResourceMonitor>(nullptr);
243
    }
244
3
    return proactive_resource->second.getProactiveResourceMonitorForTest();
245
3
  }
246

            
247
private:
248
  static const OverloadActionState always_inactive_;
249
  const NamedOverloadActionSymbolTable& action_symbol_table_;
250
  std::vector<OverloadActionState> actions_;
251
  std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>
252
      proactive_resources_;
253
};
254

            
255
// Shared state handed out by getState() for action names that are not in the symbol table; it is
// initialized with the minimum UnitFloat value.
const OverloadActionState ThreadLocalOverloadStateImpl::always_inactive_{UnitFloat::min()};
256

            
257
// Returns the symbol for the given name, registering the name with the next free index if it has
// not been seen before.
NamedOverloadActionSymbolTable::Symbol
NamedOverloadActionSymbolTable::get(absl::string_view string) {
  const auto existing = table_.find(string);
  if (existing != table_.end()) {
    return Symbol(existing->second);
  }

  // New names receive the current table size as their index, so indices stay dense.
  const size_t next_index = table_.size();
  names_.emplace_back(string);
  table_.emplace(std::make_pair(string, next_index));
  return Symbol(next_index);
}
270

            
271
// Returns the symbol previously registered for the given name, or nullopt if there is none. Unlike
// get(), this never modifies the table.
absl::optional<NamedOverloadActionSymbolTable::Symbol>
NamedOverloadActionSymbolTable::lookup(absl::string_view string) const {
  const auto found = table_.find(string);
  if (found == table_.end()) {
    return absl::nullopt;
  }
  return Symbol(found->second);
}
278

            
279
2
// Returns the name stored at the symbol's index; the lookup uses the bounds-checked at().
const absl::string_view NamedOverloadActionSymbolTable::name(Symbol symbol) const {
  const auto index = symbol.index();
  return names_.at(index);
}
282

            
283
// Factory for OverloadAction: runs the constructor and surfaces any error it reported through
// its status out-parameter instead of returning a partially configured action.
absl::StatusOr<std::unique_ptr<OverloadAction>>
OverloadAction::create(const envoy::config::overload::v3::OverloadAction& config,
                       Stats::Scope& stats_scope) {
  absl::Status ctor_status = absl::OkStatus();
  std::unique_ptr<OverloadAction> action(new OverloadAction(config, stats_scope, ctor_status));
  RETURN_IF_NOT_OK(ctor_status);
  return action;
}
292

            
293
// Creates the action's gauges and one trigger per configured trigger entry. On any failure,
// creation_status is set to the error and construction stops early (the gauges are not reset).
OverloadAction::OverloadAction(const envoy::config::overload::v3::OverloadAction& config,
                               Stats::Scope& stats_scope, absl::Status& creation_status)
    : state_(OverloadActionState::inactive()),
      active_gauge_(
          makeGauge(stats_scope, config.name(), "active", Stats::Gauge::ImportMode::NeverImport)),
      scale_percent_gauge_(makeGauge(stats_scope, config.name(), "scale_percent",
                                     Stats::Gauge::ImportMode::NeverImport)) {
  for (const auto& trigger_config : config.triggers()) {
    absl::StatusOr<TriggerPtr> new_trigger = createTriggerFromConfig(trigger_config);
    SET_AND_RETURN_IF_NOT_OK(new_trigger.status(), creation_status);

    // Triggers are keyed by resource name; a second trigger for the same resource is an error.
    const bool inserted =
        triggers_.try_emplace(trigger_config.name(), std::move(*new_trigger)).second;
    if (!inserted) {
      creation_status = absl::InvalidArgumentError(
          absl::StrCat("Duplicate trigger resource for overload action ", config.name()));
      return;
    }
  }

  active_gauge_.set(0);
  scale_percent_gauge_.set(0);
}
313

            
314
124497
bool OverloadAction::updateResourcePressure(const std::string& name, double pressure) {
315
124497
  const OverloadActionState old_state = getState();
316

            
317
124497
  auto it = triggers_.find(name);
318
124497
  ASSERT(it != triggers_.end());
319
124497
  if (!it->second->updateValue(pressure)) {
320
124282
    return false;
321
124282
  }
322
215
  const auto trigger_new_state = it->second->actionState();
323
215
  active_gauge_.set(trigger_new_state.isSaturated() ? 1 : 0);
324
215
  scale_percent_gauge_.set(trigger_new_state.value().value() * 100);
325

            
326
215
  {
327
    // Compute the new state as the maximum over all trigger states.
328
215
    OverloadActionState new_state = OverloadActionState::inactive();
329
257
    for (auto& trigger : triggers_) {
330
257
      const auto trigger_state = trigger.second->actionState();
331
257
      if (trigger_state.value() > new_state.value()) {
332
180
        new_state = trigger_state;
333
180
      }
334
257
    }
335
215
    state_ = new_state;
336
215
  }
337

            
338
215
  return state_.value() != old_state.value();
339
124497
}
340

            
341
249205
// Returns a copy of the action state last computed by updateResourcePressure() (inactive until
// the first change).
OverloadActionState OverloadAction::getState() const { return state_; }
342

            
343
// Factory for LoadShedPointImpl: runs the constructor and surfaces any error it reported through
// its status out-parameter.
absl::StatusOr<std::unique_ptr<LoadShedPointImpl>>
LoadShedPointImpl::create(const envoy::config::overload::v3::LoadShedPoint& config,
                          Stats::Scope& stats_scope, Random::RandomGenerator& random_generator) {
  absl::Status ctor_status = absl::OkStatus();
  std::unique_ptr<LoadShedPointImpl> point(
      new LoadShedPointImpl(config, stats_scope, random_generator, ctor_status));
  RETURN_IF_NOT_OK(ctor_status);
  return point;
}
352
// Creates the load shed point's stats and one trigger per configured trigger entry. On any
// failure, creation_status is set to the error and construction stops early.
LoadShedPointImpl::LoadShedPointImpl(const envoy::config::overload::v3::LoadShedPoint& config,
                                     Stats::Scope& stats_scope,
                                     Random::RandomGenerator& random_generator,
                                     absl::Status& creation_status)
    : scale_percent_(makeGauge(stats_scope, config.name(), "scale_percent",
                               Stats::Gauge::ImportMode::NeverImport)),
      shed_load_counter_(makeCounter(stats_scope, config.name(), "shed_load_count")),
      random_generator_(random_generator) {
  for (const auto& trigger_config : config.triggers()) {
    auto trigger_or_error = createTriggerFromConfig(trigger_config);
    SET_AND_RETURN_IF_NOT_OK(trigger_or_error.status(), creation_status);
    // Triggers are keyed by resource name; a second trigger for the same resource is an error.
    if (!triggers_.try_emplace(trigger_config.name(), std::move(*trigger_or_error)).second) {
      creation_status = absl::InvalidArgumentError(
          absl::StrCat("Duplicate trigger resource for LoadShedPoint ", config.name()));
      return;
    }
  }
}
370

            
371
void LoadShedPointImpl::updateResource(absl::string_view resource_name,
372
1283
                                       double resource_utilization) {
373
1283
  auto it = triggers_.find(resource_name);
374
1283
  if (it == triggers_.end()) {
375
    return;
376
  }
377

            
378
1283
  it->second->updateValue(resource_utilization);
379
1283
  updateProbabilityShedLoad();
380
1283
}
381

            
382
1283
void LoadShedPointImpl::updateProbabilityShedLoad() {
383
1283
  float max_unit_float = 0.0f;
384
1291
  for (const auto& trigger : triggers_) {
385
1291
    max_unit_float = std::max(trigger.second->actionState().value().value(), max_unit_float);
386
1291
  }
387

            
388
1283
  probability_shed_load_.store(max_unit_float);
389

            
390
  // Update stats.
391
1283
  scale_percent_.set(100 * max_unit_float);
392
1283
}
393

            
394
90
bool LoadShedPointImpl::shouldShedLoad() {
395
90
  float unit_float_probability_shed_load = probability_shed_load_.load();
396
  // This should be ok as we're using unit float which saturates at 1.0f.
397
90
  if (unit_float_probability_shed_load == 1.0f) {
398
41
    shed_load_counter_.inc();
399
41
    return true;
400
41
  }
401

            
402
49
  if (random_generator_.bernoulli(UnitFloat(unit_float_probability_shed_load))) {
403
1
    shed_load_counter_.inc();
404
1
    return true;
405
1
  }
406
48
  return false;
407
49
}
408

            
409
// Factory for OverloadManagerImpl: runs the constructor and surfaces any configuration error it
// reported through its status out-parameter.
absl::StatusOr<std::unique_ptr<OverloadManagerImpl>>
OverloadManagerImpl::create(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
                            ThreadLocal::SlotAllocator& slot_allocator,
                            const envoy::config::overload::v3::OverloadManager& config,
                            ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api,
                            const Server::Options& options) {
  absl::Status ctor_status = absl::OkStatus();
  std::unique_ptr<OverloadManagerImpl> manager(
      new OverloadManagerImpl(dispatcher, stats_scope, slot_allocator, config, validation_visitor,
                              api, options, ctor_status));
  RETURN_IF_NOT_OK(ctor_status);
  return manager;
}
422
// Builds the manager from configuration in three passes: resource monitors, overload actions
// (with their triggers), and load shed points. On the first configuration error,
// creation_status is set and construction stops early.
OverloadManagerImpl::OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
                                         ThreadLocal::SlotAllocator& slot_allocator,
                                         const envoy::config::overload::v3::OverloadManager& config,
                                         ProtobufMessage::ValidationVisitor& validation_visitor,
                                         Api::Api& api, const Server::Options& options,
                                         absl::Status& creation_status)
    : dispatcher_(dispatcher), time_source_(api.timeSource()), tls_(slot_allocator),
      refresh_interval_(
          std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))),
      refresh_interval_delays_(makeHistogram(stats_scope, "refresh_interval_delay",
                                             Stats::Histogram::Unit::Milliseconds)),
      proactive_resources_(
          std::make_unique<
              absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>()) {
  Configuration::ResourceMonitorFactoryContextImpl context(dispatcher, options, api,
                                                           validation_visitor);
  // We should hide impl details from users, for them there should be no distinction between
  // proactive and regular resource monitors in configuration API. But internally we will maintain
  // two distinct collections of proactive and regular resources. Proactive resources are not
  // subject to periodic flushes and can be recalculated/updated on demand by invoking
  // `tryAllocateResource/tryDeallocateResource` via thread local overload state.
  for (const auto& resource : config.resource_monitors()) {
    const auto& name = resource.name();
    // Check if it is a proactive resource.
    const auto& proactive_names =
        OverloadProactiveResources::get().proactive_action_name_to_resource_;
    const auto proactive_it = proactive_names.find(name);
    ENVOY_LOG(debug, "Evaluating resource {}", name);

    bool inserted = false;
    if (proactive_it != proactive_names.end()) {
      ENVOY_LOG(debug, "Adding proactive resource monitor for {}", name);
      auto& factory =
          Config::Utility::getAndCheckFactory<Configuration::ProactiveResourceMonitorFactory>(
              resource);
      auto monitor_config =
          Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);
      auto monitor = factory.createProactiveResourceMonitor(*monitor_config, context);
      inserted =
          proactive_resources_
              ->try_emplace(proactive_it->second, name, std::move(monitor), stats_scope)
              .second;
    } else {
      ENVOY_LOG(debug, "Adding resource monitor for {}", name);
      auto& factory =
          Config::Utility::getAndCheckFactory<Configuration::ResourceMonitorFactory>(resource);
      auto monitor_config =
          Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);
      auto monitor = factory.createResourceMonitor(*monitor_config, context);
      inserted = resources_.try_emplace(name, name, std::move(monitor), *this, stats_scope).second;
    }

    if (!inserted) {
      creation_status =
          absl::InvalidArgumentError(absl::StrCat("Duplicate resource monitor ", name));
      return;
    }
  }

  for (const auto& action : config.actions()) {
    const auto& name = action.name();
    const auto symbol = action_symbol_table_.get(name);
    ENVOY_LOG(debug, "Adding overload action {}", name);

    // Validate that this is a well known overload action.
    const auto& well_known_actions = OverloadActionNames::get().WellKnownActions;
    const bool is_well_known = std::find(well_known_actions.begin(), well_known_actions.end(),
                                         name) != well_known_actions.end();
    if (!is_well_known) {
      creation_status =
          absl::InvalidArgumentError(absl::StrCat("Unknown Overload Manager Action ", name));
      return;
    }

    // TODO: use in place construction once https://github.com/abseil/abseil-cpp/issues/388 is
    // addressed
    // We cannot currently use in place construction as the OverloadAction constructor may fail,
    // causing an inconsistent internal state of the actions_ map, which on destruction results in
    // an invalid free.
    auto action_or_error = OverloadAction::create(action, stats_scope);
    SET_AND_RETURN_IF_NOT_OK(action_or_error.status(), creation_status);
    const bool action_inserted =
        actions_.try_emplace(symbol, std::move(*action_or_error)).second;
    if (!action_inserted) {
      creation_status =
          absl::InvalidArgumentError(absl::StrCat("Duplicate overload action ", name));
      return;
    }

    // Action-specific configuration: only the reduce-timeouts action consumes typed_config, and
    // the reset-streams action needs the buffer factory to be configured.
    if (name == OverloadActionNames::get().ReduceTimeouts) {
      auto minimums_or_error = parseTimerMinimums(action.typed_config(), validation_visitor);
      SET_AND_RETURN_IF_NOT_OK(minimums_or_error.status(), creation_status);
      timer_minimums_ =
          std::make_shared<const Event::ScaledTimerTypeMap>(std::move(*minimums_or_error));
    } else if (name == OverloadActionNames::get().ResetStreams) {
      if (!config.has_buffer_factory_config()) {
        creation_status = absl::InvalidArgumentError(
            fmt::format("Overload action \"{}\" requires buffer_factory_config.", name));
        return;
      }
      makeCounter(api.rootScope(), OverloadActionStatsNames::get().ResetStreamsCount);
    } else if (action.has_typed_config()) {
      creation_status = absl::InvalidArgumentError(fmt::format(
          "Overload action \"{}\" has an unexpected value for the typed_config field", name));
      return;
    }

    for (const auto& trigger : action.triggers()) {
      const std::string& resource = trigger.name();
      const auto& proactive_names =
          OverloadProactiveResources::get().proactive_action_name_to_resource_;
      const auto proactive_it = proactive_names.find(resource);

      // A trigger must refer either to a configured regular resource monitor or to a known
      // proactive resource name.
      const bool is_regular = resources_.find(resource) != resources_.end();
      const bool is_proactive = proactive_it != proactive_names.end();
      if (!is_regular && !is_proactive) {
        creation_status = absl::InvalidArgumentError(
            fmt::format("Unknown trigger resource {} for overload action {}", resource, name));
        return;
      }
      resource_to_actions_.insert(std::make_pair(resource, symbol));
    }
  }

  // Validate the trigger resources for Load shedPoints.
  for (const auto& point : config.loadshed_points()) {
    for (const auto& trigger : point.triggers()) {
      if (!resources_.contains(trigger.name())) {
        creation_status = absl::InvalidArgumentError(fmt::format(
            "Unknown trigger resource {} for loadshed point {}", trigger.name(), point.name()));
        return;
      }
    }

    auto point_or_error =
        LoadShedPointImpl::create(point, api.rootScope(), api.randomGenerator());
    SET_AND_RETURN_IF_NOT_OK(point_or_error.status(), creation_status);
    const bool point_inserted =
        loadshed_points_.try_emplace(point.name(), *std::move(point_or_error)).second;

    if (!point_inserted) {
      creation_status =
          absl::InvalidArgumentError(absl::StrCat("Duplicate loadshed point ", point.name()));
      return;
    }
  }
}
563

            
564
10622
void OverloadManagerImpl::start() {
565
10622
  ASSERT(!started_);
566
10622
  started_ = true;
567

            
568
21258
  tls_.set([this](Event::Dispatcher&) {
569
21258
    return std::make_shared<ThreadLocalOverloadStateImpl>(action_symbol_table_,
570
21258
                                                          proactive_resources_);
571
21258
  });
572

            
573
10622
  if (resources_.empty()) {
574
10464
    return;
575
10464
  }
576

            
577
117595
  timer_ = dispatcher_.createTimer([this]() -> void {
578
    // Guarantee that all resource updates get flushed after no more than one refresh_interval_.
579
117595
    flushResourceUpdates();
580

            
581
    // Start a new flush epoch. If all resource updates complete before this callback runs, the last
582
    // resource update will call flushResourceUpdates to flush the whole batch early.
583
117595
    ++flush_epoch_;
584
117595
    flush_awaiting_updates_ = resources_.size() + proactive_resources_->size();
585

            
586
121728
    for (auto& resource : resources_) {
587
121728
      resource.second.update(flush_epoch_);
588
121728
    }
589

            
590
117595
    for (auto& resource : *proactive_resources_) {
591
4
      const double pressure = resource.second.updateResourcePressure();
592
4
      updateResourcePressure(OverloadProactiveResources::get().resourceToName(resource.first),
593
4
                             pressure, flush_epoch_);
594
4
    }
595

            
596
    // Record delay.
597
117595
    auto now = time_source_.monotonicTime();
598
117595
    std::chrono::milliseconds delay =
599
117595
        std::chrono::duration_cast<std::chrono::milliseconds>(now - time_resources_last_measured_);
600
117595
    refresh_interval_delays_.recordValue(delay.count());
601
117595
    time_resources_last_measured_ = now;
602

            
603
117595
    timer_->enableTimer(refresh_interval_);
604
117595
  });
605

            
606
158
  time_resources_last_measured_ = time_source_.monotonicTime();
607
158
  timer_->enableTimer(refresh_interval_);
608
158
}
609

            
610
10678
void OverloadManagerImpl::stop() {
611
  // Disable any pending timeouts.
612
10678
  if (timer_) {
613
148
    timer_->disableTimer();
614
148
  }
615

            
616
  // Clear the resource map to block on any pending updates.
617
10678
  resources_.clear();
618

            
619
  // TODO(nezdolik): wrap proactive monitors into atomic? and clear it here
620
10678
}
621

            
622
bool OverloadManagerImpl::registerForAction(const std::string& action,
623
                                            Event::Dispatcher& dispatcher,
624
54114
                                            OverloadActionCb callback) {
625
54114
  ASSERT(!started_);
626
54114
  const auto symbol = action_symbol_table_.get(action);
627

            
628
54114
  if (actions_.find(symbol) == actions_.end()) {
629
54032
    ENVOY_LOG(debug, "No overload action is configured for {}.", action);
630
54032
    return false;
631
54032
  }
632

            
633
82
  action_to_callbacks_.emplace(std::piecewise_construct, std::forward_as_tuple(symbol),
634
82
                               std::forward_as_tuple(dispatcher, callback));
635
82
  return true;
636
54114
}
637

            
638
42489
// Returns the overload state object obtained by dereferencing the manager's
// thread-local slot.
ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() {
  ThreadLocalOverloadState& thread_local_state = *tls_;
  return thread_local_state;
}
639
10862
// Returns a factory that, given a dispatcher, builds a scaled range timer
// manager and subscribes it to the ReduceTimeouts overload action so that its
// scale factor follows the action state.
Event::ScaledRangeTimerManagerFactory OverloadManagerImpl::scaledTimerFactory() {
  return [this](Event::Dispatcher& dispatcher) {
    auto timer_manager = createScaledRangeTimerManager(dispatcher, timer_minimums_);

    // The callback holds a non-owning pointer to the manager; ownership is
    // handed to the caller through the return value below.
    auto apply_scale = [raw_manager = timer_manager.get()](OverloadActionState scale_state) {
      // The action state is 0 for no overload up to 1 for maximal overload,
      // but the scale factor for timers is 1 for no scaling and 0 for maximal
      // scaling, so invert the value to pass in (1-value).
      raw_manager->setScaleFactor(scale_state.value().invert());
    };
    registerForAction(OverloadActionNames::get().ReduceTimeouts, dispatcher, apply_scale);

    return timer_manager;
  };
}
653

            
654
190729
// Looks up a load shed point by name.
//
// @param point_name name of the load shed point.
// @return pointer to the registered load shed point, or nullptr when no point
//         with that name exists.
LoadShedPoint* OverloadManagerImpl::getLoadShedPoint(absl::string_view point_name) {
  const auto found = loadshed_points_.find(point_name);
  return found == loadshed_points_.end() ? nullptr : found->second.get();
}
660

            
661
// Builds a ScaledRangeTimerManagerImpl for the given dispatcher using the
// supplied timer minimums.
//
// @param dispatcher     dispatcher the timer manager is constructed with.
// @param timer_minimums shared map of minimums passed to the timer manager.
// @return owning pointer to the newly created timer manager.
Event::ScaledRangeTimerManagerPtr OverloadManagerImpl::createScaledRangeTimerManager(
    Event::Dispatcher& dispatcher,
    const Event::ScaledTimerTypeMapConstSharedPtr& timer_minimums) const {
  Event::ScaledRangeTimerManagerPtr timer_manager =
      std::make_unique<Event::ScaledRangeTimerManagerImpl>(dispatcher, timer_minimums);
  return timer_manager;
}
666

            
667
void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure,
668
121725
                                                 FlushEpochId flush_epoch) {
669
121725
  auto [start, end] = resource_to_actions_.equal_range(resource);
670

            
671
125782
  std::for_each(start, end, [&](ResourceToActionMap::value_type& entry) {
672
124497
    const NamedOverloadActionSymbolTable::Symbol action = entry.second;
673
124497
    auto action_it = actions_.find(action);
674
124497
    ASSERT(action_it != actions_.end());
675
124497
    const OverloadActionState old_state = action_it->second->getState();
676
124497
    if (action_it->second->updateResourcePressure(resource, pressure)) {
677
211
      const auto state = action_it->second->getState();
678

            
679
211
      if (old_state.isSaturated() != state.isSaturated()) {
680
131
        ENVOY_LOG(debug, "Overload action {} became {}", action_symbol_table_.name(action),
681
131
                  (state.isSaturated() ? "saturated" : "scaling"));
682
131
      }
683

            
684
      // Record the updated value to be sent to workers on the next thread-local-state flush, along
685
      // with any update callbacks. This might overwrite a previous action state change caused by a
686
      // pressure update for a different resource that hasn't been flushed yet. That's okay because
687
      // the state recorded here includes the information from all previous resource updates. So
688
      // even if resource 1 causes an action to have value A, and a later update to resource 2
689
      // causes the action to have value B, B would have been the result for whichever order the
690
      // updates to resources 1 and 2 came in.
691
211
      state_updates_to_flush_.insert_or_assign(action, state);
692
211
      auto [callbacks_start, callbacks_end] = action_to_callbacks_.equal_range(action);
693
211
      std::for_each(callbacks_start, callbacks_end, [&](ActionToCallbackMap::value_type& cb_entry) {
694
162
        callbacks_to_flush_.insert_or_assign(&cb_entry.second, state);
695
162
      });
696
211
    }
697
124497
  });
698

            
699
121725
  for (auto& loadshed_point : loadshed_points_) {
700
1283
    loadshed_point.second->updateResource(resource, pressure);
701
1283
  }
702

            
703
  // Eagerly flush updates if this is the last call to updateResourcePressure expected for the
704
  // current epoch. This assert is always valid because flush_awaiting_updates_ is initialized
705
  // before each batch of updates, and even if a resource monitor performs a double update, or a
706
  // previous update callback is late, the logic in OverloadManager::Resource::update() will prevent
707
  // unexpected calls to this function.
708
121725
  ASSERT(flush_awaiting_updates_ > 0);
709
121725
  --flush_awaiting_updates_;
710
121725
  if (flush_epoch == flush_epoch_ && flush_awaiting_updates_ == 0) {
711
117587
    flushResourceUpdates();
712
117587
  }
713
121725
}
714

            
715
235182
void OverloadManagerImpl::flushResourceUpdates() {
716
235182
  if (!state_updates_to_flush_.empty()) {
717
206
    auto shared_updates = std::make_shared<
718
206
        absl::flat_hash_map<NamedOverloadActionSymbolTable::Symbol, OverloadActionState>>();
719
206
    std::swap(*shared_updates, state_updates_to_flush_);
720

            
721
206
    tls_.runOnAllThreads(
722
398
        [updates = std::move(shared_updates)](OptRef<ThreadLocalOverloadStateImpl> overload_state) {
723
406
          for (const auto& [action, state] : *updates) {
724
406
            overload_state->setState(action, state);
725
406
          }
726
398
        });
727
206
  }
728

            
729
235182
  for (const auto& [cb, state] : callbacks_to_flush_) {
730
162
    cb->dispatcher_.post([cb = cb, state = state]() { cb->callback_(state); });
731
162
  }
732
235182
  callbacks_to_flush_.clear();
733
235182
}
734

            
735
// Wraps a resource monitor together with its owning manager and creates the
// per-resource stats: a "pressure" gauge plus "failed_updates" and
// "skipped_updates" counters.
//
// @param name        resource name, also used when building the stat names.
// @param monitor     monitor to take ownership of.
// @param manager     manager that receives pressure updates for this resource.
// @param stats_scope scope the stats are created in.
OverloadManagerImpl::Resource::Resource(const std::string& name, ResourceMonitorPtr monitor,
                                        OverloadManagerImpl& manager, Stats::Scope& stats_scope)
    : name_(name),
      monitor_(std::move(monitor)),
      manager_(manager),
      // The pressure gauge is created with the NeverImport import mode.
      pressure_gauge_(
          makeGauge(stats_scope, name, "pressure", Stats::Gauge::ImportMode::NeverImport)),
      failed_updates_counter_(makeCounter(stats_scope, name, "failed_updates")),
      skipped_updates_counter_(makeCounter(stats_scope, name, "skipped_updates")) {}
742

            
743
121728
// Requests a fresh usage reading from the monitor for the given flush epoch.
// If the previous request has not completed yet, the request is skipped and
// the skipped-updates counter is incremented instead.
//
// @param flush_epoch epoch to associate with the reading that is requested.
void OverloadManagerImpl::Resource::update(FlushEpochId flush_epoch) {
  if (pending_update_) {
    // An earlier request is still outstanding; do not issue another one.
    ENVOY_LOG(debug, "Skipping update for resource {} which has pending update", name_);
    skipped_updates_counter_.inc();
    return;
  }

  // Mark the request as outstanding and remember its epoch before calling the
  // monitor, which reports back through this object.
  pending_update_ = true;
  flush_epoch_ = flush_epoch;
  monitor_->updateResourceUsage(*this);
}
753

            
754
121721
void OverloadManagerImpl::Resource::onSuccess(const ResourceUsage& usage) {
755
121721
  pending_update_ = false;
756
121721
  manager_.updateResourcePressure(name_, usage.resource_pressure_, flush_epoch_);
757
121721
  pressure_gauge_.set(usage.resource_pressure_ * 100); // convert to percent
758
121721
}
759

            
760
2
void OverloadManagerImpl::Resource::onFailure(const EnvoyException& error) {
761
2
  pending_update_ = false;
762
2
  ENVOY_LOG(info, "Failed to update resource {}: {}", name_, error.what());
763
2
  failed_updates_counter_.inc();
764
2
}
765

            
766
} // namespace Server
767
} // namespace Envoy