/proc/self/cwd/source/server/overload_manager_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/server/overload_manager_impl.h" |
2 | | |
3 | | #include <chrono> |
4 | | |
5 | | #include "envoy/common/exception.h" |
6 | | #include "envoy/config/overload/v3/overload.pb.h" |
7 | | #include "envoy/config/overload/v3/overload.pb.validate.h" |
8 | | #include "envoy/stats/scope.h" |
9 | | |
10 | | #include "source/common/common/fmt.h" |
11 | | #include "source/common/config/utility.h" |
12 | | #include "source/common/event/scaled_range_timer_manager_impl.h" |
13 | | #include "source/common/protobuf/utility.h" |
14 | | #include "source/common/stats/symbol_table.h" |
15 | | #include "source/server/resource_monitor_config_impl.h" |
16 | | |
17 | | #include "absl/container/node_hash_map.h" |
18 | | #include "absl/strings/str_cat.h" |
19 | | |
20 | | namespace Envoy { |
21 | | namespace Server { |
22 | | namespace { |
23 | | |
24 | | using TriggerPtr = std::unique_ptr<Trigger>; |
25 | | |
26 | | class ThresholdTriggerImpl final : public Trigger { |
27 | | public: |
28 | | ThresholdTriggerImpl(const envoy::config::overload::v3::ThresholdTrigger& config) |
29 | 0 | : threshold_(config.value()), state_(OverloadActionState::inactive()) {} |
30 | | |
31 | 0 | bool updateValue(double value) override { |
32 | 0 | const OverloadActionState state = actionState(); |
33 | 0 | state_ = |
34 | 0 | value >= threshold_ ? OverloadActionState::saturated() : OverloadActionState::inactive(); |
35 | | // This is a floating point comparison, though state_ is always either |
36 | | // saturated or inactive so there's no risk due to floating point precision. |
37 | 0 | return state.value() != actionState().value(); |
38 | 0 | } |
39 | | |
40 | 0 | OverloadActionState actionState() const override { return state_; } |
41 | | |
42 | | private: |
43 | | const double threshold_; |
44 | | OverloadActionState state_; |
45 | | }; |
46 | | |
47 | | class ScaledTriggerImpl final : public Trigger { |
48 | | public: |
49 | | ScaledTriggerImpl(const envoy::config::overload::v3::ScaledTrigger& config) |
50 | | : scaling_threshold_(config.scaling_threshold()), |
51 | | saturated_threshold_(config.saturation_threshold()), |
52 | 0 | state_(OverloadActionState::inactive()) { |
53 | 0 | if (scaling_threshold_ >= saturated_threshold_) { |
54 | 0 | throwEnvoyExceptionOrPanic("scaling_threshold must be less than saturation_threshold"); |
55 | 0 | } |
56 | 0 | } |
57 | | |
58 | 0 | bool updateValue(double value) override { |
59 | 0 | const OverloadActionState old_state = actionState(); |
60 | 0 | if (value <= scaling_threshold_) { |
61 | 0 | state_ = OverloadActionState::inactive(); |
62 | 0 | } else if (value >= saturated_threshold_) { |
63 | 0 | state_ = OverloadActionState::saturated(); |
64 | 0 | } else { |
65 | 0 | state_ = OverloadActionState( |
66 | 0 | UnitFloat((value - scaling_threshold_) / (saturated_threshold_ - scaling_threshold_))); |
67 | 0 | } |
68 | | // All values of state_ are produced via this same code path. Even if |
69 | | // old_state and state_ should be approximately equal, there's no harm in |
70 | | // signaling for a small change if they're not float::operator== equal. |
71 | 0 | return state_.value() != old_state.value(); |
72 | 0 | } |
73 | | |
74 | 0 | OverloadActionState actionState() const override { return state_; } |
75 | | |
76 | | private: |
77 | | const double scaling_threshold_; |
78 | | const double saturated_threshold_; |
79 | | OverloadActionState state_; |
80 | | }; |
81 | | |
82 | 0 | TriggerPtr createTriggerFromConfig(const envoy::config::overload::v3::Trigger& trigger_config) { |
83 | 0 | TriggerPtr trigger; |
84 | |
|
85 | 0 | switch (trigger_config.trigger_oneof_case()) { |
86 | 0 | case envoy::config::overload::v3::Trigger::TriggerOneofCase::kThreshold: |
87 | 0 | trigger = std::make_unique<ThresholdTriggerImpl>(trigger_config.threshold()); |
88 | 0 | break; |
89 | 0 | case envoy::config::overload::v3::Trigger::TriggerOneofCase::kScaled: |
90 | 0 | trigger = std::make_unique<ScaledTriggerImpl>(trigger_config.scaled()); |
91 | 0 | break; |
92 | 0 | case envoy::config::overload::v3::Trigger::TriggerOneofCase::TRIGGER_ONEOF_NOT_SET: |
93 | 0 | throwEnvoyExceptionOrPanic(absl::StrCat("action not set for trigger ", trigger_config.name())); |
94 | 0 | } |
95 | | |
96 | 0 | return trigger; |
97 | 0 | } |
98 | | |
99 | 0 | Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view name_of_stat) { |
100 | 0 | Stats::StatNameManagedStorage stat_name(name_of_stat, scope.symbolTable()); |
101 | 0 | return scope.counterFromStatName(stat_name.statName()); |
102 | 0 | } |
103 | | |
104 | 0 | Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view a, absl::string_view b) { |
105 | 0 | return makeCounter(scope, absl::StrCat("overload.", a, ".", b)); |
106 | 0 | } |
107 | | |
108 | | Stats::Gauge& makeGauge(Stats::Scope& scope, absl::string_view a, absl::string_view b, |
109 | 0 | Stats::Gauge::ImportMode import_mode) { |
110 | 0 | Stats::StatNameManagedStorage stat_name(absl::StrCat("overload.", a, ".", b), |
111 | 0 | scope.symbolTable()); |
112 | 0 | return scope.gaugeFromStatName(stat_name.statName(), import_mode); |
113 | 0 | } |
114 | | |
115 | | Stats::Histogram& makeHistogram(Stats::Scope& scope, absl::string_view name, |
116 | 10.5k | Stats::Histogram::Unit unit) { |
117 | 10.5k | Stats::StatNameManagedStorage stat_name(absl::StrCat("overload.", name), scope.symbolTable()); |
118 | 10.5k | return scope.histogramFromStatName(stat_name.statName(), unit); |
119 | 10.5k | } |
120 | | |
121 | | Event::ScaledTimerType parseTimerType( |
122 | 0 | envoy::config::overload::v3::ScaleTimersOverloadActionConfig::TimerType config_timer_type) { |
123 | 0 | using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig; |
124 | |
|
125 | 0 | switch (config_timer_type) { |
126 | 0 | case Config::HTTP_DOWNSTREAM_CONNECTION_IDLE: |
127 | 0 | return Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout; |
128 | 0 | case Config::HTTP_DOWNSTREAM_STREAM_IDLE: |
129 | 0 | return Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout; |
130 | 0 | case Config::TRANSPORT_SOCKET_CONNECT: |
131 | 0 | return Event::ScaledTimerType::TransportSocketConnectTimeout; |
132 | 0 | default: |
133 | 0 | throwEnvoyExceptionOrPanic(fmt::format("Unknown timer type {}", config_timer_type)); |
134 | 0 | } |
135 | 0 | } |
136 | | |
137 | | Event::ScaledTimerTypeMap |
138 | | parseTimerMinimums(const ProtobufWkt::Any& typed_config, |
139 | 0 | ProtobufMessage::ValidationVisitor& validation_visitor) { |
140 | 0 | using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig; |
141 | 0 | const Config action_config = |
142 | 0 | MessageUtil::anyConvertAndValidate<Config>(typed_config, validation_visitor); |
143 | |
|
144 | 0 | Event::ScaledTimerTypeMap timer_map; |
145 | |
|
146 | 0 | for (const auto& scale_timer : action_config.timer_scale_factors()) { |
147 | 0 | const Event::ScaledTimerType timer_type = parseTimerType(scale_timer.timer()); |
148 | |
|
149 | 0 | const Event::ScaledTimerMinimum minimum = |
150 | 0 | scale_timer.has_min_timeout() |
151 | 0 | ? Event::ScaledTimerMinimum(Event::AbsoluteMinimum(std::chrono::milliseconds( |
152 | 0 | DurationUtil::durationToMilliseconds(scale_timer.min_timeout())))) |
153 | 0 | : Event::ScaledTimerMinimum( |
154 | 0 | Event::ScaledMinimum(UnitFloat(scale_timer.min_scale().value() / 100.0))); |
155 | |
|
156 | 0 | auto [_, inserted] = timer_map.insert(std::make_pair(timer_type, minimum)); |
157 | 0 | UNREFERENCED_PARAMETER(_); |
158 | 0 | if (!inserted) { |
159 | 0 | throwEnvoyExceptionOrPanic(fmt::format("Found duplicate entry for timer type {}", |
160 | 0 | Config::TimerType_Name(scale_timer.timer()))); |
161 | 0 | } |
162 | 0 | } |
163 | | |
164 | 0 | return timer_map; |
165 | 0 | } |
166 | | |
167 | | } // namespace |
168 | | |
169 | | /** |
170 | | * Thread-local copy of the state of each configured overload action. |
171 | | */ |
172 | | class ThreadLocalOverloadStateImpl : public ThreadLocalOverloadState { |
173 | | public: |
174 | | explicit ThreadLocalOverloadStateImpl( |
175 | | const NamedOverloadActionSymbolTable& action_symbol_table, |
176 | | std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>& |
177 | | proactive_resources) |
178 | | : action_symbol_table_(action_symbol_table), |
179 | | actions_(action_symbol_table.size(), OverloadActionState(UnitFloat::min())), |
180 | 5.29k | proactive_resources_(proactive_resources) {} |
181 | | |
182 | 7.25k | const OverloadActionState& getState(const std::string& action) override { |
183 | 7.25k | if (const auto symbol = action_symbol_table_.lookup(action); symbol != absl::nullopt) { |
184 | 0 | return actions_[symbol->index()]; |
185 | 0 | } |
186 | 7.25k | return always_inactive_; |
187 | 7.25k | } |
188 | | |
189 | 0 | void setState(NamedOverloadActionSymbolTable::Symbol action, OverloadActionState state) { |
190 | 0 | actions_[action.index()] = state; |
191 | 0 | } |
192 | | |
193 | | bool tryAllocateResource(OverloadProactiveResourceName resource_name, |
194 | 0 | int64_t increment) override { |
195 | 0 | const auto proactive_resource = proactive_resources_->find(resource_name); |
196 | 0 | if (proactive_resource == proactive_resources_->end()) { |
197 | 0 | ENVOY_LOG_MISC(warn, "Failed to allocate resource usage, resource monitor is not configured"); |
198 | 0 | return false; |
199 | 0 | } |
200 | | |
201 | 0 | return proactive_resource->second.tryAllocateResource(increment); |
202 | 0 | } |
203 | | |
204 | | bool tryDeallocateResource(OverloadProactiveResourceName resource_name, |
205 | 0 | int64_t decrement) override { |
206 | 0 | const auto proactive_resource = proactive_resources_->find(resource_name); |
207 | 0 | if (proactive_resource == proactive_resources_->end()) { |
208 | 0 | ENVOY_LOG_MISC(warn, |
209 | 0 | "Failed to deallocate resource usage, resource monitor is not configured"); |
210 | 0 | return false; |
211 | 0 | } |
212 | | |
213 | 0 | return proactive_resource->second.tryDeallocateResource(decrement); |
214 | 0 | } |
215 | | |
216 | 5.31k | bool isResourceMonitorEnabled(OverloadProactiveResourceName resource_name) override { |
217 | 5.31k | const auto proactive_resource = proactive_resources_->find(resource_name); |
218 | 5.31k | return proactive_resource != proactive_resources_->end(); |
219 | 5.31k | } |
220 | | |
221 | | ProactiveResourceMonitorOptRef |
222 | 0 | getProactiveResourceMonitorForTest(OverloadProactiveResourceName resource_name) override { |
223 | 0 | const auto proactive_resource = proactive_resources_->find(resource_name); |
224 | 0 | if (proactive_resource == proactive_resources_->end()) { |
225 | 0 | ENVOY_LOG_MISC(warn, "Failed to get resource usage, resource monitor is not configured"); |
226 | 0 | return makeOptRefFromPtr<ProactiveResourceMonitor>(nullptr); |
227 | 0 | } |
228 | 0 | return proactive_resource->second.getProactiveResourceMonitorForTest(); |
229 | 0 | } |
230 | | |
231 | | private: |
232 | | static const OverloadActionState always_inactive_; |
233 | | const NamedOverloadActionSymbolTable& action_symbol_table_; |
234 | | std::vector<OverloadActionState> actions_; |
235 | | std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>> |
236 | | proactive_resources_; |
237 | | }; |
238 | | |
239 | | const OverloadActionState ThreadLocalOverloadStateImpl::always_inactive_{UnitFloat::min()}; |
240 | | |
241 | | NamedOverloadActionSymbolTable::Symbol |
242 | 26.6k | NamedOverloadActionSymbolTable::get(absl::string_view string) { |
243 | 26.6k | if (auto it = table_.find(string); it != table_.end()) { |
244 | 0 | return Symbol(it->second); |
245 | 0 | } |
246 | | |
247 | 26.6k | size_t index = table_.size(); |
248 | | |
249 | 26.6k | names_.emplace_back(string); |
250 | 26.6k | table_.emplace(std::make_pair(string, index)); |
251 | | |
252 | 26.6k | return Symbol(index); |
253 | 26.6k | } |
254 | | |
255 | | absl::optional<NamedOverloadActionSymbolTable::Symbol> |
256 | 7.25k | NamedOverloadActionSymbolTable::lookup(absl::string_view string) const { |
257 | 7.25k | if (auto it = table_.find(string); it != table_.end()) { |
258 | 0 | return Symbol(it->second); |
259 | 0 | } |
260 | 7.25k | return absl::nullopt; |
261 | 7.25k | } |
262 | | |
263 | 0 | const absl::string_view NamedOverloadActionSymbolTable::name(Symbol symbol) const { |
264 | 0 | return names_.at(symbol.index()); |
265 | 0 | } |
266 | | |
267 | | OverloadAction::OverloadAction(const envoy::config::overload::v3::OverloadAction& config, |
268 | | Stats::Scope& stats_scope) |
269 | | : state_(OverloadActionState::inactive()), |
270 | | active_gauge_( |
271 | | makeGauge(stats_scope, config.name(), "active", Stats::Gauge::ImportMode::NeverImport)), |
272 | | scale_percent_gauge_(makeGauge(stats_scope, config.name(), "scale_percent", |
273 | 0 | Stats::Gauge::ImportMode::NeverImport)) { |
274 | 0 | for (const auto& trigger_config : config.triggers()) { |
275 | 0 | if (!triggers_.try_emplace(trigger_config.name(), createTriggerFromConfig(trigger_config)) |
276 | 0 | .second) { |
277 | 0 | throwEnvoyExceptionOrPanic( |
278 | 0 | absl::StrCat("Duplicate trigger resource for overload action ", config.name())); |
279 | 0 | } |
280 | 0 | } |
281 | | |
282 | 0 | active_gauge_.set(0); |
283 | 0 | scale_percent_gauge_.set(0); |
284 | 0 | } |
285 | | |
286 | 0 | bool OverloadAction::updateResourcePressure(const std::string& name, double pressure) { |
287 | 0 | const OverloadActionState old_state = getState(); |
288 | |
|
289 | 0 | auto it = triggers_.find(name); |
290 | 0 | ASSERT(it != triggers_.end()); |
291 | 0 | if (!it->second->updateValue(pressure)) { |
292 | 0 | return false; |
293 | 0 | } |
294 | 0 | const auto trigger_new_state = it->second->actionState(); |
295 | 0 | active_gauge_.set(trigger_new_state.isSaturated() ? 1 : 0); |
296 | 0 | scale_percent_gauge_.set(trigger_new_state.value().value() * 100); |
297 | |
|
298 | 0 | { |
299 | | // Compute the new state as the maximum over all trigger states. |
300 | 0 | OverloadActionState new_state = OverloadActionState::inactive(); |
301 | 0 | for (auto& trigger : triggers_) { |
302 | 0 | const auto trigger_state = trigger.second->actionState(); |
303 | 0 | if (trigger_state.value() > new_state.value()) { |
304 | 0 | new_state = trigger_state; |
305 | 0 | } |
306 | 0 | } |
307 | 0 | state_ = new_state; |
308 | 0 | } |
309 | |
|
310 | 0 | return state_.value() != old_state.value(); |
311 | 0 | } |
312 | | |
313 | 0 | OverloadActionState OverloadAction::getState() const { return state_; } |
314 | | |
315 | | LoadShedPointImpl::LoadShedPointImpl(const envoy::config::overload::v3::LoadShedPoint& config, |
316 | | Stats::Scope& stats_scope, |
317 | | Random::RandomGenerator& random_generator) |
318 | | : scale_percent_(makeGauge(stats_scope, config.name(), "scale_percent", |
319 | | Stats::Gauge::ImportMode::NeverImport)), |
320 | 0 | random_generator_(random_generator) { |
321 | 0 | for (const auto& trigger_config : config.triggers()) { |
322 | 0 | if (!triggers_.try_emplace(trigger_config.name(), createTriggerFromConfig(trigger_config)) |
323 | 0 | .second) { |
324 | 0 | throwEnvoyExceptionOrPanic( |
325 | 0 | absl::StrCat("Duplicate trigger resource for LoadShedPoint ", config.name())); |
326 | 0 | } |
327 | 0 | } |
328 | 0 | }; |
329 | | |
330 | | void LoadShedPointImpl::updateResource(absl::string_view resource_name, |
331 | 0 | double resource_utilization) { |
332 | 0 | auto it = triggers_.find(resource_name); |
333 | 0 | if (it == triggers_.end()) { |
334 | 0 | return; |
335 | 0 | } |
336 | | |
337 | 0 | it->second->updateValue(resource_utilization); |
338 | 0 | updateProbabilityShedLoad(); |
339 | 0 | } |
340 | | |
341 | 0 | void LoadShedPointImpl::updateProbabilityShedLoad() { |
342 | 0 | float max_unit_float = 0.0f; |
343 | 0 | for (const auto& trigger : triggers_) { |
344 | 0 | max_unit_float = std::max(trigger.second->actionState().value().value(), max_unit_float); |
345 | 0 | } |
346 | |
|
347 | 0 | probability_shed_load_.store(max_unit_float); |
348 | | |
349 | | // Update stats. |
350 | 0 | scale_percent_.set(100 * max_unit_float); |
351 | 0 | } |
352 | | |
353 | 0 | bool LoadShedPointImpl::shouldShedLoad() { |
354 | 0 | float unit_float_probability_shed_load = probability_shed_load_.load(); |
355 | | // This should be ok as we're using unit float which saturates at 1.0f. |
356 | 0 | if (unit_float_probability_shed_load == 1.0f) { |
357 | 0 | return true; |
358 | 0 | } |
359 | | |
360 | 0 | return random_generator_.bernoulli(UnitFloat(unit_float_probability_shed_load)); |
361 | 0 | } |
362 | | |
363 | | OverloadManagerImpl::OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope, |
364 | | ThreadLocal::SlotAllocator& slot_allocator, |
365 | | const envoy::config::overload::v3::OverloadManager& config, |
366 | | ProtobufMessage::ValidationVisitor& validation_visitor, |
367 | | Api::Api& api, const Server::Options& options) |
368 | | : dispatcher_(dispatcher), time_source_(api.timeSource()), tls_(slot_allocator), |
369 | | refresh_interval_( |
370 | | std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))), |
371 | | refresh_interval_delays_(makeHistogram(stats_scope, "refresh_interval_delay", |
372 | | Stats::Histogram::Unit::Milliseconds)), |
373 | | proactive_resources_( |
374 | | std::make_unique< |
375 | 10.5k | absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>()) { |
376 | 10.5k | Configuration::ResourceMonitorFactoryContextImpl context(dispatcher, options, api, |
377 | 10.5k | validation_visitor); |
378 | | // We should hide impl details from users, for them there should be no distinction between |
379 | | // proactive and regular resource monitors in configuration API. But internally we will maintain |
380 | | // two distinct collections of proactive and regular resources. Proactive resources are not |
381 | | // subject to periodic flushes and can be recalculated/updated on demand by invoking |
382 | | // `tryAllocateResource/tryDeallocateResource` via thread local overload state. |
383 | 10.5k | for (const auto& resource : config.resource_monitors()) { |
384 | 90 | const auto& name = resource.name(); |
385 | | // Check if it is a proactive resource. |
386 | 90 | auto proactive_resource_it = |
387 | 90 | OverloadProactiveResources::get().proactive_action_name_to_resource_.find(name); |
388 | 90 | ENVOY_LOG(debug, "Evaluating resource {}", name); |
389 | 90 | bool result = false; |
390 | 90 | if (proactive_resource_it != |
391 | 90 | OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) { |
392 | 0 | ENVOY_LOG(debug, "Adding proactive resource monitor for {}", name); |
393 | 0 | auto& factory = |
394 | 0 | Config::Utility::getAndCheckFactory<Configuration::ProactiveResourceMonitorFactory>( |
395 | 0 | resource); |
396 | 0 | auto config = |
397 | 0 | Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory); |
398 | 0 | auto monitor = factory.createProactiveResourceMonitor(*config, context); |
399 | 0 | result = |
400 | 0 | proactive_resources_ |
401 | 0 | ->try_emplace(proactive_resource_it->second, name, std::move(monitor), stats_scope) |
402 | 0 | .second; |
403 | 90 | } else { |
404 | 90 | ENVOY_LOG(debug, "Adding resource monitor for {}", name); |
405 | 90 | auto& factory = |
406 | 90 | Config::Utility::getAndCheckFactory<Configuration::ResourceMonitorFactory>(resource); |
407 | 90 | auto config = |
408 | 90 | Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory); |
409 | 90 | auto monitor = factory.createResourceMonitor(*config, context); |
410 | 90 | result = resources_.try_emplace(name, name, std::move(monitor), *this, stats_scope).second; |
411 | 90 | } |
412 | 90 | if (!result) { |
413 | 0 | throwEnvoyExceptionOrPanic(absl::StrCat("Duplicate resource monitor ", name)); |
414 | 0 | } |
415 | 90 | } |
416 | | |
417 | 10.5k | for (const auto& action : config.actions()) { |
418 | 0 | const auto& name = action.name(); |
419 | 0 | const auto symbol = action_symbol_table_.get(name); |
420 | 0 | ENVOY_LOG(debug, "Adding overload action {}", name); |
421 | | |
422 | | // Validate that this is a well known overload action. |
423 | 0 | if (Runtime::runtimeFeatureEnabled( |
424 | 0 | "envoy.reloadable_features.overload_manager_error_unknown_action")) { |
425 | 0 | auto& well_known_actions = OverloadActionNames::get().WellKnownActions; |
426 | 0 | if (std::find(well_known_actions.begin(), well_known_actions.end(), name) == |
427 | 0 | well_known_actions.end()) { |
428 | 0 | throwEnvoyExceptionOrPanic(absl::StrCat("Unknown Overload Manager Action ", name)); |
429 | 0 | } |
430 | 0 | } |
431 | | |
432 | | // TODO: use in place construction once https://github.com/abseil/abseil-cpp/issues/388 is |
433 | | // addressed |
434 | | // We cannot currently use in place construction as the OverloadAction constructor may throw, |
435 | | // causing an inconsistent internal state of the actions_ map, which on destruction results in |
436 | | // an invalid free. |
437 | 0 | auto result = actions_.try_emplace(symbol, OverloadAction(action, stats_scope)); |
438 | 0 | if (!result.second) { |
439 | 0 | throwEnvoyExceptionOrPanic(absl::StrCat("Duplicate overload action ", name)); |
440 | 0 | } |
441 | | |
442 | 0 | if (name == OverloadActionNames::get().ReduceTimeouts) { |
443 | 0 | timer_minimums_ = std::make_shared<const Event::ScaledTimerTypeMap>( |
444 | 0 | parseTimerMinimums(action.typed_config(), validation_visitor)); |
445 | 0 | } else if (name == OverloadActionNames::get().ResetStreams) { |
446 | 0 | if (!config.has_buffer_factory_config()) { |
447 | 0 | throwEnvoyExceptionOrPanic( |
448 | 0 | fmt::format("Overload action \"{}\" requires buffer_factory_config.", name)); |
449 | 0 | } |
450 | 0 | makeCounter(api.rootScope(), OverloadActionStatsNames::get().ResetStreamsCount); |
451 | 0 | } else if (action.has_typed_config()) { |
452 | 0 | throwEnvoyExceptionOrPanic(fmt::format( |
453 | 0 | "Overload action \"{}\" has an unexpected value for the typed_config field", name)); |
454 | 0 | } |
455 | | |
456 | 0 | for (const auto& trigger : action.triggers()) { |
457 | 0 | const std::string& resource = trigger.name(); |
458 | 0 | auto proactive_resource_it = |
459 | 0 | OverloadProactiveResources::get().proactive_action_name_to_resource_.find(resource); |
460 | |
|
461 | 0 | if (resources_.find(resource) == resources_.end() && |
462 | 0 | proactive_resource_it == |
463 | 0 | OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) { |
464 | 0 | throwEnvoyExceptionOrPanic( |
465 | 0 | fmt::format("Unknown trigger resource {} for overload action {}", resource, name)); |
466 | 0 | } |
467 | 0 | resource_to_actions_.insert(std::make_pair(resource, symbol)); |
468 | 0 | } |
469 | 0 | } |
470 | | |
471 | | // Validate the trigger resources for Load shedPoints. |
472 | 10.5k | for (const auto& point : config.loadshed_points()) { |
473 | 0 | for (const auto& trigger : point.triggers()) { |
474 | 0 | if (!resources_.contains(trigger.name())) { |
475 | 0 | throwEnvoyExceptionOrPanic(fmt::format("Unknown trigger resource {} for loadshed point {}", |
476 | 0 | trigger.name(), point.name())); |
477 | 0 | } |
478 | 0 | } |
479 | | |
480 | 0 | const auto result = loadshed_points_.try_emplace( |
481 | 0 | point.name(), |
482 | 0 | std::make_unique<LoadShedPointImpl>(point, api.rootScope(), api.randomGenerator())); |
483 | |
|
484 | 0 | if (!result.second) { |
485 | 0 | throwEnvoyExceptionOrPanic(absl::StrCat("Duplicate loadshed point ", point.name())); |
486 | 0 | } |
487 | 0 | } |
488 | 10.5k | } |
489 | | |
490 | 2.64k | void OverloadManagerImpl::start() { |
491 | 2.64k | ASSERT(!started_); |
492 | 2.64k | started_ = true; |
493 | | |
494 | 5.29k | tls_.set([this](Event::Dispatcher&) { |
495 | 5.29k | return std::make_shared<ThreadLocalOverloadStateImpl>(action_symbol_table_, |
496 | 5.29k | proactive_resources_); |
497 | 5.29k | }); |
498 | | |
499 | 2.64k | if (resources_.empty()) { |
500 | 2.64k | return; |
501 | 2.64k | } |
502 | | |
503 | 0 | timer_ = dispatcher_.createTimer([this]() -> void { |
504 | | // Guarantee that all resource updates get flushed after no more than one refresh_interval_. |
505 | 0 | flushResourceUpdates(); |
506 | | |
507 | | // Start a new flush epoch. If all resource updates complete before this callback runs, the last |
508 | | // resource update will call flushResourceUpdates to flush the whole batch early. |
509 | 0 | ++flush_epoch_; |
510 | 0 | flush_awaiting_updates_ = resources_.size(); |
511 | |
|
512 | 0 | for (auto& resource : resources_) { |
513 | 0 | resource.second.update(flush_epoch_); |
514 | 0 | } |
515 | | |
516 | | // Record delay. |
517 | 0 | auto now = time_source_.monotonicTime(); |
518 | 0 | std::chrono::milliseconds delay = |
519 | 0 | std::chrono::duration_cast<std::chrono::milliseconds>(now - time_resources_last_measured_); |
520 | 0 | refresh_interval_delays_.recordValue(delay.count()); |
521 | 0 | time_resources_last_measured_ = now; |
522 | |
|
523 | 0 | timer_->enableTimer(refresh_interval_); |
524 | 0 | }); |
525 | |
|
526 | 0 | time_resources_last_measured_ = time_source_.monotonicTime(); |
527 | 0 | timer_->enableTimer(refresh_interval_); |
528 | 0 | } |
529 | | |
530 | 5.34k | void OverloadManagerImpl::stop() { |
531 | | // Disable any pending timeouts. |
532 | 5.34k | if (timer_) { |
533 | 0 | timer_->disableTimer(); |
534 | 0 | } |
535 | | |
536 | | // Clear the resource map to block on any pending updates. |
537 | 5.34k | resources_.clear(); |
538 | | |
539 | | // TODO(nezdolik): wrap proactive monitors into atomic? and clear it here |
540 | 5.34k | } |
541 | | |
542 | | bool OverloadManagerImpl::registerForAction(const std::string& action, |
543 | | Event::Dispatcher& dispatcher, |
544 | 26.6k | OverloadActionCb callback) { |
545 | 26.6k | ASSERT(!started_); |
546 | 26.6k | const auto symbol = action_symbol_table_.get(action); |
547 | | |
548 | 26.6k | if (actions_.find(symbol) == actions_.end()) { |
549 | 26.6k | ENVOY_LOG(debug, "No overload action is configured for {}.", action); |
550 | 26.6k | return false; |
551 | 26.6k | } |
552 | | |
553 | 0 | action_to_callbacks_.emplace(std::piecewise_construct, std::forward_as_tuple(symbol), |
554 | 0 | std::forward_as_tuple(dispatcher, callback)); |
555 | 0 | return true; |
556 | 26.6k | } |
557 | | |
558 | 8.93k | ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() { return *tls_; } |
559 | 5.33k | Event::ScaledRangeTimerManagerFactory OverloadManagerImpl::scaledTimerFactory() { |
560 | 5.33k | return [this](Event::Dispatcher& dispatcher) { |
561 | 5.33k | auto manager = createScaledRangeTimerManager(dispatcher, timer_minimums_); |
562 | 5.33k | registerForAction(OverloadActionNames::get().ReduceTimeouts, dispatcher, |
563 | 5.33k | [manager = manager.get()](OverloadActionState scale_state) { |
564 | 0 | manager->setScaleFactor( |
565 | | // The action state is 0 for no overload up to 1 for maximal overload, |
566 | | // but the scale factor for timers is 1 for no scaling and 0 for maximal |
567 | | // scaling, so invert the value to pass in (1-value). |
568 | 0 | scale_state.value().invert()); |
569 | 0 | }); |
570 | 5.33k | return manager; |
571 | 5.33k | }; |
572 | 5.33k | } |
573 | | |
574 | 8.09k | LoadShedPoint* OverloadManagerImpl::getLoadShedPoint(absl::string_view point_name) { |
575 | 8.09k | if (auto it = loadshed_points_.find(point_name); it != loadshed_points_.end()) { |
576 | 0 | return it->second.get(); |
577 | 0 | } |
578 | 8.09k | return nullptr; |
579 | 8.09k | } |
580 | | |
581 | | Event::ScaledRangeTimerManagerPtr OverloadManagerImpl::createScaledRangeTimerManager( |
582 | | Event::Dispatcher& dispatcher, |
583 | 5.33k | const Event::ScaledTimerTypeMapConstSharedPtr& timer_minimums) const { |
584 | 5.33k | return std::make_unique<Event::ScaledRangeTimerManagerImpl>(dispatcher, timer_minimums); |
585 | 5.33k | } |
586 | | |
587 | | void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure, |
588 | 0 | FlushEpochId flush_epoch) { |
589 | 0 | auto [start, end] = resource_to_actions_.equal_range(resource); |
590 | |
|
591 | 0 | std::for_each(start, end, [&](ResourceToActionMap::value_type& entry) { |
592 | 0 | const NamedOverloadActionSymbolTable::Symbol action = entry.second; |
593 | 0 | auto action_it = actions_.find(action); |
594 | 0 | ASSERT(action_it != actions_.end()); |
595 | 0 | const OverloadActionState old_state = action_it->second.getState(); |
596 | 0 | if (action_it->second.updateResourcePressure(resource, pressure)) { |
597 | 0 | const auto state = action_it->second.getState(); |
598 | |
|
599 | 0 | if (old_state.isSaturated() != state.isSaturated()) { |
600 | 0 | ENVOY_LOG(debug, "Overload action {} became {}", action_symbol_table_.name(action), |
601 | 0 | (state.isSaturated() ? "saturated" : "scaling")); |
602 | 0 | } |
603 | | |
604 | | // Record the updated value to be sent to workers on the next thread-local-state flush, along |
605 | | // with any update callbacks. This might overwrite a previous action state change caused by a |
606 | | // pressure update for a different resource that hasn't been flushed yet. That's okay because |
607 | | // the state recorded here includes the information from all previous resource updates. So |
608 | | // even if resource 1 causes an action to have value A, and a later update to resource 2 |
609 | | // causes the action to have value B, B would have been the result for whichever order the |
610 | | // updates to resources 1 and 2 came in. |
611 | 0 | state_updates_to_flush_.insert_or_assign(action, state); |
612 | 0 | auto [callbacks_start, callbacks_end] = action_to_callbacks_.equal_range(action); |
613 | 0 | std::for_each(callbacks_start, callbacks_end, [&](ActionToCallbackMap::value_type& cb_entry) { |
614 | 0 | callbacks_to_flush_.insert_or_assign(&cb_entry.second, state); |
615 | 0 | }); |
616 | 0 | } |
617 | 0 | }); |
618 | |
|
619 | 0 | for (auto& loadshed_point : loadshed_points_) { |
620 | 0 | loadshed_point.second->updateResource(resource, pressure); |
621 | 0 | } |
622 | | |
623 | | // Eagerly flush updates if this is the last call to updateResourcePressure expected for the |
624 | | // current epoch. This assert is always valid because flush_awaiting_updates_ is initialized |
625 | | // before each batch of updates, and even if a resource monitor performs a double update, or a |
626 | | // previous update callback is late, the logic in OverloadManager::Resource::update() will prevent |
627 | | // unexpected calls to this function. |
628 | 0 | ASSERT(flush_awaiting_updates_ > 0); |
629 | 0 | --flush_awaiting_updates_; |
630 | 0 | if (flush_epoch == flush_epoch_ && flush_awaiting_updates_ == 0) { |
631 | 0 | flushResourceUpdates(); |
632 | 0 | } |
633 | 0 | } |
634 | | |
635 | 0 | void OverloadManagerImpl::flushResourceUpdates() { |
636 | 0 | if (!state_updates_to_flush_.empty()) { |
637 | 0 | auto shared_updates = std::make_shared< |
638 | 0 | absl::flat_hash_map<NamedOverloadActionSymbolTable::Symbol, OverloadActionState>>(); |
639 | 0 | std::swap(*shared_updates, state_updates_to_flush_); |
640 | |
|
641 | 0 | tls_.runOnAllThreads( |
642 | 0 | [updates = std::move(shared_updates)](OptRef<ThreadLocalOverloadStateImpl> overload_state) { |
643 | 0 | for (const auto& [action, state] : *updates) { |
644 | 0 | overload_state->setState(action, state); |
645 | 0 | } |
646 | 0 | }); |
647 | 0 | } |
648 | |
|
649 | 0 | for (const auto& [cb, state] : callbacks_to_flush_) { |
650 | 0 | cb->dispatcher_.post([cb = cb, state = state]() { cb->callback_(state); }); |
651 | 0 | } |
652 | 0 | callbacks_to_flush_.clear(); |
653 | 0 | } |
654 | | |
655 | | OverloadManagerImpl::Resource::Resource(const std::string& name, ResourceMonitorPtr monitor, |
656 | | OverloadManagerImpl& manager, Stats::Scope& stats_scope) |
657 | | : name_(name), monitor_(std::move(monitor)), manager_(manager), |
658 | | pressure_gauge_( |
659 | | makeGauge(stats_scope, name, "pressure", Stats::Gauge::ImportMode::NeverImport)), |
660 | | failed_updates_counter_(makeCounter(stats_scope, name, "failed_updates")), |
661 | 0 | skipped_updates_counter_(makeCounter(stats_scope, name, "skipped_updates")) {} |
662 | | |
663 | 0 | void OverloadManagerImpl::Resource::update(FlushEpochId flush_epoch) { |
664 | 0 | if (!pending_update_) { |
665 | 0 | pending_update_ = true; |
666 | 0 | flush_epoch_ = flush_epoch; |
667 | 0 | monitor_->updateResourceUsage(*this); |
668 | 0 | return; |
669 | 0 | } |
670 | 0 | ENVOY_LOG(debug, "Skipping update for resource {} which has pending update", name_); |
671 | 0 | skipped_updates_counter_.inc(); |
672 | 0 | } |
673 | | |
674 | 0 | void OverloadManagerImpl::Resource::onSuccess(const ResourceUsage& usage) { |
675 | 0 | pending_update_ = false; |
676 | 0 | manager_.updateResourcePressure(name_, usage.resource_pressure_, flush_epoch_); |
677 | 0 | pressure_gauge_.set(usage.resource_pressure_ * 100); // convert to percent |
678 | 0 | } |
679 | | |
680 | 0 | void OverloadManagerImpl::Resource::onFailure(const EnvoyException& error) { |
681 | 0 | pending_update_ = false; |
682 | 0 | ENVOY_LOG(info, "Failed to update resource {}: {}", name_, error.what()); |
683 | 0 | failed_updates_counter_.inc(); |
684 | 0 | } |
685 | | |
686 | | } // namespace Server |
687 | | } // namespace Envoy |