LCOV - code coverage report
Current view: top level - source/server - overload_manager_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 87 438 19.9 %
Date: 2024-01-05 06:35:25 Functions: 16 52 30.8 %

          Line data    Source code
       1             : #include "source/server/overload_manager_impl.h"
       2             : 
       3             : #include <chrono>
       4             : 
       5             : #include "envoy/common/exception.h"
       6             : #include "envoy/config/overload/v3/overload.pb.h"
       7             : #include "envoy/config/overload/v3/overload.pb.validate.h"
       8             : #include "envoy/stats/scope.h"
       9             : 
      10             : #include "source/common/common/fmt.h"
      11             : #include "source/common/config/utility.h"
      12             : #include "source/common/event/scaled_range_timer_manager_impl.h"
      13             : #include "source/common/protobuf/utility.h"
      14             : #include "source/common/stats/symbol_table.h"
      15             : #include "source/server/resource_monitor_config_impl.h"
      16             : 
      17             : #include "absl/container/node_hash_map.h"
      18             : #include "absl/strings/str_cat.h"
      19             : 
      20             : namespace Envoy {
      21             : namespace Server {
      22             : namespace {
      23             : 
      24             : using TriggerPtr = std::unique_ptr<Trigger>;
      25             : 
      26             : class ThresholdTriggerImpl final : public Trigger {
      27             : public:
      28             :   ThresholdTriggerImpl(const envoy::config::overload::v3::ThresholdTrigger& config)
      29           0 :       : threshold_(config.value()), state_(OverloadActionState::inactive()) {}
      30             : 
      31           0 :   bool updateValue(double value) override {
      32           0 :     const OverloadActionState state = actionState();
      33           0 :     state_ =
      34           0 :         value >= threshold_ ? OverloadActionState::saturated() : OverloadActionState::inactive();
      35             :     // This is a floating point comparison, though state_ is always either
      36             :     // saturated or inactive so there's no risk due to floating point precision.
      37           0 :     return state.value() != actionState().value();
      38           0 :   }
      39             : 
      40           0 :   OverloadActionState actionState() const override { return state_; }
      41             : 
      42             : private:
      43             :   const double threshold_;
      44             :   OverloadActionState state_;
      45             : };
      46             : 
      47             : class ScaledTriggerImpl final : public Trigger {
      48             : public:
      49             :   ScaledTriggerImpl(const envoy::config::overload::v3::ScaledTrigger& config)
      50             :       : scaling_threshold_(config.scaling_threshold()),
      51             :         saturated_threshold_(config.saturation_threshold()),
      52           0 :         state_(OverloadActionState::inactive()) {
      53           0 :     if (scaling_threshold_ >= saturated_threshold_) {
      54           0 :       throw EnvoyException("scaling_threshold must be less than saturation_threshold");
      55           0 :     }
      56           0 :   }
      57             : 
      58           0 :   bool updateValue(double value) override {
      59           0 :     const OverloadActionState old_state = actionState();
      60           0 :     if (value <= scaling_threshold_) {
      61           0 :       state_ = OverloadActionState::inactive();
      62           0 :     } else if (value >= saturated_threshold_) {
      63           0 :       state_ = OverloadActionState::saturated();
      64           0 :     } else {
      65           0 :       state_ = OverloadActionState(
      66           0 :           UnitFloat((value - scaling_threshold_) / (saturated_threshold_ - scaling_threshold_)));
      67           0 :     }
      68             :     // All values of state_ are produced via this same code path. Even if
      69             :     // old_state and state_ should be approximately equal, there's no harm in
      70             :     // signaling for a small change if they're not float::operator== equal.
      71           0 :     return state_.value() != old_state.value();
      72           0 :   }
      73             : 
      74           0 :   OverloadActionState actionState() const override { return state_; }
      75             : 
      76             : private:
      77             :   const double scaling_threshold_;
      78             :   const double saturated_threshold_;
      79             :   OverloadActionState state_;
      80             : };
      81             : 
      82           0 : TriggerPtr createTriggerFromConfig(const envoy::config::overload::v3::Trigger& trigger_config) {
      83           0 :   TriggerPtr trigger;
      84             : 
      85           0 :   switch (trigger_config.trigger_oneof_case()) {
      86           0 :   case envoy::config::overload::v3::Trigger::TriggerOneofCase::kThreshold:
      87           0 :     trigger = std::make_unique<ThresholdTriggerImpl>(trigger_config.threshold());
      88           0 :     break;
      89           0 :   case envoy::config::overload::v3::Trigger::TriggerOneofCase::kScaled:
      90           0 :     trigger = std::make_unique<ScaledTriggerImpl>(trigger_config.scaled());
      91           0 :     break;
      92           0 :   case envoy::config::overload::v3::Trigger::TriggerOneofCase::TRIGGER_ONEOF_NOT_SET:
      93           0 :     throw EnvoyException(absl::StrCat("action not set for trigger ", trigger_config.name()));
      94           0 :   }
      95             : 
      96           0 :   return trigger;
      97           0 : }
      98             : 
      99           0 : Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view name_of_stat) {
     100           0 :   Stats::StatNameManagedStorage stat_name(name_of_stat, scope.symbolTable());
     101           0 :   return scope.counterFromStatName(stat_name.statName());
     102           0 : }
     103             : 
     104           0 : Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view a, absl::string_view b) {
     105           0 :   return makeCounter(scope, absl::StrCat("overload.", a, ".", b));
     106           0 : }
     107             : 
     108             : Stats::Gauge& makeGauge(Stats::Scope& scope, absl::string_view a, absl::string_view b,
     109           0 :                         Stats::Gauge::ImportMode import_mode) {
     110           0 :   Stats::StatNameManagedStorage stat_name(absl::StrCat("overload.", a, ".", b),
     111           0 :                                           scope.symbolTable());
     112           0 :   return scope.gaugeFromStatName(stat_name.statName(), import_mode);
     113           0 : }
     114             : 
     115             : Stats::Histogram& makeHistogram(Stats::Scope& scope, absl::string_view name,
     116         248 :                                 Stats::Histogram::Unit unit) {
     117         248 :   Stats::StatNameManagedStorage stat_name(absl::StrCat("overload.", name), scope.symbolTable());
     118         248 :   return scope.histogramFromStatName(stat_name.statName(), unit);
     119         248 : }
     120             : 
     121             : Event::ScaledTimerType parseTimerType(
     122           0 :     envoy::config::overload::v3::ScaleTimersOverloadActionConfig::TimerType config_timer_type) {
     123           0 :   using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig;
     124             : 
     125           0 :   switch (config_timer_type) {
     126           0 :   case Config::HTTP_DOWNSTREAM_CONNECTION_IDLE:
     127           0 :     return Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout;
     128           0 :   case Config::HTTP_DOWNSTREAM_STREAM_IDLE:
     129           0 :     return Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout;
     130           0 :   case Config::TRANSPORT_SOCKET_CONNECT:
     131           0 :     return Event::ScaledTimerType::TransportSocketConnectTimeout;
     132           0 :   default:
     133           0 :     throw EnvoyException(fmt::format("Unknown timer type {}", config_timer_type));
     134           0 :   }
     135           0 : }
     136             : 
     137             : Event::ScaledTimerTypeMap
     138             : parseTimerMinimums(const ProtobufWkt::Any& typed_config,
     139           0 :                    ProtobufMessage::ValidationVisitor& validation_visitor) {
     140           0 :   using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig;
     141           0 :   const Config action_config =
     142           0 :       MessageUtil::anyConvertAndValidate<Config>(typed_config, validation_visitor);
     143             : 
     144           0 :   Event::ScaledTimerTypeMap timer_map;
     145             : 
     146           0 :   for (const auto& scale_timer : action_config.timer_scale_factors()) {
     147           0 :     const Event::ScaledTimerType timer_type = parseTimerType(scale_timer.timer());
     148             : 
     149           0 :     const Event::ScaledTimerMinimum minimum =
     150           0 :         scale_timer.has_min_timeout()
     151           0 :             ? Event::ScaledTimerMinimum(Event::AbsoluteMinimum(std::chrono::milliseconds(
     152           0 :                   DurationUtil::durationToMilliseconds(scale_timer.min_timeout()))))
     153           0 :             : Event::ScaledTimerMinimum(
     154           0 :                   Event::ScaledMinimum(UnitFloat(scale_timer.min_scale().value() / 100.0)));
     155             : 
     156           0 :     auto [_, inserted] = timer_map.insert(std::make_pair(timer_type, minimum));
     157           0 :     UNREFERENCED_PARAMETER(_);
     158           0 :     if (!inserted) {
     159           0 :       throw EnvoyException(fmt::format("Found duplicate entry for timer type {}",
     160           0 :                                        Config::TimerType_Name(scale_timer.timer())));
     161           0 :     }
     162           0 :   }
     163             : 
     164           0 :   return timer_map;
     165           0 : }
     166             : 
     167             : } // namespace
     168             : 
     169             : /**
     170             :  * Thread-local copy of the state of each configured overload action.
     171             :  */
     172             : class ThreadLocalOverloadStateImpl : public ThreadLocalOverloadState {
     173             : public:
     174             :   explicit ThreadLocalOverloadStateImpl(
     175             :       const NamedOverloadActionSymbolTable& action_symbol_table,
     176             :       std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>&
     177             :           proactive_resources)
     178             :       : action_symbol_table_(action_symbol_table),
     179             :         actions_(action_symbol_table.size(), OverloadActionState(UnitFloat::min())),
     180         192 :         proactive_resources_(proactive_resources) {}
     181             : 
     182        1568 :   const OverloadActionState& getState(const std::string& action) override {
     183        1568 :     if (const auto symbol = action_symbol_table_.lookup(action); symbol != absl::nullopt) {
     184           0 :       return actions_[symbol->index()];
     185           0 :     }
     186        1568 :     return always_inactive_;
     187        1568 :   }
     188             : 
     189           0 :   void setState(NamedOverloadActionSymbolTable::Symbol action, OverloadActionState state) {
     190           0 :     actions_[action.index()] = state;
     191           0 :   }
     192             : 
     193             :   bool tryAllocateResource(OverloadProactiveResourceName resource_name,
     194           0 :                            int64_t increment) override {
     195           0 :     const auto proactive_resource = proactive_resources_->find(resource_name);
     196           0 :     if (proactive_resource == proactive_resources_->end()) {
     197           0 :       ENVOY_LOG_MISC(warn, "Failed to allocate resource usage, resource monitor is not configured");
     198           0 :       return false;
     199           0 :     }
     200             : 
     201           0 :     return proactive_resource->second.tryAllocateResource(increment);
     202           0 :   }
     203             : 
     204             :   bool tryDeallocateResource(OverloadProactiveResourceName resource_name,
     205           0 :                              int64_t decrement) override {
     206           0 :     const auto proactive_resource = proactive_resources_->find(resource_name);
     207           0 :     if (proactive_resource == proactive_resources_->end()) {
     208           0 :       ENVOY_LOG_MISC(warn,
     209           0 :                      "Failed to deallocate resource usage, resource monitor is not configured");
     210           0 :       return false;
     211           0 :     }
     212             : 
     213           0 :     return proactive_resource->second.tryDeallocateResource(decrement);
     214           0 :   }
     215             : 
     216         208 :   bool isResourceMonitorEnabled(OverloadProactiveResourceName resource_name) override {
     217         208 :     const auto proactive_resource = proactive_resources_->find(resource_name);
     218         208 :     return proactive_resource != proactive_resources_->end();
     219         208 :   }
     220             : 
     221             :   ProactiveResourceMonitorOptRef
     222           0 :   getProactiveResourceMonitorForTest(OverloadProactiveResourceName resource_name) override {
     223           0 :     const auto proactive_resource = proactive_resources_->find(resource_name);
     224           0 :     if (proactive_resource == proactive_resources_->end()) {
     225           0 :       ENVOY_LOG_MISC(warn, "Failed to get resource usage, resource monitor is not configured");
     226           0 :       return makeOptRefFromPtr<ProactiveResourceMonitor>(nullptr);
     227           0 :     }
     228           0 :     return proactive_resource->second.getProactiveResourceMonitorForTest();
     229           0 :   }
     230             : 
     231             : private:
     232             :   static const OverloadActionState always_inactive_;
     233             :   const NamedOverloadActionSymbolTable& action_symbol_table_;
     234             :   std::vector<OverloadActionState> actions_;
     235             :   std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>
     236             :       proactive_resources_;
     237             : };
     238             : 
     239             : const OverloadActionState ThreadLocalOverloadStateImpl::always_inactive_{UnitFloat::min()};
     240             : 
     241             : NamedOverloadActionSymbolTable::Symbol
     242         656 : NamedOverloadActionSymbolTable::get(absl::string_view string) {
     243         656 :   if (auto it = table_.find(string); it != table_.end()) {
     244           0 :     return Symbol(it->second);
     245           0 :   }
     246             : 
     247         656 :   size_t index = table_.size();
     248             : 
     249         656 :   names_.emplace_back(string);
     250         656 :   table_.emplace(std::make_pair(string, index));
     251             : 
     252         656 :   return Symbol(index);
     253         656 : }
     254             : 
     255             : absl::optional<NamedOverloadActionSymbolTable::Symbol>
     256        1568 : NamedOverloadActionSymbolTable::lookup(absl::string_view string) const {
     257        1568 :   if (auto it = table_.find(string); it != table_.end()) {
     258           0 :     return Symbol(it->second);
     259           0 :   }
     260        1568 :   return absl::nullopt;
     261        1568 : }
     262             : 
     263           0 : const absl::string_view NamedOverloadActionSymbolTable::name(Symbol symbol) const {
     264           0 :   return names_.at(symbol.index());
     265           0 : }
     266             : 
     267             : OverloadAction::OverloadAction(const envoy::config::overload::v3::OverloadAction& config,
     268             :                                Stats::Scope& stats_scope)
     269             :     : state_(OverloadActionState::inactive()),
     270             :       active_gauge_(
     271             :           makeGauge(stats_scope, config.name(), "active", Stats::Gauge::ImportMode::NeverImport)),
     272             :       scale_percent_gauge_(makeGauge(stats_scope, config.name(), "scale_percent",
     273           0 :                                      Stats::Gauge::ImportMode::NeverImport)) {
     274           0 :   for (const auto& trigger_config : config.triggers()) {
     275           0 :     if (!triggers_.try_emplace(trigger_config.name(), createTriggerFromConfig(trigger_config))
     276           0 :              .second) {
     277           0 :       throw EnvoyException(
     278           0 :           absl::StrCat("Duplicate trigger resource for overload action ", config.name()));
     279           0 :     }
     280           0 :   }
     281             : 
     282           0 :   active_gauge_.set(0);
     283           0 :   scale_percent_gauge_.set(0);
     284           0 : }
     285             : 
     286           0 : bool OverloadAction::updateResourcePressure(const std::string& name, double pressure) {
     287           0 :   const OverloadActionState old_state = getState();
     288             : 
     289           0 :   auto it = triggers_.find(name);
     290           0 :   ASSERT(it != triggers_.end());
     291           0 :   if (!it->second->updateValue(pressure)) {
     292           0 :     return false;
     293           0 :   }
     294           0 :   const auto trigger_new_state = it->second->actionState();
     295           0 :   active_gauge_.set(trigger_new_state.isSaturated() ? 1 : 0);
     296           0 :   scale_percent_gauge_.set(trigger_new_state.value().value() * 100);
     297             : 
     298           0 :   {
     299             :     // Compute the new state as the maximum over all trigger states.
     300           0 :     OverloadActionState new_state = OverloadActionState::inactive();
     301           0 :     for (auto& trigger : triggers_) {
     302           0 :       const auto trigger_state = trigger.second->actionState();
     303           0 :       if (trigger_state.value() > new_state.value()) {
     304           0 :         new_state = trigger_state;
     305           0 :       }
     306           0 :     }
     307           0 :     state_ = new_state;
     308           0 :   }
     309             : 
     310           0 :   return state_.value() != old_state.value();
     311           0 : }
     312             : 
     313           0 : OverloadActionState OverloadAction::getState() const { return state_; }
     314             : 
     315             : LoadShedPointImpl::LoadShedPointImpl(const envoy::config::overload::v3::LoadShedPoint& config,
     316             :                                      Stats::Scope& stats_scope,
     317             :                                      Random::RandomGenerator& random_generator)
     318             :     : scale_percent_(makeGauge(stats_scope, config.name(), "scale_percent",
     319             :                                Stats::Gauge::ImportMode::NeverImport)),
     320           0 :       random_generator_(random_generator) {
     321           0 :   for (const auto& trigger_config : config.triggers()) {
     322           0 :     if (!triggers_.try_emplace(trigger_config.name(), createTriggerFromConfig(trigger_config))
     323           0 :              .second) {
     324           0 :       throw EnvoyException(
     325           0 :           absl::StrCat("Duplicate trigger resource for LoadShedPoint ", config.name()));
     326           0 :     }
     327           0 :   }
     328           0 : };
     329             : 
     330             : void LoadShedPointImpl::updateResource(absl::string_view resource_name,
     331           0 :                                        double resource_utilization) {
     332           0 :   auto it = triggers_.find(resource_name);
     333           0 :   if (it == triggers_.end()) {
     334           0 :     return;
     335           0 :   }
     336             : 
     337           0 :   it->second->updateValue(resource_utilization);
     338           0 :   updateProbabilityShedLoad();
     339           0 : }
     340             : 
     341           0 : void LoadShedPointImpl::updateProbabilityShedLoad() {
     342           0 :   float max_unit_float = 0.0f;
     343           0 :   for (const auto& trigger : triggers_) {
     344           0 :     max_unit_float = std::max(trigger.second->actionState().value().value(), max_unit_float);
     345           0 :   }
     346             : 
     347           0 :   probability_shed_load_.store(max_unit_float);
     348             : 
     349             :   // Update stats.
     350           0 :   scale_percent_.set(100 * max_unit_float);
     351           0 : }
     352             : 
     353           0 : bool LoadShedPointImpl::shouldShedLoad() {
     354           0 :   float unit_float_probability_shed_load = probability_shed_load_.load();
     355             :   // This should be ok as we're using unit float which saturates at 1.0f.
     356           0 :   if (unit_float_probability_shed_load == 1.0f) {
     357           0 :     return true;
     358           0 :   }
     359             : 
     360           0 :   return random_generator_.bernoulli(UnitFloat(unit_float_probability_shed_load));
     361           0 : }
     362             : 
     363             : OverloadManagerImpl::OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope,
     364             :                                          ThreadLocal::SlotAllocator& slot_allocator,
     365             :                                          const envoy::config::overload::v3::OverloadManager& config,
     366             :                                          ProtobufMessage::ValidationVisitor& validation_visitor,
     367             :                                          Api::Api& api, const Server::Options& options)
     368             :     : dispatcher_(dispatcher), time_source_(api.timeSource()), tls_(slot_allocator),
     369             :       refresh_interval_(
     370             :           std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))),
     371             :       refresh_interval_delays_(makeHistogram(stats_scope, "refresh_interval_delay",
     372             :                                              Stats::Histogram::Unit::Milliseconds)),
     373             :       proactive_resources_(
     374             :           std::make_unique<
     375         248 :               absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>()) {
     376         248 :   Configuration::ResourceMonitorFactoryContextImpl context(dispatcher, options, api,
     377         248 :                                                            validation_visitor);
     378             :   // We should hide impl details from users, for them there should be no distinction between
     379             :   // proactive and regular resource monitors in configuration API. But internally we will maintain
     380             :   // two distinct collections of proactive and regular resources. Proactive resources are not
     381             :   // subject to periodic flushes and can be recalculated/updated on demand by invoking
     382             :   // `tryAllocateResource/tryDeallocateResource` via thread local overload state.
     383         248 :   for (const auto& resource : config.resource_monitors()) {
     384           6 :     const auto& name = resource.name();
     385             :     // Check if it is a proactive resource.
     386           6 :     auto proactive_resource_it =
     387           6 :         OverloadProactiveResources::get().proactive_action_name_to_resource_.find(name);
     388           6 :     ENVOY_LOG(debug, "Evaluating resource {}", name);
     389           6 :     bool result = false;
     390           6 :     if (proactive_resource_it !=
     391           6 :         OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) {
     392           0 :       ENVOY_LOG(debug, "Adding proactive resource monitor for {}", name);
     393           0 :       auto& factory =
     394           0 :           Config::Utility::getAndCheckFactory<Configuration::ProactiveResourceMonitorFactory>(
     395           0 :               resource);
     396           0 :       auto config =
     397           0 :           Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);
     398           0 :       auto monitor = factory.createProactiveResourceMonitor(*config, context);
     399           0 :       result =
     400           0 :           proactive_resources_
     401           0 :               ->try_emplace(proactive_resource_it->second, name, std::move(monitor), stats_scope)
     402           0 :               .second;
     403           6 :     } else {
     404           6 :       ENVOY_LOG(debug, "Adding resource monitor for {}", name);
     405           6 :       auto& factory =
     406           6 :           Config::Utility::getAndCheckFactory<Configuration::ResourceMonitorFactory>(resource);
     407           6 :       auto config =
     408           6 :           Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory);
     409           6 :       auto monitor = factory.createResourceMonitor(*config, context);
     410           6 :       result = resources_.try_emplace(name, name, std::move(monitor), *this, stats_scope).second;
     411           6 :     }
     412           6 :     if (!result) {
     413           0 :       throw EnvoyException(absl::StrCat("Duplicate resource monitor ", name));
     414           0 :     }
     415           6 :   }
     416             : 
     417         248 :   for (const auto& action : config.actions()) {
     418           0 :     const auto& name = action.name();
     419           0 :     const auto symbol = action_symbol_table_.get(name);
     420           0 :     ENVOY_LOG(debug, "Adding overload action {}", name);
     421             : 
     422             :     // Validate that this is a well known overload action.
     423           0 :     if (Runtime::runtimeFeatureEnabled(
     424           0 :             "envoy.reloadable_features.overload_manager_error_unknown_action")) {
     425           0 :       auto& well_known_actions = OverloadActionNames::get().WellKnownActions;
     426           0 :       if (std::find(well_known_actions.begin(), well_known_actions.end(), name) ==
     427           0 :           well_known_actions.end()) {
     428           0 :         throw EnvoyException(absl::StrCat("Unknown Overload Manager Action ", name));
     429           0 :       }
     430           0 :     }
     431             : 
     432             :     // TODO: use in place construction once https://github.com/abseil/abseil-cpp/issues/388 is
     433             :     // addressed
     434             :     // We cannot currently use in place construction as the OverloadAction constructor may throw,
     435             :     // causing an inconsistent internal state of the actions_ map, which on destruction results in
     436             :     // an invalid free.
     437           0 :     auto result = actions_.try_emplace(symbol, OverloadAction(action, stats_scope));
     438           0 :     if (!result.second) {
     439           0 :       throw EnvoyException(absl::StrCat("Duplicate overload action ", name));
     440           0 :     }
     441             : 
     442           0 :     if (name == OverloadActionNames::get().ReduceTimeouts) {
     443           0 :       timer_minimums_ = std::make_shared<const Event::ScaledTimerTypeMap>(
     444           0 :           parseTimerMinimums(action.typed_config(), validation_visitor));
     445           0 :     } else if (name == OverloadActionNames::get().ResetStreams) {
     446           0 :       if (!config.has_buffer_factory_config()) {
     447           0 :         throw EnvoyException(
     448           0 :             fmt::format("Overload action \"{}\" requires buffer_factory_config.", name));
     449           0 :       }
     450           0 :       makeCounter(api.rootScope(), OverloadActionStatsNames::get().ResetStreamsCount);
     451           0 :     } else if (action.has_typed_config()) {
     452           0 :       throw EnvoyException(fmt::format(
     453           0 :           "Overload action \"{}\" has an unexpected value for the typed_config field", name));
     454           0 :     }
     455             : 
     456           0 :     for (const auto& trigger : action.triggers()) {
     457           0 :       const std::string& resource = trigger.name();
     458           0 :       auto proactive_resource_it =
     459           0 :           OverloadProactiveResources::get().proactive_action_name_to_resource_.find(resource);
     460             : 
     461           0 :       if (resources_.find(resource) == resources_.end() &&
     462           0 :           proactive_resource_it ==
     463           0 :               OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) {
     464           0 :         throw EnvoyException(
     465           0 :             fmt::format("Unknown trigger resource {} for overload action {}", resource, name));
     466           0 :       }
     467           0 :       resource_to_actions_.insert(std::make_pair(resource, symbol));
     468           0 :     }
     469           0 :   }
     470             : 
     471             :   // Validate the trigger resources for Load shedPoints.
     472         248 :   for (const auto& point : config.loadshed_points()) {
     473           0 :     for (const auto& trigger : point.triggers()) {
     474           0 :       if (!resources_.contains(trigger.name())) {
     475           0 :         throw EnvoyException(fmt::format("Unknown trigger resource {} for loadshed point {}",
     476           0 :                                          trigger.name(), point.name()));
     477           0 :       }
     478           0 :     }
     479             : 
     480           0 :     const auto result = loadshed_points_.try_emplace(
     481           0 :         point.name(),
     482           0 :         std::make_unique<LoadShedPointImpl>(point, api.rootScope(), api.randomGenerator()));
     483             : 
     484           0 :     if (!result.second) {
     485           0 :       throw EnvoyException(absl::StrCat("Duplicate loadshed point ", point.name()));
     486           0 :     }
     487           0 :   }
     488         248 : }
     489             : 
     490          98 : void OverloadManagerImpl::start() {
     491          98 :   ASSERT(!started_);
     492          98 :   started_ = true;
     493             : 
     494         192 :   tls_.set([this](Event::Dispatcher&) {
     495         192 :     return std::make_shared<ThreadLocalOverloadStateImpl>(action_symbol_table_,
     496         192 :                                                           proactive_resources_);
     497         192 :   });
     498             : 
     499          98 :   if (resources_.empty()) {
     500          98 :     return;
     501          98 :   }
     502             : 
     503           0 :   timer_ = dispatcher_.createTimer([this]() -> void {
     504             :     // Guarantee that all resource updates get flushed after no more than one refresh_interval_.
     505           0 :     flushResourceUpdates();
     506             : 
     507             :     // Start a new flush epoch. If all resource updates complete before this callback runs, the last
     508             :     // resource update will call flushResourceUpdates to flush the whole batch early.
     509           0 :     ++flush_epoch_;
     510           0 :     flush_awaiting_updates_ = resources_.size();
     511             : 
     512           0 :     for (auto& resource : resources_) {
     513           0 :       resource.second.update(flush_epoch_);
     514           0 :     }
     515             : 
     516             :     // Record delay.
     517           0 :     auto now = time_source_.monotonicTime();
     518           0 :     std::chrono::milliseconds delay =
     519           0 :         std::chrono::duration_cast<std::chrono::milliseconds>(now - time_resources_last_measured_);
     520           0 :     refresh_interval_delays_.recordValue(delay.count());
     521           0 :     time_resources_last_measured_ = now;
     522             : 
     523           0 :     timer_->enableTimer(refresh_interval_);
     524           0 :   });
     525             : 
     526           0 :   time_resources_last_measured_ = time_source_.monotonicTime();
     527           0 :   timer_->enableTimer(refresh_interval_);
     528           0 : }
     529             : 
     530         132 : void OverloadManagerImpl::stop() {
     531             :   // Disable any pending timeouts.
     532         132 :   if (timer_) {
     533           0 :     timer_->disableTimer();
     534           0 :   }
     535             : 
     536             :   // Clear the resource map to block on any pending updates.
     537         132 :   resources_.clear();
     538             : 
     539             :   // TODO(nezdolik): wrap proactive monitors into atomic? and clear it here
     540         132 : }
     541             : 
     542             : bool OverloadManagerImpl::registerForAction(const std::string& action,
     543             :                                             Event::Dispatcher& dispatcher,
     544         656 :                                             OverloadActionCb callback) {
     545         656 :   ASSERT(!started_);
     546         656 :   const auto symbol = action_symbol_table_.get(action);
     547             : 
     548         656 :   if (actions_.find(symbol) == actions_.end()) {
     549         656 :     ENVOY_LOG(debug, "No overload action is configured for {}.", action);
     550         656 :     return false;
     551         656 :   }
     552             : 
     553           0 :   action_to_callbacks_.emplace(std::piecewise_construct, std::forward_as_tuple(symbol),
     554           0 :                                std::forward_as_tuple(dispatcher, callback));
     555           0 :   return true;
     556         656 : }
     557             : 
     558         992 : ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() { return *tls_; }
     559         131 : Event::ScaledRangeTimerManagerFactory OverloadManagerImpl::scaledTimerFactory() {
     560         131 :   return [this](Event::Dispatcher& dispatcher) {
     561         131 :     auto manager = createScaledRangeTimerManager(dispatcher, timer_minimums_);
     562         131 :     registerForAction(OverloadActionNames::get().ReduceTimeouts, dispatcher,
     563         131 :                       [manager = manager.get()](OverloadActionState scale_state) {
     564           0 :                         manager->setScaleFactor(
     565             :                             // The action state is 0 for no overload up to 1 for maximal overload,
     566             :                             // but the scale factor for timers is 1 for no scaling and 0 for maximal
     567             :                             // scaling, so invert the value to pass in (1-value).
     568           0 :                             scale_state.value().invert());
     569           0 :                       });
     570         131 :     return manager;
     571         131 :   };
     572         131 : }
     573             : 
     574        1524 : LoadShedPoint* OverloadManagerImpl::getLoadShedPoint(absl::string_view point_name) {
     575        1524 :   if (auto it = loadshed_points_.find(point_name); it != loadshed_points_.end()) {
     576           0 :     return it->second.get();
     577           0 :   }
     578        1524 :   return nullptr;
     579        1524 : }
     580             : 
     581             : Event::ScaledRangeTimerManagerPtr OverloadManagerImpl::createScaledRangeTimerManager(
     582             :     Event::Dispatcher& dispatcher,
     583         131 :     const Event::ScaledTimerTypeMapConstSharedPtr& timer_minimums) const {
     584         131 :   return std::make_unique<Event::ScaledRangeTimerManagerImpl>(dispatcher, timer_minimums);
     585         131 : }
     586             : 
     587             : void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure,
     588           0 :                                                  FlushEpochId flush_epoch) {
     589           0 :   auto [start, end] = resource_to_actions_.equal_range(resource);
     590             : 
     591           0 :   std::for_each(start, end, [&](ResourceToActionMap::value_type& entry) {
     592           0 :     const NamedOverloadActionSymbolTable::Symbol action = entry.second;
     593           0 :     auto action_it = actions_.find(action);
     594           0 :     ASSERT(action_it != actions_.end());
     595           0 :     const OverloadActionState old_state = action_it->second.getState();
     596           0 :     if (action_it->second.updateResourcePressure(resource, pressure)) {
     597           0 :       const auto state = action_it->second.getState();
     598             : 
     599           0 :       if (old_state.isSaturated() != state.isSaturated()) {
     600           0 :         ENVOY_LOG(debug, "Overload action {} became {}", action_symbol_table_.name(action),
     601           0 :                   (state.isSaturated() ? "saturated" : "scaling"));
     602           0 :       }
     603             : 
     604             :       // Record the updated value to be sent to workers on the next thread-local-state flush, along
     605             :       // with any update callbacks. This might overwrite a previous action state change caused by a
     606             :       // pressure update for a different resource that hasn't been flushed yet. That's okay because
     607             :       // the state recorded here includes the information from all previous resource updates. So
     608             :       // even if resource 1 causes an action to have value A, and a later update to resource 2
     609             :       // causes the action to have value B, B would have been the result for whichever order the
     610             :       // updates to resources 1 and 2 came in.
     611           0 :       state_updates_to_flush_.insert_or_assign(action, state);
     612           0 :       auto [callbacks_start, callbacks_end] = action_to_callbacks_.equal_range(action);
     613           0 :       std::for_each(callbacks_start, callbacks_end, [&](ActionToCallbackMap::value_type& cb_entry) {
     614           0 :         callbacks_to_flush_.insert_or_assign(&cb_entry.second, state);
     615           0 :       });
     616           0 :     }
     617           0 :   });
     618             : 
     619           0 :   for (auto& loadshed_point : loadshed_points_) {
     620           0 :     loadshed_point.second->updateResource(resource, pressure);
     621           0 :   }
     622             : 
     623             :   // Eagerly flush updates if this is the last call to updateResourcePressure expected for the
     624             :   // current epoch. This assert is always valid because flush_awaiting_updates_ is initialized
     625             :   // before each batch of updates, and even if a resource monitor performs a double update, or a
     626             :   // previous update callback is late, the logic in OverloadManager::Resource::update() will prevent
     627             :   // unexpected calls to this function.
     628           0 :   ASSERT(flush_awaiting_updates_ > 0);
     629           0 :   --flush_awaiting_updates_;
     630           0 :   if (flush_epoch == flush_epoch_ && flush_awaiting_updates_ == 0) {
     631           0 :     flushResourceUpdates();
     632           0 :   }
     633           0 : }
     634             : 
     635           0 : void OverloadManagerImpl::flushResourceUpdates() {
     636           0 :   if (!state_updates_to_flush_.empty()) {
     637           0 :     auto shared_updates = std::make_shared<
     638           0 :         absl::flat_hash_map<NamedOverloadActionSymbolTable::Symbol, OverloadActionState>>();
     639           0 :     std::swap(*shared_updates, state_updates_to_flush_);
     640             : 
     641           0 :     tls_.runOnAllThreads(
     642           0 :         [updates = std::move(shared_updates)](OptRef<ThreadLocalOverloadStateImpl> overload_state) {
     643           0 :           for (const auto& [action, state] : *updates) {
     644           0 :             overload_state->setState(action, state);
     645           0 :           }
     646           0 :         });
     647           0 :   }
     648             : 
     649           0 :   for (const auto& [cb, state] : callbacks_to_flush_) {
     650           0 :     cb->dispatcher_.post([cb = cb, state = state]() { cb->callback_(state); });
     651           0 :   }
     652           0 :   callbacks_to_flush_.clear();
     653           0 : }
     654             : 
     655             : OverloadManagerImpl::Resource::Resource(const std::string& name, ResourceMonitorPtr monitor,
     656             :                                         OverloadManagerImpl& manager, Stats::Scope& stats_scope)
     657             :     : name_(name), monitor_(std::move(monitor)), manager_(manager),
     658             :       pressure_gauge_(
     659             :           makeGauge(stats_scope, name, "pressure", Stats::Gauge::ImportMode::NeverImport)),
     660             :       failed_updates_counter_(makeCounter(stats_scope, name, "failed_updates")),
     661           0 :       skipped_updates_counter_(makeCounter(stats_scope, name, "skipped_updates")) {}
     662             : 
     663           0 : void OverloadManagerImpl::Resource::update(FlushEpochId flush_epoch) {
     664           0 :   if (!pending_update_) {
     665           0 :     pending_update_ = true;
     666           0 :     flush_epoch_ = flush_epoch;
     667           0 :     monitor_->updateResourceUsage(*this);
     668           0 :     return;
     669           0 :   }
     670           0 :   ENVOY_LOG(debug, "Skipping update for resource {} which has pending update", name_);
     671           0 :   skipped_updates_counter_.inc();
     672           0 : }
     673             : 
     674           0 : void OverloadManagerImpl::Resource::onSuccess(const ResourceUsage& usage) {
     675           0 :   pending_update_ = false;
     676           0 :   manager_.updateResourcePressure(name_, usage.resource_pressure_, flush_epoch_);
     677           0 :   pressure_gauge_.set(usage.resource_pressure_ * 100); // convert to percent
     678           0 : }
     679             : 
     680           0 : void OverloadManagerImpl::Resource::onFailure(const EnvoyException& error) {
     681           0 :   pending_update_ = false;
     682           0 :   ENVOY_LOG(info, "Failed to update resource {}: {}", name_, error.what());
     683           0 :   failed_updates_counter_.inc();
     684           0 : }
     685             : 
     686             : } // namespace Server
     687             : } // namespace Envoy

Generated by: LCOV version 1.15