/proc/self/cwd/source/server/overload_manager_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/server/overload_manager_impl.h" |
2 | | |
3 | | #include <chrono> |
4 | | |
5 | | #include "envoy/common/exception.h" |
6 | | #include "envoy/config/overload/v3/overload.pb.h" |
7 | | #include "envoy/config/overload/v3/overload.pb.validate.h" |
8 | | #include "envoy/stats/scope.h" |
9 | | |
10 | | #include "source/common/common/fmt.h" |
11 | | #include "source/common/config/utility.h" |
12 | | #include "source/common/event/scaled_range_timer_manager_impl.h" |
13 | | #include "source/common/protobuf/utility.h" |
14 | | #include "source/common/stats/symbol_table.h" |
15 | | #include "source/server/resource_monitor_config_impl.h" |
16 | | |
17 | | #include "absl/container/node_hash_map.h" |
18 | | #include "absl/strings/str_cat.h" |
19 | | |
20 | | namespace Envoy { |
21 | | namespace Server { |
22 | | namespace { |
23 | | |
24 | | using TriggerPtr = std::unique_ptr<Trigger>; |
25 | | |
26 | | class ThresholdTriggerImpl final : public Trigger { |
27 | | public: |
28 | | ThresholdTriggerImpl(const envoy::config::overload::v3::ThresholdTrigger& config) |
29 | 0 | : threshold_(config.value()), state_(OverloadActionState::inactive()) {} |
30 | | |
31 | 0 | bool updateValue(double value) override { |
32 | 0 | const OverloadActionState state = actionState(); |
33 | 0 | state_ = |
34 | 0 | value >= threshold_ ? OverloadActionState::saturated() : OverloadActionState::inactive(); |
35 | | // This is a floating point comparison, though state_ is always either |
36 | | // saturated or inactive so there's no risk due to floating point precision. |
37 | 0 | return state.value() != actionState().value(); |
38 | 0 | } |
39 | | |
40 | 0 | OverloadActionState actionState() const override { return state_; } |
41 | | |
42 | | private: |
43 | | const double threshold_; |
44 | | OverloadActionState state_; |
45 | | }; |
46 | | |
47 | | class ScaledTriggerImpl final : public Trigger { |
48 | | public: |
49 | | static absl::StatusOr<std::unique_ptr<ScaledTriggerImpl>> |
50 | 0 | create(const envoy::config::overload::v3::ScaledTrigger& config) { |
51 | 0 | if (config.scaling_threshold() >= config.saturation_threshold()) { |
52 | 0 | return absl::InvalidArgumentError("scaling_threshold must be less than saturation_threshold"); |
53 | 0 | } |
54 | 0 | return std::unique_ptr<ScaledTriggerImpl>(new ScaledTriggerImpl(config)); |
55 | 0 | } |
56 | | |
57 | 0 | bool updateValue(double value) override { |
58 | 0 | const OverloadActionState old_state = actionState(); |
59 | 0 | if (value <= scaling_threshold_) { |
60 | 0 | state_ = OverloadActionState::inactive(); |
61 | 0 | } else if (value >= saturated_threshold_) { |
62 | 0 | state_ = OverloadActionState::saturated(); |
63 | 0 | } else { |
64 | 0 | state_ = OverloadActionState( |
65 | 0 | UnitFloat((value - scaling_threshold_) / (saturated_threshold_ - scaling_threshold_))); |
66 | 0 | } |
67 | | // All values of state_ are produced via this same code path. Even if |
68 | | // old_state and state_ should be approximately equal, there's no harm in |
69 | | // signaling for a small change if they're not float::operator== equal. |
70 | 0 | return state_.value() != old_state.value(); |
71 | 0 | } |
72 | | |
73 | 0 | OverloadActionState actionState() const override { return state_; } |
74 | | |
75 | | private: |
76 | | ScaledTriggerImpl(const envoy::config::overload::v3::ScaledTrigger& config) |
77 | | : scaling_threshold_(config.scaling_threshold()), |
78 | | saturated_threshold_(config.saturation_threshold()), |
79 | 0 | state_(OverloadActionState::inactive()) {} |
80 | | |
81 | | const double scaling_threshold_; |
82 | | const double saturated_threshold_; |
83 | | OverloadActionState state_; |
84 | | }; |
85 | | |
86 | | absl::StatusOr<TriggerPtr> |
87 | 0 | createTriggerFromConfig(const envoy::config::overload::v3::Trigger& trigger_config) { |
88 | 0 | TriggerPtr trigger; |
89 | |
|
90 | 0 | switch (trigger_config.trigger_oneof_case()) { |
91 | 0 | case envoy::config::overload::v3::Trigger::TriggerOneofCase::kThreshold: |
92 | 0 | trigger = std::make_unique<ThresholdTriggerImpl>(trigger_config.threshold()); |
93 | 0 | break; |
94 | 0 | case envoy::config::overload::v3::Trigger::TriggerOneofCase::kScaled: { |
95 | 0 | auto trigger_or_error = ScaledTriggerImpl::create(trigger_config.scaled()); |
96 | 0 | RETURN_IF_NOT_OK(trigger_or_error.status()); |
97 | 0 | trigger = std::move(trigger_or_error.value()); |
98 | 0 | break; |
99 | 0 | } |
100 | 0 | case envoy::config::overload::v3::Trigger::TriggerOneofCase::TRIGGER_ONEOF_NOT_SET: |
101 | 0 | return absl::InvalidArgumentError( |
102 | 0 | absl::StrCat("action not set for trigger ", trigger_config.name())); |
103 | 0 | } |
104 | | |
105 | 0 | return trigger; |
106 | 0 | } |
107 | | |
108 | 0 | Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view name_of_stat) { |
109 | 0 | Stats::StatNameManagedStorage stat_name(name_of_stat, scope.symbolTable()); |
110 | 0 | return scope.counterFromStatName(stat_name.statName()); |
111 | 0 | } |
112 | | |
113 | 0 | Stats::Counter& makeCounter(Stats::Scope& scope, absl::string_view a, absl::string_view b) { |
114 | 0 | return makeCounter(scope, absl::StrCat("overload.", a, ".", b)); |
115 | 0 | } |
116 | | |
117 | | Stats::Gauge& makeGauge(Stats::Scope& scope, absl::string_view a, absl::string_view b, |
118 | 0 | Stats::Gauge::ImportMode import_mode) { |
119 | 0 | Stats::StatNameManagedStorage stat_name(absl::StrCat("overload.", a, ".", b), |
120 | 0 | scope.symbolTable()); |
121 | 0 | return scope.gaugeFromStatName(stat_name.statName(), import_mode); |
122 | 0 | } |
123 | | |
124 | | Stats::Histogram& makeHistogram(Stats::Scope& scope, absl::string_view name, |
125 | 8.66k | Stats::Histogram::Unit unit) { |
126 | 8.66k | Stats::StatNameManagedStorage stat_name(absl::StrCat("overload.", name), scope.symbolTable()); |
127 | 8.66k | return scope.histogramFromStatName(stat_name.statName(), unit); |
128 | 8.66k | } |
129 | | |
130 | | absl::StatusOr<Event::ScaledTimerType> parseTimerType( |
131 | 0 | envoy::config::overload::v3::ScaleTimersOverloadActionConfig::TimerType config_timer_type) { |
132 | 0 | using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig; |
133 | |
|
134 | 0 | switch (config_timer_type) { |
135 | 0 | case Config::HTTP_DOWNSTREAM_CONNECTION_IDLE: |
136 | 0 | return Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout; |
137 | 0 | case Config::HTTP_DOWNSTREAM_STREAM_IDLE: |
138 | 0 | return Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout; |
139 | 0 | case Config::TRANSPORT_SOCKET_CONNECT: |
140 | 0 | return Event::ScaledTimerType::TransportSocketConnectTimeout; |
141 | 0 | default: |
142 | 0 | return absl::InvalidArgumentError( |
143 | 0 | fmt::format("Unknown timer type {}", static_cast<int>(config_timer_type))); |
144 | 0 | } |
145 | 0 | } |
146 | | |
147 | | absl::StatusOr<Event::ScaledTimerTypeMap> |
148 | | parseTimerMinimums(const ProtobufWkt::Any& typed_config, |
149 | 0 | ProtobufMessage::ValidationVisitor& validation_visitor) { |
150 | 0 | using Config = envoy::config::overload::v3::ScaleTimersOverloadActionConfig; |
151 | 0 | const Config action_config = |
152 | 0 | MessageUtil::anyConvertAndValidate<Config>(typed_config, validation_visitor); |
153 | |
|
154 | 0 | Event::ScaledTimerTypeMap timer_map; |
155 | |
|
156 | 0 | for (const auto& scale_timer : action_config.timer_scale_factors()) { |
157 | 0 | auto timer_or_error = parseTimerType(scale_timer.timer()); |
158 | 0 | RETURN_IF_NOT_OK(timer_or_error.status()); |
159 | 0 | const Event::ScaledTimerType& timer_type = *timer_or_error; |
160 | |
|
161 | 0 | const Event::ScaledTimerMinimum minimum = |
162 | 0 | scale_timer.has_min_timeout() |
163 | 0 | ? Event::ScaledTimerMinimum(Event::AbsoluteMinimum(std::chrono::milliseconds( |
164 | 0 | DurationUtil::durationToMilliseconds(scale_timer.min_timeout())))) |
165 | 0 | : Event::ScaledTimerMinimum( |
166 | 0 | Event::ScaledMinimum(UnitFloat(scale_timer.min_scale().value() / 100.0))); |
167 | |
|
168 | 0 | auto [_, inserted] = timer_map.insert(std::make_pair(timer_type, minimum)); |
169 | 0 | UNREFERENCED_PARAMETER(_); |
170 | 0 | if (!inserted) { |
171 | 0 | return absl::InvalidArgumentError(fmt::format("Found duplicate entry for timer type {}", |
172 | 0 | Config::TimerType_Name(scale_timer.timer()))); |
173 | 0 | } |
174 | 0 | } |
175 | | |
176 | 0 | return timer_map; |
177 | 0 | } |
178 | | |
179 | | } // namespace |
180 | | |
181 | | /** |
182 | | * Thread-local copy of the state of each configured overload action. |
183 | | */ |
184 | | class ThreadLocalOverloadStateImpl : public ThreadLocalOverloadState { |
185 | | public: |
186 | | explicit ThreadLocalOverloadStateImpl( |
187 | | const NamedOverloadActionSymbolTable& action_symbol_table, |
188 | | std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>& |
189 | | proactive_resources) |
190 | | : action_symbol_table_(action_symbol_table), |
191 | | actions_(action_symbol_table.size(), OverloadActionState(UnitFloat::min())), |
192 | 3.95k | proactive_resources_(proactive_resources) {} |
193 | | |
194 | 5.85k | const OverloadActionState& getState(const std::string& action) override { |
195 | 5.85k | if (const auto symbol = action_symbol_table_.lookup(action); symbol != absl::nullopt) { |
196 | 0 | return actions_[symbol->index()]; |
197 | 0 | } |
198 | 5.85k | return always_inactive_; |
199 | 5.85k | } |
200 | | |
201 | 0 | void setState(NamedOverloadActionSymbolTable::Symbol action, OverloadActionState state) { |
202 | 0 | actions_[action.index()] = state; |
203 | 0 | } |
204 | | |
205 | | bool tryAllocateResource(OverloadProactiveResourceName resource_name, |
206 | 0 | int64_t increment) override { |
207 | 0 | const auto proactive_resource = proactive_resources_->find(resource_name); |
208 | 0 | if (proactive_resource == proactive_resources_->end()) { |
209 | 0 | ENVOY_LOG_MISC(warn, "Failed to allocate resource usage, resource monitor is not configured"); |
210 | 0 | return false; |
211 | 0 | } |
212 | | |
213 | 0 | return proactive_resource->second.tryAllocateResource(increment); |
214 | 0 | } |
215 | | |
216 | | bool tryDeallocateResource(OverloadProactiveResourceName resource_name, |
217 | 0 | int64_t decrement) override { |
218 | 0 | const auto proactive_resource = proactive_resources_->find(resource_name); |
219 | 0 | if (proactive_resource == proactive_resources_->end()) { |
220 | 0 | ENVOY_LOG_MISC(warn, |
221 | 0 | "Failed to deallocate resource usage, resource monitor is not configured"); |
222 | 0 | return false; |
223 | 0 | } |
224 | | |
225 | 0 | return proactive_resource->second.tryDeallocateResource(decrement); |
226 | 0 | } |
227 | | |
228 | 3.96k | bool isResourceMonitorEnabled(OverloadProactiveResourceName resource_name) override { |
229 | 3.96k | const auto proactive_resource = proactive_resources_->find(resource_name); |
230 | 3.96k | return proactive_resource != proactive_resources_->end(); |
231 | 3.96k | } |
232 | | |
233 | | ProactiveResourceMonitorOptRef |
234 | 0 | getProactiveResourceMonitorForTest(OverloadProactiveResourceName resource_name) override { |
235 | 0 | const auto proactive_resource = proactive_resources_->find(resource_name); |
236 | 0 | if (proactive_resource == proactive_resources_->end()) { |
237 | 0 | ENVOY_LOG_MISC(warn, "Failed to get resource usage, resource monitor is not configured"); |
238 | 0 | return makeOptRefFromPtr<ProactiveResourceMonitor>(nullptr); |
239 | 0 | } |
240 | 0 | return proactive_resource->second.getProactiveResourceMonitorForTest(); |
241 | 0 | } |
242 | | |
243 | | private: |
244 | | static const OverloadActionState always_inactive_; |
245 | | const NamedOverloadActionSymbolTable& action_symbol_table_; |
246 | | std::vector<OverloadActionState> actions_; |
247 | | std::shared_ptr<absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>> |
248 | | proactive_resources_; |
249 | | }; |
250 | | |
251 | | const OverloadActionState ThreadLocalOverloadStateImpl::always_inactive_{UnitFloat::min()}; |
252 | | |
253 | | NamedOverloadActionSymbolTable::Symbol |
254 | 22.2k | NamedOverloadActionSymbolTable::get(absl::string_view string) { |
255 | 22.2k | if (auto it = table_.find(string); it != table_.end()) { |
256 | 0 | return Symbol(it->second); |
257 | 0 | } |
258 | | |
259 | 22.2k | size_t index = table_.size(); |
260 | | |
261 | 22.2k | names_.emplace_back(string); |
262 | 22.2k | table_.emplace(std::make_pair(string, index)); |
263 | | |
264 | 22.2k | return Symbol(index); |
265 | 22.2k | } |
266 | | |
267 | | absl::optional<NamedOverloadActionSymbolTable::Symbol> |
268 | 5.85k | NamedOverloadActionSymbolTable::lookup(absl::string_view string) const { |
269 | 5.85k | if (auto it = table_.find(string); it != table_.end()) { |
270 | 0 | return Symbol(it->second); |
271 | 0 | } |
272 | 5.85k | return absl::nullopt; |
273 | 5.85k | } |
274 | | |
275 | 0 | const absl::string_view NamedOverloadActionSymbolTable::name(Symbol symbol) const { |
276 | 0 | return names_.at(symbol.index()); |
277 | 0 | } |
278 | | |
279 | | absl::StatusOr<std::unique_ptr<OverloadAction>> |
280 | | OverloadAction::create(const envoy::config::overload::v3::OverloadAction& config, |
281 | 0 | Stats::Scope& stats_scope) { |
282 | 0 | absl::Status creation_status = absl::OkStatus(); |
283 | 0 | auto ret = |
284 | 0 | std::unique_ptr<OverloadAction>(new OverloadAction(config, stats_scope, creation_status)); |
285 | 0 | RETURN_IF_NOT_OK(creation_status); |
286 | 0 | return ret; |
287 | 0 | } |
288 | | |
289 | | OverloadAction::OverloadAction(const envoy::config::overload::v3::OverloadAction& config, |
290 | | Stats::Scope& stats_scope, absl::Status& creation_status) |
291 | | : state_(OverloadActionState::inactive()), |
292 | | active_gauge_( |
293 | | makeGauge(stats_scope, config.name(), "active", Stats::Gauge::ImportMode::NeverImport)), |
294 | | scale_percent_gauge_(makeGauge(stats_scope, config.name(), "scale_percent", |
295 | 0 | Stats::Gauge::ImportMode::NeverImport)) { |
296 | 0 | for (const auto& trigger_config : config.triggers()) { |
297 | 0 | absl::StatusOr<TriggerPtr> trigger_or_error = createTriggerFromConfig(trigger_config); |
298 | 0 | SET_AND_RETURN_IF_NOT_OK(trigger_or_error.status(), creation_status); |
299 | 0 | if (!triggers_.try_emplace(trigger_config.name(), std::move(*trigger_or_error)).second) { |
300 | 0 | creation_status = absl::InvalidArgumentError( |
301 | 0 | absl::StrCat("Duplicate trigger resource for overload action ", config.name())); |
302 | 0 | return; |
303 | 0 | } |
304 | 0 | } |
305 | | |
306 | 0 | active_gauge_.set(0); |
307 | 0 | scale_percent_gauge_.set(0); |
308 | 0 | } |
309 | | |
310 | 0 | bool OverloadAction::updateResourcePressure(const std::string& name, double pressure) { |
311 | 0 | const OverloadActionState old_state = getState(); |
312 | |
|
313 | 0 | auto it = triggers_.find(name); |
314 | 0 | ASSERT(it != triggers_.end()); |
315 | 0 | if (!it->second->updateValue(pressure)) { |
316 | 0 | return false; |
317 | 0 | } |
318 | 0 | const auto trigger_new_state = it->second->actionState(); |
319 | 0 | active_gauge_.set(trigger_new_state.isSaturated() ? 1 : 0); |
320 | 0 | scale_percent_gauge_.set(trigger_new_state.value().value() * 100); |
321 | |
|
322 | 0 | { |
323 | | // Compute the new state as the maximum over all trigger states. |
324 | 0 | OverloadActionState new_state = OverloadActionState::inactive(); |
325 | 0 | for (auto& trigger : triggers_) { |
326 | 0 | const auto trigger_state = trigger.second->actionState(); |
327 | 0 | if (trigger_state.value() > new_state.value()) { |
328 | 0 | new_state = trigger_state; |
329 | 0 | } |
330 | 0 | } |
331 | 0 | state_ = new_state; |
332 | 0 | } |
333 | |
|
334 | 0 | return state_.value() != old_state.value(); |
335 | 0 | } |
336 | | |
337 | 0 | OverloadActionState OverloadAction::getState() const { return state_; } |
338 | | |
339 | | absl::StatusOr<std::unique_ptr<LoadShedPointImpl>> |
340 | | LoadShedPointImpl::create(const envoy::config::overload::v3::LoadShedPoint& config, |
341 | 0 | Stats::Scope& stats_scope, Random::RandomGenerator& random_generator) { |
342 | 0 | absl::Status creation_status = absl::OkStatus(); |
343 | 0 | auto ret = std::unique_ptr<LoadShedPointImpl>( |
344 | 0 | new LoadShedPointImpl(config, stats_scope, random_generator, creation_status)); |
345 | 0 | RETURN_IF_NOT_OK(creation_status); |
346 | 0 | return ret; |
347 | 0 | } |
348 | | LoadShedPointImpl::LoadShedPointImpl(const envoy::config::overload::v3::LoadShedPoint& config, |
349 | | Stats::Scope& stats_scope, |
350 | | Random::RandomGenerator& random_generator, |
351 | | absl::Status& creation_status) |
352 | | : scale_percent_(makeGauge(stats_scope, config.name(), "scale_percent", |
353 | | Stats::Gauge::ImportMode::NeverImport)), |
354 | | shed_load_counter_(makeCounter(stats_scope, config.name(), "shed_load_count")), |
355 | 0 | random_generator_(random_generator) { |
356 | 0 | for (const auto& trigger_config : config.triggers()) { |
357 | 0 | auto trigger_or_error = createTriggerFromConfig(trigger_config); |
358 | 0 | SET_AND_RETURN_IF_NOT_OK(trigger_or_error.status(), creation_status); |
359 | 0 | if (!triggers_.try_emplace(trigger_config.name(), std::move(*trigger_or_error)).second) { |
360 | 0 | creation_status = absl::InvalidArgumentError( |
361 | 0 | absl::StrCat("Duplicate trigger resource for LoadShedPoint ", config.name())); |
362 | 0 | return; |
363 | 0 | } |
364 | 0 | } |
365 | 0 | }; |
366 | | |
367 | | void LoadShedPointImpl::updateResource(absl::string_view resource_name, |
368 | 0 | double resource_utilization) { |
369 | 0 | auto it = triggers_.find(resource_name); |
370 | 0 | if (it == triggers_.end()) { |
371 | 0 | return; |
372 | 0 | } |
373 | | |
374 | 0 | it->second->updateValue(resource_utilization); |
375 | 0 | updateProbabilityShedLoad(); |
376 | 0 | } |
377 | | |
378 | 0 | void LoadShedPointImpl::updateProbabilityShedLoad() { |
379 | 0 | float max_unit_float = 0.0f; |
380 | 0 | for (const auto& trigger : triggers_) { |
381 | 0 | max_unit_float = std::max(trigger.second->actionState().value().value(), max_unit_float); |
382 | 0 | } |
383 | |
|
384 | 0 | probability_shed_load_.store(max_unit_float); |
385 | | |
386 | | // Update stats. |
387 | 0 | scale_percent_.set(100 * max_unit_float); |
388 | 0 | } |
389 | | |
390 | 0 | bool LoadShedPointImpl::shouldShedLoad() { |
391 | 0 | float unit_float_probability_shed_load = probability_shed_load_.load(); |
392 | | // This should be ok as we're using unit float which saturates at 1.0f. |
393 | 0 | if (unit_float_probability_shed_load == 1.0f) { |
394 | 0 | shed_load_counter_.inc(); |
395 | 0 | return true; |
396 | 0 | } |
397 | | |
398 | 0 | if (random_generator_.bernoulli(UnitFloat(unit_float_probability_shed_load))) { |
399 | 0 | shed_load_counter_.inc(); |
400 | 0 | return true; |
401 | 0 | } |
402 | 0 | return false; |
403 | 0 | } |
404 | | |
405 | | absl::StatusOr<std::unique_ptr<OverloadManagerImpl>> |
406 | | OverloadManagerImpl::create(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope, |
407 | | ThreadLocal::SlotAllocator& slot_allocator, |
408 | | const envoy::config::overload::v3::OverloadManager& config, |
409 | | ProtobufMessage::ValidationVisitor& validation_visitor, Api::Api& api, |
410 | 8.66k | const Server::Options& options) { |
411 | 8.66k | absl::Status creation_status = absl::OkStatus(); |
412 | 8.66k | auto ret = std::unique_ptr<OverloadManagerImpl>( |
413 | 8.66k | new OverloadManagerImpl(dispatcher, stats_scope, slot_allocator, config, validation_visitor, |
414 | 8.66k | api, options, creation_status)); |
415 | 8.66k | RETURN_IF_NOT_OK(creation_status); |
416 | 8.66k | return ret; |
417 | 8.66k | } |
418 | | OverloadManagerImpl::OverloadManagerImpl(Event::Dispatcher& dispatcher, Stats::Scope& stats_scope, |
419 | | ThreadLocal::SlotAllocator& slot_allocator, |
420 | | const envoy::config::overload::v3::OverloadManager& config, |
421 | | ProtobufMessage::ValidationVisitor& validation_visitor, |
422 | | Api::Api& api, const Server::Options& options, |
423 | | absl::Status& creation_status) |
424 | | : dispatcher_(dispatcher), time_source_(api.timeSource()), tls_(slot_allocator), |
425 | | refresh_interval_( |
426 | | std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, refresh_interval, 1000))), |
427 | | refresh_interval_delays_(makeHistogram(stats_scope, "refresh_interval_delay", |
428 | | Stats::Histogram::Unit::Milliseconds)), |
429 | | proactive_resources_( |
430 | | std::make_unique< |
431 | 8.66k | absl::node_hash_map<OverloadProactiveResourceName, ProactiveResource>>()) { |
432 | 8.66k | Configuration::ResourceMonitorFactoryContextImpl context(dispatcher, options, api, |
433 | 8.66k | validation_visitor); |
434 | | // We should hide impl details from users, for them there should be no distinction between |
435 | | // proactive and regular resource monitors in configuration API. But internally we will maintain |
436 | | // two distinct collections of proactive and regular resources. Proactive resources are not |
437 | | // subject to periodic flushes and can be recalculated/updated on demand by invoking |
438 | | // `tryAllocateResource/tryDeallocateResource` via thread local overload state. |
439 | 8.66k | for (const auto& resource : config.resource_monitors()) { |
440 | 54 | const auto& name = resource.name(); |
441 | | // Check if it is a proactive resource. |
442 | 54 | auto proactive_resource_it = |
443 | 54 | OverloadProactiveResources::get().proactive_action_name_to_resource_.find(name); |
444 | 54 | ENVOY_LOG(debug, "Evaluating resource {}", name); |
445 | 54 | bool result = false; |
446 | 54 | if (proactive_resource_it != |
447 | 54 | OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) { |
448 | 0 | ENVOY_LOG(debug, "Adding proactive resource monitor for {}", name); |
449 | 0 | auto& factory = |
450 | 0 | Config::Utility::getAndCheckFactory<Configuration::ProactiveResourceMonitorFactory>( |
451 | 0 | resource); |
452 | 0 | auto config = |
453 | 0 | Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory); |
454 | 0 | auto monitor = factory.createProactiveResourceMonitor(*config, context); |
455 | 0 | result = |
456 | 0 | proactive_resources_ |
457 | 0 | ->try_emplace(proactive_resource_it->second, name, std::move(monitor), stats_scope) |
458 | 0 | .second; |
459 | 54 | } else { |
460 | 54 | ENVOY_LOG(debug, "Adding resource monitor for {}", name); |
461 | 54 | auto& factory = |
462 | 54 | Config::Utility::getAndCheckFactory<Configuration::ResourceMonitorFactory>(resource); |
463 | 54 | auto config = |
464 | 54 | Config::Utility::translateToFactoryConfig(resource, validation_visitor, factory); |
465 | 54 | auto monitor = factory.createResourceMonitor(*config, context); |
466 | 54 | result = resources_.try_emplace(name, name, std::move(monitor), *this, stats_scope).second; |
467 | 54 | } |
468 | 54 | if (!result) { |
469 | 0 | creation_status = |
470 | 0 | absl::InvalidArgumentError(absl::StrCat("Duplicate resource monitor ", name)); |
471 | 0 | return; |
472 | 0 | } |
473 | 54 | } |
474 | | |
475 | 8.66k | for (const auto& action : config.actions()) { |
476 | 0 | const auto& name = action.name(); |
477 | 0 | const auto symbol = action_symbol_table_.get(name); |
478 | 0 | ENVOY_LOG(debug, "Adding overload action {}", name); |
479 | | |
480 | | // Validate that this is a well known overload action. |
481 | 0 | const auto& well_known_actions = OverloadActionNames::get().WellKnownActions; |
482 | 0 | if (std::find(well_known_actions.begin(), well_known_actions.end(), name) == |
483 | 0 | well_known_actions.end()) { |
484 | 0 | creation_status = |
485 | 0 | absl::InvalidArgumentError(absl::StrCat("Unknown Overload Manager Action ", name)); |
486 | 0 | return; |
487 | 0 | } |
488 | | |
489 | | // TODO: use in place construction once https://github.com/abseil/abseil-cpp/issues/388 is |
490 | | // addressed |
491 | | // We cannot currently use in place construction as the OverloadAction constructor may fail, |
492 | | // causing an inconsistent internal state of the actions_ map, which on destruction results in |
493 | | // an invalid free. |
494 | 0 | auto action_or_error = OverloadAction::create(action, stats_scope); |
495 | 0 | SET_AND_RETURN_IF_NOT_OK(action_or_error.status(), creation_status); |
496 | 0 | auto result = actions_.try_emplace(symbol, std::move(*action_or_error)); |
497 | 0 | if (!result.second) { |
498 | 0 | creation_status = |
499 | 0 | absl::InvalidArgumentError(absl::StrCat("Duplicate overload action ", name)); |
500 | 0 | return; |
501 | 0 | } |
502 | | |
503 | 0 | if (name == OverloadActionNames::get().ReduceTimeouts) { |
504 | 0 | auto timer_or_error = parseTimerMinimums(action.typed_config(), validation_visitor); |
505 | 0 | SET_AND_RETURN_IF_NOT_OK(timer_or_error.status(), creation_status); |
506 | 0 | timer_minimums_ = |
507 | 0 | std::make_shared<const Event::ScaledTimerTypeMap>(std::move(*timer_or_error)); |
508 | 0 | } else if (name == OverloadActionNames::get().ResetStreams) { |
509 | 0 | if (!config.has_buffer_factory_config()) { |
510 | 0 | creation_status = absl::InvalidArgumentError( |
511 | 0 | fmt::format("Overload action \"{}\" requires buffer_factory_config.", name)); |
512 | 0 | return; |
513 | 0 | } |
514 | 0 | makeCounter(api.rootScope(), OverloadActionStatsNames::get().ResetStreamsCount); |
515 | 0 | } else if (action.has_typed_config()) { |
516 | 0 | creation_status = absl::InvalidArgumentError(fmt::format( |
517 | 0 | "Overload action \"{}\" has an unexpected value for the typed_config field", name)); |
518 | 0 | return; |
519 | 0 | } |
520 | | |
521 | 0 | for (const auto& trigger : action.triggers()) { |
522 | 0 | const std::string& resource = trigger.name(); |
523 | 0 | auto proactive_resource_it = |
524 | 0 | OverloadProactiveResources::get().proactive_action_name_to_resource_.find(resource); |
525 | |
|
526 | 0 | if (resources_.find(resource) == resources_.end() && |
527 | 0 | proactive_resource_it == |
528 | 0 | OverloadProactiveResources::get().proactive_action_name_to_resource_.end()) { |
529 | 0 | creation_status = absl::InvalidArgumentError( |
530 | 0 | fmt::format("Unknown trigger resource {} for overload action {}", resource, name)); |
531 | 0 | return; |
532 | 0 | } |
533 | 0 | resource_to_actions_.insert(std::make_pair(resource, symbol)); |
534 | 0 | } |
535 | 0 | } |
536 | | |
537 | | // Validate the trigger resources for Load shedPoints. |
538 | 8.66k | for (const auto& point : config.loadshed_points()) { |
539 | 0 | for (const auto& trigger : point.triggers()) { |
540 | 0 | if (!resources_.contains(trigger.name())) { |
541 | 0 | creation_status = absl::InvalidArgumentError(fmt::format( |
542 | 0 | "Unknown trigger resource {} for loadshed point {}", trigger.name(), point.name())); |
543 | 0 | return; |
544 | 0 | } |
545 | 0 | } |
546 | | |
547 | 0 | auto load_shed_or_error = |
548 | 0 | LoadShedPointImpl::create(point, api.rootScope(), api.randomGenerator()); |
549 | 0 | SET_AND_RETURN_IF_NOT_OK(load_shed_or_error.status(), creation_status); |
550 | 0 | const auto result = loadshed_points_.try_emplace(point.name(), *std::move(load_shed_or_error)); |
551 | |
|
552 | 0 | if (!result.second) { |
553 | 0 | creation_status = |
554 | 0 | absl::InvalidArgumentError(absl::StrCat("Duplicate loadshed point ", point.name())); |
555 | 0 | return; |
556 | 0 | } |
557 | 0 | } |
558 | 8.66k | } |
559 | | |
560 | 1.97k | void OverloadManagerImpl::start() { |
561 | 1.97k | ASSERT(!started_); |
562 | 1.97k | started_ = true; |
563 | | |
564 | 3.95k | tls_.set([this](Event::Dispatcher&) { |
565 | 3.95k | return std::make_shared<ThreadLocalOverloadStateImpl>(action_symbol_table_, |
566 | 3.95k | proactive_resources_); |
567 | 3.95k | }); |
568 | | |
569 | 1.97k | if (resources_.empty()) { |
570 | 1.97k | return; |
571 | 1.97k | } |
572 | | |
573 | 0 | timer_ = dispatcher_.createTimer([this]() -> void { |
574 | | // Guarantee that all resource updates get flushed after no more than one refresh_interval_. |
575 | 0 | flushResourceUpdates(); |
576 | | |
577 | | // Start a new flush epoch. If all resource updates complete before this callback runs, the last |
578 | | // resource update will call flushResourceUpdates to flush the whole batch early. |
579 | 0 | ++flush_epoch_; |
580 | 0 | flush_awaiting_updates_ = resources_.size(); |
581 | |
|
582 | 0 | for (auto& resource : resources_) { |
583 | 0 | resource.second.update(flush_epoch_); |
584 | 0 | } |
585 | | |
586 | | // Record delay. |
587 | 0 | auto now = time_source_.monotonicTime(); |
588 | 0 | std::chrono::milliseconds delay = |
589 | 0 | std::chrono::duration_cast<std::chrono::milliseconds>(now - time_resources_last_measured_); |
590 | 0 | refresh_interval_delays_.recordValue(delay.count()); |
591 | 0 | time_resources_last_measured_ = now; |
592 | |
|
593 | 0 | timer_->enableTimer(refresh_interval_); |
594 | 0 | }); |
595 | |
|
596 | 0 | time_resources_last_measured_ = time_source_.monotonicTime(); |
597 | 0 | timer_->enableTimer(refresh_interval_); |
598 | 0 | } |
599 | | |
600 | 4.47k | void OverloadManagerImpl::stop() { |
601 | | // Disable any pending timeouts. |
602 | 4.47k | if (timer_) { |
603 | 0 | timer_->disableTimer(); |
604 | 0 | } |
605 | | |
606 | | // Clear the resource map to block on any pending updates. |
607 | 4.47k | resources_.clear(); |
608 | | |
609 | | // TODO(nezdolik): wrap proactive monitors into atomic? and clear it here |
610 | 4.47k | } |
611 | | |
612 | | bool OverloadManagerImpl::registerForAction(const std::string& action, |
613 | | Event::Dispatcher& dispatcher, |
614 | 22.2k | OverloadActionCb callback) { |
615 | 22.2k | ASSERT(!started_); |
616 | 22.2k | const auto symbol = action_symbol_table_.get(action); |
617 | | |
618 | 22.2k | if (actions_.find(symbol) == actions_.end()) { |
619 | 22.2k | ENVOY_LOG(debug, "No overload action is configured for {}.", action); |
620 | 22.2k | return false; |
621 | 22.2k | } |
622 | | |
623 | 0 | action_to_callbacks_.emplace(std::piecewise_construct, std::forward_as_tuple(symbol), |
624 | 0 | std::forward_as_tuple(dispatcher, callback)); |
625 | 0 | return true; |
626 | 22.2k | } |
627 | | |
628 | 6.89k | ThreadLocalOverloadState& OverloadManagerImpl::getThreadLocalOverloadState() { return *tls_; } |
629 | 4.45k | Event::ScaledRangeTimerManagerFactory OverloadManagerImpl::scaledTimerFactory() { |
630 | 4.45k | return [this](Event::Dispatcher& dispatcher) { |
631 | 4.45k | auto manager = createScaledRangeTimerManager(dispatcher, timer_minimums_); |
632 | 4.45k | registerForAction(OverloadActionNames::get().ReduceTimeouts, dispatcher, |
633 | 4.45k | [manager = manager.get()](OverloadActionState scale_state) { |
634 | 0 | manager->setScaleFactor( |
635 | | // The action state is 0 for no overload up to 1 for maximal overload, |
636 | | // but the scale factor for timers is 1 for no scaling and 0 for maximal |
637 | | // scaling, so invert the value to pass in (1-value). |
638 | 0 | scale_state.value().invert()); |
639 | 0 | }); |
640 | 4.45k | return manager; |
641 | 4.45k | }; |
642 | 4.45k | } |
643 | | |
644 | 10.8k | LoadShedPoint* OverloadManagerImpl::getLoadShedPoint(absl::string_view point_name) { |
645 | 10.8k | if (auto it = loadshed_points_.find(point_name); it != loadshed_points_.end()) { |
646 | 0 | return it->second.get(); |
647 | 0 | } |
648 | 10.8k | return nullptr; |
649 | 10.8k | } |
650 | | |
651 | | Event::ScaledRangeTimerManagerPtr OverloadManagerImpl::createScaledRangeTimerManager( |
652 | | Event::Dispatcher& dispatcher, |
653 | 4.45k | const Event::ScaledTimerTypeMapConstSharedPtr& timer_minimums) const { |
654 | 4.45k | return std::make_unique<Event::ScaledRangeTimerManagerImpl>(dispatcher, timer_minimums); |
655 | 4.45k | } |
656 | | |
657 | | void OverloadManagerImpl::updateResourcePressure(const std::string& resource, double pressure, |
658 | 0 | FlushEpochId flush_epoch) { |
659 | 0 | auto [start, end] = resource_to_actions_.equal_range(resource); |
660 | |
|
661 | 0 | std::for_each(start, end, [&](ResourceToActionMap::value_type& entry) { |
662 | 0 | const NamedOverloadActionSymbolTable::Symbol action = entry.second; |
663 | 0 | auto action_it = actions_.find(action); |
664 | 0 | ASSERT(action_it != actions_.end()); |
665 | 0 | const OverloadActionState old_state = action_it->second->getState(); |
666 | 0 | if (action_it->second->updateResourcePressure(resource, pressure)) { |
667 | 0 | const auto state = action_it->second->getState(); |
668 | |
|
669 | 0 | if (old_state.isSaturated() != state.isSaturated()) { |
670 | 0 | ENVOY_LOG(debug, "Overload action {} became {}", action_symbol_table_.name(action), |
671 | 0 | (state.isSaturated() ? "saturated" : "scaling")); |
672 | 0 | } |
673 | | |
674 | | // Record the updated value to be sent to workers on the next thread-local-state flush, along |
675 | | // with any update callbacks. This might overwrite a previous action state change caused by a |
676 | | // pressure update for a different resource that hasn't been flushed yet. That's okay because |
677 | | // the state recorded here includes the information from all previous resource updates. So |
678 | | // even if resource 1 causes an action to have value A, and a later update to resource 2 |
679 | | // causes the action to have value B, B would have been the result for whichever order the |
680 | | // updates to resources 1 and 2 came in. |
681 | 0 | state_updates_to_flush_.insert_or_assign(action, state); |
682 | 0 | auto [callbacks_start, callbacks_end] = action_to_callbacks_.equal_range(action); |
683 | 0 | std::for_each(callbacks_start, callbacks_end, [&](ActionToCallbackMap::value_type& cb_entry) { |
684 | 0 | callbacks_to_flush_.insert_or_assign(&cb_entry.second, state); |
685 | 0 | }); |
686 | 0 | } |
687 | 0 | }); |
688 | |
|
689 | 0 | for (auto& loadshed_point : loadshed_points_) { |
690 | 0 | loadshed_point.second->updateResource(resource, pressure); |
691 | 0 | } |
692 | | |
693 | | // Eagerly flush updates if this is the last call to updateResourcePressure expected for the |
694 | | // current epoch. This assert is always valid because flush_awaiting_updates_ is initialized |
695 | | // before each batch of updates, and even if a resource monitor performs a double update, or a |
696 | | // previous update callback is late, the logic in OverloadManager::Resource::update() will prevent |
697 | | // unexpected calls to this function. |
698 | 0 | ASSERT(flush_awaiting_updates_ > 0); |
699 | 0 | --flush_awaiting_updates_; |
700 | 0 | if (flush_epoch == flush_epoch_ && flush_awaiting_updates_ == 0) { |
701 | 0 | flushResourceUpdates(); |
702 | 0 | } |
703 | 0 | } |
704 | | |
705 | 0 | void OverloadManagerImpl::flushResourceUpdates() { |
706 | 0 | if (!state_updates_to_flush_.empty()) { |
707 | 0 | auto shared_updates = std::make_shared< |
708 | 0 | absl::flat_hash_map<NamedOverloadActionSymbolTable::Symbol, OverloadActionState>>(); |
709 | 0 | std::swap(*shared_updates, state_updates_to_flush_); |
710 | |
|
711 | 0 | tls_.runOnAllThreads( |
712 | 0 | [updates = std::move(shared_updates)](OptRef<ThreadLocalOverloadStateImpl> overload_state) { |
713 | 0 | for (const auto& [action, state] : *updates) { |
714 | 0 | overload_state->setState(action, state); |
715 | 0 | } |
716 | 0 | }); |
717 | 0 | } |
718 | |
|
719 | 0 | for (const auto& [cb, state] : callbacks_to_flush_) { |
720 | 0 | cb->dispatcher_.post([cb = cb, state = state]() { cb->callback_(state); }); |
721 | 0 | } |
722 | 0 | callbacks_to_flush_.clear(); |
723 | 0 | } |
724 | | |
725 | | OverloadManagerImpl::Resource::Resource(const std::string& name, ResourceMonitorPtr monitor, |
726 | | OverloadManagerImpl& manager, Stats::Scope& stats_scope) |
727 | | : name_(name), monitor_(std::move(monitor)), manager_(manager), |
728 | | pressure_gauge_( |
729 | | makeGauge(stats_scope, name, "pressure", Stats::Gauge::ImportMode::NeverImport)), |
730 | | failed_updates_counter_(makeCounter(stats_scope, name, "failed_updates")), |
731 | 0 | skipped_updates_counter_(makeCounter(stats_scope, name, "skipped_updates")) {} |
732 | | |
733 | 0 | void OverloadManagerImpl::Resource::update(FlushEpochId flush_epoch) { |
734 | 0 | if (!pending_update_) { |
735 | 0 | pending_update_ = true; |
736 | 0 | flush_epoch_ = flush_epoch; |
737 | 0 | monitor_->updateResourceUsage(*this); |
738 | 0 | return; |
739 | 0 | } |
740 | 0 | ENVOY_LOG(debug, "Skipping update for resource {} which has pending update", name_); |
741 | 0 | skipped_updates_counter_.inc(); |
742 | 0 | } |
743 | | |
744 | 0 | void OverloadManagerImpl::Resource::onSuccess(const ResourceUsage& usage) { |
745 | 0 | pending_update_ = false; |
746 | 0 | manager_.updateResourcePressure(name_, usage.resource_pressure_, flush_epoch_); |
747 | 0 | pressure_gauge_.set(usage.resource_pressure_ * 100); // convert to percent |
748 | 0 | } |
749 | | |
750 | 0 | void OverloadManagerImpl::Resource::onFailure(const EnvoyException& error) { |
751 | 0 | pending_update_ = false; |
752 | 0 | ENVOY_LOG(info, "Failed to update resource {}: {}", name_, error.what()); |
753 | 0 | failed_updates_counter_.inc(); |
754 | 0 | } |
755 | | |
756 | | } // namespace Server |
757 | | } // namespace Envoy |