Line data Source code
1 : #include "source/server/overload_manager_impl.h"
2 :
3 : #include <chrono>
4 :
5 : #include "envoy/common/exception.h"
6 : #include "envoy/config/overload/v3/overload.pb.h"
7 : #include "envoy/config/overload/v3/overload.pb.validate.h"
8 : #include "envoy/stats/scope.h"
9 :
10 : #include "source/common/common/fmt.h"
11 : #include "source/common/config/utility.h"
12 : #include "source/common/event/scaled_range_timer_manager_impl.h"
13 : #include "source/common/protobuf/utility.h"
14 : #include "source/common/stats/symbol_table.h"
15 : #include "source/server/resource_monitor_config_impl.h"
16 :
17 : #include "absl/container/node_hash_map.h"
18 : #include "absl/strings/str_cat.h"
19 :
20 : namespace Envoy {
21 : namespace Server {
22 : namespace {
23 :
24 : using TriggerPtr = std::unique_ptr<Trigger>;
25 :
26 : class ThresholdTriggerImpl final : public Trigger {
27 : public:
28 : ThresholdTriggerImpl(const envoy::config::overload::v3::ThresholdTrigger& config)
29 0 : : threshold_(config.value()), state_(OverloadActionState::inactive()) {}
30 :
31 0 : bool updateValue(double value) override {
32 0 : const OverloadActionState state = actionState();
33 0 : state_ =
34 0 : value >= threshold_ ? OverloadActionState::saturated() : OverloadActionState::inactive();
35 : // This is a floating point comparison, though state_ is always either
36 : // saturated or inactive so there's no risk due to floating point precision.
37 0 : return state.value() != actionState().value();
38 0 : }
39 :
40 0 : OverloadActionState actionState() const override { return state_; }
41 :
42 : private:
43 : const double threshold_;
44 : OverloadActionState state_;
45 : };
46 :
47 : class ScaledTriggerImpl final : public Trigger {
48 : public:
49 : ScaledTriggerImpl(const envoy::config::overload::v3::ScaledTrigger& config)
50 : : scaling_threshold_(config.scaling_threshold()),
51 : saturated_threshold_(config.saturation_threshold()),
52 0 : state_(OverloadActionState::inactive()) {
53 0 : if (scaling_threshold_ >= saturated_threshold_) {
54 0 : throw EnvoyException("scaling_threshold must be less than saturation_threshold");
55 0 : }
56 0 : }
57 :
58 0 : bool updateValue(double value) override {
59 0 : const OverloadActionState old_state = actionState();
60 0 : if (value <= scaling_threshold_) {
61 0 : state_ = OverloadActionState::inactive();
62 0 : } else if (value >= saturated_threshold_) {
63 0 : state_ = OverloadActionState::saturated();
64 0 : } else {
65 0 : state_ = OverloadActionState(
66 0 : UnitFloat((value - scaling_threshold_) / (saturated_threshold_ - scaling_threshold_)));
67 0 : }
68 : // All values of state_ are produced via this same code path. Even if
69 : // old_state and state_ should be approximately equal, there's no harm in
70 : // signaling for a small change if they're not float::operator== equal.
71 0 : return state_.value() != old_state.value();
72 0 : }
73 :
74 0 : OverloadActionState actionState() const override { return state_; }
75 :
76 : private:
77 : const double scaling_threshold_;
78 : const double saturated_threshold_;
79 : OverloadActionState state_;
80 : };
81 :
82 0 : TriggerPtr createTriggerFromConfig(const envoy::config::overload::v3::Trigger& trigger_config) {
83 0 : TriggerPtr trigger;
84 :
85 0 : switch (trigger_config.trigger_oneof_case()) {
86 0 : case envoy::config::overload::v3::Trigger::TriggerOneofCase::kThreshold:
87 0 : trigger = std::make_unique<ThresholdTriggerImpl>(trigger_config.threshold());
88 0 : break;
89 0 : case envoy::config::overload::v3::Trigger::TriggerOneofCase::kScaled:
90 0 : trigger = std::make_unique<ScaledTriggerImpl>(trigger_config.scaled());
91 0 : break;
92 0 : case envoy::config::overload::v3::Trigger::TriggerOneofCase::TRIGGER_ONEOF_NOT_SET:
93 0 : throw EnvoyException(absl::StrCat("action not set for trigger ", trigger_config.name()));
94 0 : }
95 :
96 0 : return trigger;
97 0 : }
98 :
99 0 : Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view name_of_stat) {
100 0 : Stats::StatNameManagedStorage stat_name(name_of_stat, scope.symbolTable());
101 0 : return scope.counterFromStatName(stat_name.statName());
102 0 : }
103 :
104 0 : Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view a, absl::string_view b) {
105 0 : return makeCounter(scope, absl::StrCat("overload.", a, ".", b));
106 0 : }
107 :
108 : Stats::Gauge& makeGauge(Stats::Scope& scope, absl::string_view a, absl::string_view b,
109 0 : Stats::Gauge::ImportMode import_mode) {
110 0 : Stats::StatNameManagedStorage stat_name(absl::StrCat("overload.", a, ".", b),
111 0 : scope.symbolTable());
112 0 : return scope.gaugeFromStatName(stat_name.statName(), import_mode);
113 0 : }
114 :
115 : Stats::Histogram& makeHistogram(Stats::Scope& scope, absl::string_view name,
116 248 : Stats::Histogram::Unit unit) {
117 248 : Stats::StatNameManagedStorage stat_name(absl::StrCat("overload.", name), scope.symbolTable());
118 248 : return scope.histogramFromStatName(stat_name.statName(), unit);
119 248 : }
120 :
121 : Event::ScaledTimerType parseTimerType(
122 0 : envoy::config::overload::v3::ScaleTimersOverloadActionConfig::TimerType config_timer_type) {
123 0 : using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig;
124 :
125 0 : switch (config_timer_type) {
126 0 : case Config::HTTP_DOWNSTREAM_CONNECTION_IDLE:
127 0 : return Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout;
128 0 : case Config::HTTP_DOWNSTREAM_STREAM_IDLE:
129 0 : return Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout;
130 0 : case Config::TRANSPORT_SOCKET_CONNECT:
131 0 : return Event::ScaledTimerType::TransportSocketConnectTimeout;
132 0 : default:
133 0 : throw EnvoyException(fmt::format("Unknown timer type {}", config_timer_type));
134 0 : }
135 0 : }
136 :
137 : Event::ScaledTimerTypeMap
138 : parseTimerMinimums(const ProtobufWkt::Any& typed_config,
139 0 : ProtobufMessage::ValidationVisitor& validation_visitor) {
140 0 : using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig;
141 0 : const Config action_config =
142 0 : MessageUtil::anyConvertAndValidate<Config>(typed_config, validation_visitor);
143 :
144 0 : Event::ScaledTimerTypeMap timer_map;
145 :
146 0 : for (const auto& scale_timer : action_config.timer_scale_factors()) {
147 0 : const Event::ScaledTimerType timer_type = parseTimerType(scale_timer.timer());
148 :
149 0 : const Event::ScaledTimerMinimum minimum =
150 0 : scale_timer.has_min_timeout()
151 0 : ? Event::ScaledTimerMinimum(Event::AbsoluteMinimum(std::chrono::milliseconds(
152 0 : DurationUtil::durationToMilliseconds(scale_timer.min_timeout()))))
153 0 : : Event::ScaledTimerMinimum(
154 0 : Event::ScaledMinimum(UnitFloat(scale_timer.min_scale().value() / 100.0)));
155 :
156 0 : auto [_, inserted] = timer_map.insert(std::make_pair(timer_type, minimum));
157 0 : UNREFERENCED_PARAMETER(_);
158 0 : if (!inserted) {
159 0 : throw EnvoyException(fmt::format("Found duplicate entry for timer type {}",
160 0 : Config::TimerType_Name(scale_timer.timer())));
161 0 : }
162 0 : }
163 :
164 0 : return timer_map;
165 0 : }
166 :
167 : } // namespace
168 :
169 : /**
170 : * Thread-local copy of the state of each configured overload action.
171 : */
172 : class ThreadLocalOverloadStateImpl : public ThreadLocalOverloadState {
173 : public:
174 : explicit ThreadLocalOverloadStateImpl(
175 : const NamedOverloadActionSymbolTable& action_symbol_table,
176 : std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>&
177 : proactive_resources)
178 : : action_symbol_table_(action_symbol_table),
179 : actions_(action_symbol_table.size(), OverloadActionState(UnitFloat::min())),
180 192 : proactive_resources_(proactive_resources) {}
181 :
182 1568 : const OverloadActionState& getState(const std::string& action) override {
183 1568 : if (const auto symbol = action_symbol_table_.lookup(action); symbol != absl::nullopt) {
184 0 : return actions_[symbol->index()];
185 0 : }
186 1568 : return always_inactive_;
187 1568 : }
188 :
189 0 : void setState(NamedOverloadActionSymbolTable::Symbol action, OverloadActionState state) {
190 0 : actions_[action.index()] = state;
191 0 : }
192 :
193 : bool tryAllocateResource(OverloadProactiveResourceName resource_name,
194 0 : int64_t increment) override {
195 0 : const auto proactive_resource = proactive_resources_->find(resource_name);
196 0 : if (proactive_resource == proactive_resources_->end()) {
197 0 : ENVOY_LOG_MISC(warn, "Failed to allocate resource usage, resource monitor is not configured");
198 0 : return false;
199 0 : }
200 :
201 0 : return proactive_resource->second.tryAllocateResource(increment);
202 0 : }
203 :
204 : bool tryDeallocateResource(OverloadProactiveResourceName resource_name,
205 0 : int64_t decrement) override {
206 0 : const auto proactive_resource = proactive_resources_->find(resource_name);
207 0 : if (proactive_resource == proactive_resources_->end()) {
208 0 : ENVOY_LOG_MISC(warn,
209 0 : "Failed to deallocate resource usage, resource monitor is not configured");
210 0 : return false;
211 0 : }
212 :
213 0 : return proactive_resource->second.tryDeallocateResource(decrement);
214 0 : }
215 :
216 208 : bool isResourceMonitorEnabled(OverloadProactiveResourceName resource_name) override {
217 208 : const auto proactive_resource = proactive_resources_->find(resource_name);
218 208 : return proactive_resource != proactive_resources_->end();
219 208 : }
220 :
221 : ProactiveResourceMonitorOptRef
222 0 : getProactiveResourceMonitorForTest(OverloadProactiveResourceName resource_name) override {
223 0 : const auto proactive_resource = proactive_resources_->find(resource_name);
224 0 : if (proactive_resource == proactive_resources_->end()) {
225 0 : ENVOY_LOG_MISC(warn, "Failed to get resource usage, resource monitor is not configured");
226 0 : return makeOptRefFromPtr<ProactiveResourceMonitor>(nullptr);
227 0 : }
228 0 : return proactive_resource->second.getProactiveResourceMonitorForTest();
229 0 : }
230 :
231 : private:
232 : static const OverloadActionState always_inactive_;
233 : const NamedOverloadActionSymbolTable& action_symbol_table_;
234 : std::vector<OverloadActionState> actions_;
235 : std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>
236 : proactive_resources_;
237 : };
238 :
239 : const OverloadActionState ThreadLocalOverloadStateImpl::always_inactive_{UnitFloat::min()};
240 :
241 : NamedOverloadActionSymbolTable::Symbol
242 656 : NamedOverloadActionSymbolTable::get(absl::string_view string) {
243 656 : if (auto it = table_.find(string); it != table_.end()) {
244 0 : return Symbol(it->second);
245 0 : }
246 :
247 656 : size_t index = table_.size();
248 :
249 656 : names_.emplace_back(string);
250 656 : table_.emplace(std::make_pair(string, index));
251 :
252 656 : return Symbol(index);
253 656 : }
254 :
255 : absl::optional<NamedOverloadActionSymbolTable::Symbol>
256 1568 : NamedOverloadActionSymbolTable::lookup(absl::string_view string) const {
257 1568 : if (auto it = table_.find(string); it != table_.end()) {
258 0 : return Symbol(it->second);
259 0 : }
260 1568 : return absl::nullopt;
261 1568 : }
262 :
263 0 : const absl::string_view NamedOverloadActionSymbolTable::name(Symbol symbol) const {
264 0 : return names_.at(symbol.index());
265 0 : }
266 :
267 : OverloadAction::OverloadAction(const envoy::config::overload::v3::OverloadAction& config,
268 : Stats::Scope& stats_scope)
269 : : state_(OverloadActionState::inactive()),
270 : active_gauge_(
271 : makeGauge(stats_scope, config.name(), "active", Stats::Gauge::ImportMode::NeverImport)),
272 : scale_percent_gauge_(makeGauge(stats_scope, config.name(), "scale_percent",
273 0 : Stats::Gauge::ImportMode::NeverImport)) {
274 0 : for (const auto& trigger_config : config.triggers()) {
275 0 : if (!triggers_.try_emplace(trigger_config.name(), createTriggerFromConfig(trigger_config))
276 0 : .second) {
277 0 : throw EnvoyException(
278 0 : absl::StrCat("Duplicate trigger resource for overload action ", config.name()));
279 0 : }
280 0 : }
281 :
282 0 : active_gauge_.set(0);
283 0 : scale_percent_gauge_.set(0);
284 0 : }
285 :
286 0 : bool OverloadAction::updateResourcePressure(const std::string& name, double pressure) {
287 0 : const OverloadActionState old_state = getState();
288 :
289 0 : auto it = triggers_.find(name);
290 0 : ASSERT(it != triggers_.end());
291 0 : if (!it->second->updateValue(pressure)) {
292 0 : return false;
293 0 : }
294 0 : const auto trigger_new_state = it->second->actionState();
295 0 : active_gauge_.set(trigger_new_state.isSaturated() ? 1 : 0);
296 0 : scale_percent_gauge_.set(trigger_new_state.value().value() * 100);
297 :
298 0 : {
299 : // Compute the new state as the maximum over all trigger states.
300 0 : OverloadActionState new_state = OverloadActionState::inactive();
301 0 : for (auto& trigger : triggers_) {
302 0 : const auto trigger_state = trigger.second->actionState();
303 0 : if (trigger_state.value() > new_state.value()) {
304 0 : new_state = trigger_state;
305 0 : }
306 0 : }
307 0 : state_ = new_state;
308 0 : }
309 :
310 0 : return state_.value() != old_state.value();
311 0 : }
312 :
313 0 : OverloadActionState OverloadAction::getState() const { return state_; }
314 :
315 : LoadShedPointImpl::LoadShedPointImpl(const envoy::config::overload::v3::LoadShedPoint& config,
316 : Stats::Scope& stats_scope,
317 : Random::RandomGenerator& random_generator)
318 : : scale_percent_(makeGauge(stats_scope, config.name(), "scale_percent",
319 : Stats::Gauge::ImportMode::NeverImport)),
320 0 : random_generator_(random_generator) {
321 0 : for (const auto& trigger_config : config.triggers()) {
322 0 : if (!triggers_.try_emplace(trigger_config.name(), createTriggerFromConfig(trigger_config))
323 0 : .second) {
324 0 : throw EnvoyException(
325 0 : absl::StrCat("Duplicate trigger resource for LoadShedPoint ", config.name()));
326 0 : }
327 0 : }
328 0 : };
329 :
330 : void LoadShedPointImpl::updateResource(absl::string_view resource_name,
331 0 : double resource_utilization) {
332 0 : auto it = triggers_.find(resource_name);
333 0 : if (it == triggers_.end()) {
334 0 : return;
335 0 : }
336 :
337 0 : it->second->updateValue(resource_utilization);
338 0 : updateProbabilityShedLoad();
339 0 : }
340 :
341 0 : void LoadShedPointImpl::updateProbabilityShedLoad() {
342 0 : float max_unit_float = 0.0f;
343 0 : for (const auto& trigger : triggers_) {
344 0 : max_unit_float = std::max(trigger.second->actionState().value().value(), max_unit_float);
345 0 : }
346 :
347 0 : probability_shed_load_.store(max_unit_float);
348 :
349 : // Update stats.
350 0 : scale_percent_.set(100 * max_unit_float);
351 0 : }
352 :
353 0 : bool LoadShedPointImpl::shouldShedLoad() {
354 0 : float unit_float_probability_shed_load = probability_shed_load_.load();
355 : // This should be ok as we're using unit float which saturates at 1.0f.
356 0 : if (unit_float_probability_shed_load == 1.0f) {
357 0 : return true;
358 0 : }
359 :
360 0 : return random_generator_.bernoulli(UnitFloat(unit_float_probability_shed_load));
361 0 : }
362 :
363 : OverloadManagerImpl::OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
364 : ThreadLocal::SlotAllocator& slot_allocator,
365 : const envoy::config::overload::v3::OverloadManager& config,
366 : ProtobufMessage::ValidationVisitor& validation_visitor,
367 : Api::Api& api, const Server::Options& options)
368 : : dispatcher_(dispatcher), time_source_(api.timeSource()), tls_(slot_allocator),
369 : refresh_interval_(
370 : std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))),
371 : refresh_interval_delays_(makeHistogram(stats_scope, "refresh_interval_delay",
372 : Stats::Histogram::Unit::Milliseconds)),
373 : proactive_resources_(
374 : std::make_unique<
375 248 : absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>()) {
376 248 : Configuration::ResourceMonitorFactoryContextImpl context(dispatcher, options, api,
377 248 : validation_visitor);
378 : // We should hide impl details from users, for them there should be no distinction between
379 : // proactive and regular resource monitors in configuration API. But internally we will maintain
380 : // two distinct collections of proactive and regular resources. Proactive resources are not
381 : // subject to periodic flushes and can be recalculated/updated on demand by invoking
382 : // `tryAllocateResource/tryDeallocateResource` via thread local overload state.
383 248 : for (const auto& resource : config.resource_monitors()) {
384 6 : const auto& name = resource.name();
385 : // Check if it is a proactive resource.
386 6 : auto proactive_resource_it =
387 6 : OverloadProactiveResources::get().proactive_action_name_to_resource_.find(name);
388 6 : ENVOY_LOG(debug, "Evaluating resource {}", name);
389 6 : bool result = false;
390 6 : if (proactive_resource_it !=
391 6 : OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) {
392 0 : ENVOY_LOG(debug, "Adding proactive resource monitor for {}", name);
393 0 : auto& factory =
394 0 : Config::Utility::getAndCheckFactory<Configuration::ProactiveResourceMonitorFactory>(
395 0 : resource);
396 0 : auto config =
397 0 : Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);
398 0 : auto monitor = factory.createProactiveResourceMonitor(*config, context);
399 0 : result =
400 0 : proactive_resources_
401 0 : ->try_emplace(proactive_resource_it->second, name, std::move(monitor), stats_scope)
402 0 : .second;
403 6 : } else {
404 6 : ENVOY_LOG(debug, "Adding resource monitor for {}", name);
405 6 : auto& factory =
406 6 : Config::Utility::getAndCheckFactory<Configuration::ResourceMonitorFactory>(resource);
407 6 : auto config =
408 6 : Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);
409 6 : auto monitor = factory.createResourceMonitor(*config, context);
410 6 : result = resources_.try_emplace(name, name, std::move(monitor), *this, stats_scope).second;
411 6 : }
412 6 : if (!result) {
413 0 : throw EnvoyException(absl::StrCat("Duplicate resource monitor ", name));
414 0 : }
415 6 : }
416 :
417 248 : for (const auto& action : config.actions()) {
418 0 : const auto& name = action.name();
419 0 : const auto symbol = action_symbol_table_.get(name);
420 0 : ENVOY_LOG(debug, "Adding overload action {}", name);
421 :
422 : // Validate that this is a well known overload action.
423 0 : if (Runtime::runtimeFeatureEnabled(
424 0 : "envoy.reloadable_features.overload_manager_error_unknown_action")) {
425 0 : auto& well_known_actions = OverloadActionNames::get().WellKnownActions;
426 0 : if (std::find(well_known_actions.begin(), well_known_actions.end(), name) ==
427 0 : well_known_actions.end()) {
428 0 : throw EnvoyException(absl::StrCat("Unknown Overload Manager Action ", name));
429 0 : }
430 0 : }
431 :
432 : // TODO: use in place construction once https://github.com/abseil/abseil-cpp/issues/388 is
433 : // addressed
434 : // We cannot currently use in place construction as the OverloadAction constructor may throw,
435 : // causing an inconsistent internal state of the actions_ map, which on destruction results in
436 : // an invalid free.
437 0 : auto result = actions_.try_emplace(symbol, OverloadAction(action, stats_scope));
438 0 : if (!result.second) {
439 0 : throw EnvoyException(absl::StrCat("Duplicate overload action ", name));
440 0 : }
441 :
442 0 : if (name == OverloadActionNames::get().ReduceTimeouts) {
443 0 : timer_minimums_ = std::make_shared<const Event::ScaledTimerTypeMap>(
444 0 : parseTimerMinimums(action.typed_config(), validation_visitor));
445 0 : } else if (name == OverloadActionNames::get().ResetStreams) {
446 0 : if (!config.has_buffer_factory_config()) {
447 0 : throw EnvoyException(
448 0 : fmt::format("Overload action \"{}\" requires buffer_factory_config.", name));
449 0 : }
450 0 : makeCounter(api.rootScope(), OverloadActionStatsNames::get().ResetStreamsCount);
451 0 : } else if (action.has_typed_config()) {
452 0 : throw EnvoyException(fmt::format(
453 0 : "Overload action \"{}\" has an unexpected value for the typed_config field", name));
454 0 : }
455 :
456 0 : for (const auto& trigger : action.triggers()) {
457 0 : const std::string& resource = trigger.name();
458 0 : auto proactive_resource_it =
459 0 : OverloadProactiveResources::get().proactive_action_name_to_resource_.find(resource);
460 :
461 0 : if (resources_.find(resource) == resources_.end() &&
462 0 : proactive_resource_it ==
463 0 : OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) {
464 0 : throw EnvoyException(
465 0 : fmt::format("Unknown trigger resource {} for overload action {}", resource, name));
466 0 : }
467 0 : resource_to_actions_.insert(std::make_pair(resource, symbol));
468 0 : }
469 0 : }
470 :
471 : // Validate the trigger resources for Load shedPoints.
472 248 : for (const auto& point : config.loadshed_points()) {
473 0 : for (const auto& trigger : point.triggers()) {
474 0 : if (!resources_.contains(trigger.name())) {
475 0 : throw EnvoyException(fmt::format("Unknown trigger resource {} for loadshed point {}",
476 0 : trigger.name(), point.name()));
477 0 : }
478 0 : }
479 :
480 0 : const auto result = loadshed_points_.try_emplace(
481 0 : point.name(),
482 0 : std::make_unique<LoadShedPointImpl>(point, api.rootScope(), api.randomGenerator()));
483 :
484 0 : if (!result.second) {
485 0 : throw EnvoyException(absl::StrCat("Duplicate loadshed point ", point.name()));
486 0 : }
487 0 : }
488 248 : }
489 :
490 98 : void OverloadManagerImpl::start() {
491 98 : ASSERT(!started_);
492 98 : started_ = true;
493 :
494 192 : tls_.set([this](Event::Dispatcher&) {
495 192 : return std::make_shared<ThreadLocalOverloadStateImpl>(action_symbol_table_,
496 192 : proactive_resources_);
497 192 : });
498 :
499 98 : if (resources_.empty()) {
500 98 : return;
501 98 : }
502 :
503 0 : timer_ = dispatcher_.createTimer([this]() -> void {
504 : // Guarantee that all resource updates get flushed after no more than one refresh_interval_.
505 0 : flushResourceUpdates();
506 :
507 : // Start a new flush epoch. If all resource updates complete before this callback runs, the last
508 : // resource update will call flushResourceUpdates to flush the whole batch early.
509 0 : ++flush_epoch_;
510 0 : flush_awaiting_updates_ = resources_.size();
511 :
512 0 : for (auto& resource : resources_) {
513 0 : resource.second.update(flush_epoch_);
514 0 : }
515 :
516 : // Record delay.
517 0 : auto now = time_source_.monotonicTime();
518 0 : std::chrono::milliseconds delay =
519 0 : std::chrono::duration_cast<std::chrono::milliseconds>(now - time_resources_last_measured_);
520 0 : refresh_interval_delays_.recordValue(delay.count());
521 0 : time_resources_last_measured_ = now;
522 :
523 0 : timer_->enableTimer(refresh_interval_);
524 0 : });
525 :
526 0 : time_resources_last_measured_ = time_source_.monotonicTime();
527 0 : timer_->enableTimer(refresh_interval_);
528 0 : }
529 :
530 132 : void OverloadManagerImpl::stop() {
531 : // Disable any pending timeouts.
532 132 : if (timer_) {
533 0 : timer_->disableTimer();
534 0 : }
535 :
536 : // Clear the resource map to block on any pending updates.
537 132 : resources_.clear();
538 :
539 : // TODO(nezdolik): wrap proactive monitors into atomic? and clear it here
540 132 : }
541 :
542 : bool OverloadManagerImpl::registerForAction(const std::string& action,
543 : Event::Dispatcher& dispatcher,
544 656 : OverloadActionCb callback) {
545 656 : ASSERT(!started_);
546 656 : const auto symbol = action_symbol_table_.get(action);
547 :
548 656 : if (actions_.find(symbol) == actions_.end()) {
549 656 : ENVOY_LOG(debug, "No overload action is configured for {}.", action);
550 656 : return false;
551 656 : }
552 :
553 0 : action_to_callbacks_.emplace(std::piecewise_construct, std::forward_as_tuple(symbol),
554 0 : std::forward_as_tuple(dispatcher, callback));
555 0 : return true;
556 656 : }
557 :
558 992 : ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() { return *tls_; }
559 131 : Event::ScaledRangeTimerManagerFactory OverloadManagerImpl::scaledTimerFactory() {
560 131 : return [this](Event::Dispatcher& dispatcher) {
561 131 : auto manager = createScaledRangeTimerManager(dispatcher, timer_minimums_);
562 131 : registerForAction(OverloadActionNames::get().ReduceTimeouts, dispatcher,
563 131 : [manager = manager.get()](OverloadActionState scale_state) {
564 0 : manager->setScaleFactor(
565 : // The action state is 0 for no overload up to 1 for maximal overload,
566 : // but the scale factor for timers is 1 for no scaling and 0 for maximal
567 : // scaling, so invert the value to pass in (1-value).
568 0 : scale_state.value().invert());
569 0 : });
570 131 : return manager;
571 131 : };
572 131 : }
573 :
574 1524 : LoadShedPoint* OverloadManagerImpl::getLoadShedPoint(absl::string_view point_name) {
575 1524 : if (auto it = loadshed_points_.find(point_name); it != loadshed_points_.end()) {
576 0 : return it->second.get();
577 0 : }
578 1524 : return nullptr;
579 1524 : }
580 :
581 : Event::ScaledRangeTimerManagerPtr OverloadManagerImpl::createScaledRangeTimerManager(
582 : Event::Dispatcher& dispatcher,
583 131 : const Event::ScaledTimerTypeMapConstSharedPtr& timer_minimums) const {
584 131 : return std::make_unique<Event::ScaledRangeTimerManagerImpl>(dispatcher, timer_minimums);
585 131 : }
586 :
587 : void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure,
588 0 : FlushEpochId flush_epoch) {
589 0 : auto [start, end] = resource_to_actions_.equal_range(resource);
590 :
591 0 : std::for_each(start, end, [&](ResourceToActionMap::value_type& entry) {
592 0 : const NamedOverloadActionSymbolTable::Symbol action = entry.second;
593 0 : auto action_it = actions_.find(action);
594 0 : ASSERT(action_it != actions_.end());
595 0 : const OverloadActionState old_state = action_it->second.getState();
596 0 : if (action_it->second.updateResourcePressure(resource, pressure)) {
597 0 : const auto state = action_it->second.getState();
598 :
599 0 : if (old_state.isSaturated() != state.isSaturated()) {
600 0 : ENVOY_LOG(debug, "Overload action {} became {}", action_symbol_table_.name(action),
601 0 : (state.isSaturated() ? "saturated" : "scaling"));
602 0 : }
603 :
604 : // Record the updated value to be sent to workers on the next thread-local-state flush, along
605 : // with any update callbacks. This might overwrite a previous action state change caused by a
606 : // pressure update for a different resource that hasn't been flushed yet. That's okay because
607 : // the state recorded here includes the information from all previous resource updates. So
608 : // even if resource 1 causes an action to have value A, and a later update to resource 2
609 : // causes the action to have value B, B would have been the result for whichever order the
610 : // updates to resources 1 and 2 came in.
611 0 : state_updates_to_flush_.insert_or_assign(action, state);
612 0 : auto [callbacks_start, callbacks_end] = action_to_callbacks_.equal_range(action);
613 0 : std::for_each(callbacks_start, callbacks_end, [&](ActionToCallbackMap::value_type& cb_entry) {
614 0 : callbacks_to_flush_.insert_or_assign(&cb_entry.second, state);
615 0 : });
616 0 : }
617 0 : });
618 :
619 0 : for (auto& loadshed_point : loadshed_points_) {
620 0 : loadshed_point.second->updateResource(resource, pressure);
621 0 : }
622 :
623 : // Eagerly flush updates if this is the last call to updateResourcePressure expected for the
624 : // current epoch. This assert is always valid because flush_awaiting_updates_ is initialized
625 : // before each batch of updates, and even if a resource monitor performs a double update, or a
626 : // previous update callback is late, the logic in OverloadManager::Resource::update() will prevent
627 : // unexpected calls to this function.
628 0 : ASSERT(flush_awaiting_updates_ > 0);
629 0 : --flush_awaiting_updates_;
630 0 : if (flush_epoch == flush_epoch_ && flush_awaiting_updates_ == 0) {
631 0 : flushResourceUpdates();
632 0 : }
633 0 : }
634 :
635 0 : void OverloadManagerImpl::flushResourceUpdates() {
636 0 : if (!state_updates_to_flush_.empty()) {
637 0 : auto shared_updates = std::make_shared<
638 0 : absl::flat_hash_map<NamedOverloadActionSymbolTable::Symbol, OverloadActionState>>();
639 0 : std::swap(*shared_updates, state_updates_to_flush_);
640 :
641 0 : tls_.runOnAllThreads(
642 0 : [updates = std::move(shared_updates)](OptRef<ThreadLocalOverloadStateImpl> overload_state) {
643 0 : for (const auto& [action, state] : *updates) {
644 0 : overload_state->setState(action, state);
645 0 : }
646 0 : });
647 0 : }
648 :
649 0 : for (const auto& [cb, state] : callbacks_to_flush_) {
650 0 : cb->dispatcher_.post([cb = cb, state = state]() { cb->callback_(state); });
651 0 : }
652 0 : callbacks_to_flush_.clear();
653 0 : }
654 :
655 : OverloadManagerImpl::Resource::Resource(const std::string& name, ResourceMonitorPtr monitor,
656 : OverloadManagerImpl& manager, Stats::Scope& stats_scope)
657 : : name_(name), monitor_(std::move(monitor)), manager_(manager),
658 : pressure_gauge_(
659 : makeGauge(stats_scope, name, "pressure", Stats::Gauge::ImportMode::NeverImport)),
660 : failed_updates_counter_(makeCounter(stats_scope, name, "failed_updates")),
661 0 : skipped_updates_counter_(makeCounter(stats_scope, name, "skipped_updates")) {}
662 :
663 0 : void OverloadManagerImpl::Resource::update(FlushEpochId flush_epoch) {
664 0 : if (!pending_update_) {
665 0 : pending_update_ = true;
666 0 : flush_epoch_ = flush_epoch;
667 0 : monitor_->updateResourceUsage(*this);
668 0 : return;
669 0 : }
670 0 : ENVOY_LOG(debug, "Skipping update for resource {} which has pending update", name_);
671 0 : skipped_updates_counter_.inc();
672 0 : }
673 :
674 0 : void OverloadManagerImpl::Resource::onSuccess(const ResourceUsage& usage) {
675 0 : pending_update_ = false;
676 0 : manager_.updateResourcePressure(name_, usage.resource_pressure_, flush_epoch_);
677 0 : pressure_gauge_.set(usage.resource_pressure_ * 100); // convert to percent
678 0 : }
679 :
680 0 : void OverloadManagerImpl::Resource::onFailure(const EnvoyException& error) {
681 0 : pending_update_ = false;
682 0 : ENVOY_LOG(info, "Failed to update resource {}: {}", name_, error.what());
683 0 : failed_updates_counter_.inc();
684 0 : }
685 :
686 : } // namespace Server
687 : } // namespace Envoy
|