Line data Source code
1 : #include "source/common/stats/allocator_impl.h"
2 :
3 : #include <algorithm>
4 : #include <cstdint>
5 :
6 : #include "envoy/stats/sink.h"
7 : #include "envoy/stats/stats.h"
8 :
9 : #include "source/common/common/hash.h"
10 : #include "source/common/common/lock_guard.h"
11 : #include "source/common/common/logger.h"
12 : #include "source/common/common/thread.h"
13 : #include "source/common/common/thread_annotations.h"
14 : #include "source/common/common/utility.h"
15 : #include "source/common/stats/metric_impl.h"
16 : #include "source/common/stats/stat_merger.h"
17 : #include "source/common/stats/symbol_table.h"
18 :
19 : #include "absl/container/flat_hash_set.h"
20 :
21 : namespace Envoy {
22 : namespace Stats {
23 :
24 : const char AllocatorImpl::DecrementToZeroSyncPoint[] = "decrement-zero";
25 :
// Destructor: by the time the allocator is destroyed, every live stat must
// already have been released (stat destructors dereference the allocator to
// remove themselves from these sets, which would be UB after this point).
AllocatorImpl::~AllocatorImpl() {
  ASSERT(counters_.empty());
  ASSERT(gauges_.empty());

#ifndef NDEBUG
  // Move deleted stats into the sets for the ASSERTs in removeFromSetLockHeld to function.
  // Stats previously marked for deletion (markCounterForDeletion et al.) were
  // erased from counters_/gauges_/text_readouts_ but kept alive in the
  // deleted_* vectors; re-inserting them here lets removeFromSetLockHeld's
  // erase-count ASSERTs pass when those stats are finally destroyed.
  for (auto& counter : deleted_counters_) {
    auto insertion = counters_.insert(counter.get());
    // Assert that there were no duplicates.
    ASSERT(insertion.second);
  }
  for (auto& gauge : deleted_gauges_) {
    auto insertion = gauges_.insert(gauge.get());
    // Assert that there were no duplicates.
    ASSERT(insertion.second);
  }
  for (auto& text_readout : deleted_text_readouts_) {
    auto insertion = text_readouts_.insert(text_readout.get());
    // Assert that there were no duplicates.
    ASSERT(insertion.second);
  }
#endif
}
49 :
50 : #ifndef ENVOY_CONFIG_COVERAGE
51 : void AllocatorImpl::debugPrint() {
52 : Thread::LockGuard lock(mutex_);
53 : for (Counter* counter : counters_) {
54 : ENVOY_LOG_MISC(info, "counter: {}", symbolTable().toString(counter->statName()));
55 : }
56 : for (Gauge* gauge : gauges_) {
57 : ENVOY_LOG_MISC(info, "gauge: {}", symbolTable().toString(gauge->statName()));
58 : }
59 : }
60 : #endif
61 :
62 : // Counter, Gauge and TextReadout inherit from RefcountInterface and
63 : // Metric. MetricImpl takes care of most of the Metric API, but we need to cover
64 : // symbolTable() here, which we don't store directly, but get it via the alloc,
65 : // which we need in order to clean up the counter and gauge maps in that class
66 : // when they are destroyed.
67 : //
68 : // We implement the RefcountInterface API to avoid weak counter and destructor overhead in
69 : // shared_ptr.
// Counter, Gauge and TextReadout inherit from RefcountInterface and
// Metric. MetricImpl takes care of most of the Metric API, but we need to cover
// symbolTable() here, which we don't store directly, but get it via the alloc,
// which we need in order to clean up the counter and gauge maps in that class
// when they are destroyed.
//
// We implement the RefcountInterface API to avoid weak counter and destructor overhead in
// shared_ptr.
template <class BaseClass> class StatsSharedImpl : public MetricImpl<BaseClass> {
public:
  StatsSharedImpl(StatName name, AllocatorImpl& alloc, StatName tag_extracted_name,
                  const StatNameTagVector& stat_name_tags)
      : MetricImpl<BaseClass>(name, tag_extracted_name, stat_name_tags, alloc.symbolTable()),
        alloc_(alloc) {}

  ~StatsSharedImpl() override {
    // MetricImpl must be explicitly cleared() before destruction, otherwise it
    // will not be able to access the SymbolTable& to free the symbols. An RAII
    // alternative would be to store the SymbolTable reference in the
    // MetricImpl, costing 8 bytes per stat.
    this->clear(symbolTable());
  }

  // Metric
  SymbolTable& symbolTable() final { return alloc_.symbolTable(); }
  // A stat is "used" once it has been written to; flag is set by subclasses.
  bool used() const override { return flags_ & Metric::Flags::Used; }
  // Hidden stats are excluded from some iteration paths (e.g. sink flushes).
  bool hidden() const override { return flags_ & Metric::Flags::Hidden; }

  // RefcountInterface
  void incRefCount() override { ++ref_count_; }
  bool decRefCount() override {
    // We must, unfortunately, hold the allocator's lock when decrementing the
    // refcount. Otherwise another thread may simultaneously try to allocate the
    // same name'd stat after we decrement it, and we'll wind up with a
    // dtor/update race. To avoid this we must hold the lock until the stat is
    // removed from the map.
    //
    // It might be worth thinking about a race-free way to decrement ref-counts
    // without a lock, for the case where ref_count > 2, and we don't need to
    // destruct anything. But it seems preferable at to be conservative here,
    // as stats will only go out of scope when a scope is destructed (during
    // xDS) or during admin stats operations.
    Thread::LockGuard lock(alloc_.mutex_);
    ASSERT(ref_count_ >= 1);
    if (--ref_count_ == 0) {
      // Test-only sync point allowing tests to pause a thread exactly at the
      // moment the refcount reaches zero.
      alloc_.sync().syncPoint(AllocatorImpl::DecrementToZeroSyncPoint);
      removeFromSetLockHeld();
      return true;
    }
    return false;
  }
  uint32_t use_count() const override { return ref_count_; }

  /**
   * We must atomically remove the counter/gauges from the allocator's sets when
   * our ref-count decrement hits zero. The counters and gauges are held in
   * distinct sets so we virtualize this removal helper.
   */
  virtual void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) PURE;

protected:
  AllocatorImpl& alloc_;

  // ref_count_ can be incremented as an atomic, without taking a new lock, as
  // the critical 0->1 transition occurs in makeCounter and makeGauge, which
  // already hold the lock. Increment also occurs when copying shared pointers,
  // but these are always in transition to ref-count 2 or higher, and thus
  // cannot race with a decrement to zero.
  //
  // However, we must hold alloc_.mutex_ when decrementing ref_count_ so that
  // when it hits zero we can atomically remove it from alloc_.counters_ or
  // alloc_.gauges_. We leave it atomic to avoid taking the lock on increment.
  std::atomic<uint32_t> ref_count_{0};

  // Bitmask of Metric::Flags (Used, Hidden, import-mode bits); atomic so
  // writers need not take the allocator lock.
  std::atomic<uint16_t> flags_{0};
};
138 :
// Concrete counter: a monotonically increasing value plus a "pending
// increment" that sinks drain via latch().
class CounterImpl : public StatsSharedImpl<Counter> {
public:
  CounterImpl(StatName name, AllocatorImpl& alloc, StatName tag_extracted_name,
              const StatNameTagVector& stat_name_tags)
      : StatsSharedImpl(name, alloc, tag_extracted_name, stat_name_tags) {}

  // Called (with alloc_.mutex_ held) when the refcount hits zero; unregisters
  // this counter from the allocator's sets.
  void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override {
    const size_t count = alloc_.counters_.erase(statName());
    ASSERT(count == 1);
    alloc_.sinked_counters_.erase(this);
  }

  // Stats::Counter
  void add(uint64_t amount) override {
    // Note that a reader may see a new value but an old pending_increment_ or
    // used(). From a system perspective this should be eventually consistent.
    value_ += amount;
    pending_increment_ += amount;
    flags_ |= Flags::Used;
  }
  void inc() override { add(1); }
  // Atomically returns and clears the delta accumulated since the last latch.
  uint64_t latch() override { return pending_increment_.exchange(0); }
  // Resets only the visible value; pending_increment_ is left for latch().
  void reset() override { value_ = 0; }
  uint64_t value() const override { return value_; }

private:
  std::atomic<uint64_t> value_{0};
  std::atomic<uint64_t> pending_increment_{0};
};
168 :
// Concrete gauge. The value is split into child_value_ (written locally via
// add/sub/set) and parent_value_ (imported from a parent process during hot
// restart); value() reports their sum. The import mode is encoded in flag
// bits (NeverImport, LogicAccumulate, Hidden).
class GaugeImpl : public StatsSharedImpl<Gauge> {
public:
  GaugeImpl(StatName name, AllocatorImpl& alloc, StatName tag_extracted_name,
            const StatNameTagVector& stat_name_tags, ImportMode import_mode)
      : StatsSharedImpl(name, alloc, tag_extracted_name, stat_name_tags) {
    switch (import_mode) {
    case ImportMode::Accumulate:
      flags_ |= Flags::LogicAccumulate;
      break;
    case ImportMode::NeverImport:
      flags_ |= Flags::NeverImport;
      break;
    case ImportMode::Uninitialized:
      // Note that we don't clear any flag bits for import_mode==Uninitialized,
      // as we may have an established import_mode when this stat was created in
      // an alternate scope. See
      // https://github.com/envoyproxy/envoy/issues/7227.
      break;
    case ImportMode::HiddenAccumulate:
      flags_ |= Flags::Hidden;
      flags_ |= Flags::LogicAccumulate;
      break;
    }
  }

  // Called (with alloc_.mutex_ held) when the refcount hits zero; unregisters
  // this gauge from the allocator's sets.
  void removeFromSetLockHeld() override ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) {
    const size_t count = alloc_.gauges_.erase(statName());
    ASSERT(count == 1);
    alloc_.sinked_gauges_.erase(this);
  }

  // Stats::Gauge
  void add(uint64_t amount) override {
    child_value_ += amount;
    flags_ |= Flags::Used;
  }
  void dec() override { sub(1); }
  void inc() override { add(1); }
  void set(uint64_t value) override {
    child_value_ = value;
    flags_ |= Flags::Used;
  }
  void sub(uint64_t amount) override {
    // Underflow would wrap the unsigned atomic; catch it in debug builds.
    ASSERT(child_value_ >= amount);
    // Subtracting from a never-written gauge indicates a logic error upstream.
    ASSERT(used() || amount == 0);
    child_value_ -= amount;
  }
  // Reported value combines the locally-written and hot-restart-imported parts.
  uint64_t value() const override { return child_value_ + parent_value_; }

  // TODO(diazalan): Rename importMode and to more generic name
  // Decodes the import mode back out of the flag bits. NeverImport wins;
  // Hidden+LogicAccumulate maps to HiddenAccumulate; LogicAccumulate alone to
  // Accumulate; no bits set means Uninitialized.
  ImportMode importMode() const override {
    if (flags_ & Flags::NeverImport) {
      return ImportMode::NeverImport;
    } else if ((flags_ & Flags::Hidden) && (flags_ & Flags::LogicAccumulate)) {
      return ImportMode::HiddenAccumulate;
    } else if (flags_ & Flags::LogicAccumulate) {
      return ImportMode::Accumulate;
    }
    return ImportMode::Uninitialized;
  }

  // TODO(diazalan): Rename mergeImportMode and to more generic name
  // Resolves the import mode when the same gauge is (re)created with a
  // possibly different mode; transitions are only legal from Uninitialized.
  void mergeImportMode(ImportMode import_mode) override {
    ImportMode current = importMode();
    if (current == import_mode) {
      return;
    }

    switch (import_mode) {
    case ImportMode::Uninitialized:
      // mergeImportNode(ImportMode::Uninitialized) is called when merging an
      // existing stat with importMode() == Accumulate or NeverImport.
      break;
    case ImportMode::Accumulate:
      ASSERT(current == ImportMode::Uninitialized);
      flags_ |= Flags::LogicAccumulate;
      break;
    case ImportMode::NeverImport:
      ASSERT(current == ImportMode::Uninitialized);
      // A previous revision of Envoy may have transferred a gauge that it
      // thought was Accumulate. But the new version thinks it's NeverImport, so
      // we clear the accumulated value.
      parent_value_ = 0;
      flags_ &= ~Flags::Used;
      flags_ |= Flags::NeverImport;
      break;
    case ImportMode::HiddenAccumulate:
      ASSERT(current == ImportMode::Uninitialized);
      flags_ |= Flags::Hidden;
      flags_ |= Flags::LogicAccumulate;
      break;
    }
  }

  // Installs the value imported from the parent process during hot restart.
  void setParentValue(uint64_t value) override { parent_value_ = value; }

private:
  std::atomic<uint64_t> parent_value_{0};
  std::atomic<uint64_t> child_value_{0};
};
269 :
// Concrete text readout: a string-valued stat. Unlike counters/gauges the
// value cannot be atomic, so reads and writes are serialized by a per-stat
// absl::Mutex.
class TextReadoutImpl : public StatsSharedImpl<TextReadout> {
public:
  TextReadoutImpl(StatName name, AllocatorImpl& alloc, StatName tag_extracted_name,
                  const StatNameTagVector& stat_name_tags)
      : StatsSharedImpl(name, alloc, tag_extracted_name, stat_name_tags) {}

  // Called (with alloc_.mutex_ held) when the refcount hits zero; unregisters
  // this text readout from the allocator's sets.
  void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override {
    const size_t count = alloc_.text_readouts_.erase(statName());
    ASSERT(count == 1);
    alloc_.sinked_text_readouts_.erase(this);
  }

  // Stats::TextReadout
  void set(absl::string_view value) override {
    // Copy (and thus allocate) outside the lock to keep the critical section
    // down to a move-assign.
    std::string value_copy(value);
    absl::MutexLock lock(&mutex_);
    value_ = std::move(value_copy);
    flags_ |= Flags::Used;
  }
  std::string value() const override {
    absl::MutexLock lock(&mutex_);
    return value_;
  }

private:
  mutable absl::Mutex mutex_;
  std::string value_ ABSL_GUARDED_BY(mutex_);
};
298 :
299 : CounterSharedPtr AllocatorImpl::makeCounter(StatName name, StatName tag_extracted_name,
300 261960 : const StatNameTagVector& stat_name_tags) {
301 261960 : Thread::LockGuard lock(mutex_);
302 261960 : ASSERT(gauges_.find(name) == gauges_.end());
303 261960 : ASSERT(text_readouts_.find(name) == text_readouts_.end());
304 261960 : auto iter = counters_.find(name);
305 261960 : if (iter != counters_.end()) {
306 4151 : return {*iter};
307 4151 : }
308 257809 : auto counter = CounterSharedPtr(makeCounterInternal(name, tag_extracted_name, stat_name_tags));
309 257809 : counters_.insert(counter.get());
310 : // Add counter to sinked_counters_ if it matches the sink predicate.
311 257809 : if (sink_predicates_ != nullptr && sink_predicates_->includeCounter(*counter)) {
312 0 : auto val = sinked_counters_.insert(counter.get());
313 0 : ASSERT(val.second);
314 0 : }
315 257809 : return counter;
316 261960 : }
317 :
318 : GaugeSharedPtr AllocatorImpl::makeGauge(StatName name, StatName tag_extracted_name,
319 : const StatNameTagVector& stat_name_tags,
320 70016 : Gauge::ImportMode import_mode) {
321 70016 : Thread::LockGuard lock(mutex_);
322 70016 : ASSERT(counters_.find(name) == counters_.end());
323 70016 : ASSERT(text_readouts_.find(name) == text_readouts_.end());
324 70016 : auto iter = gauges_.find(name);
325 70016 : if (iter != gauges_.end()) {
326 426 : return {*iter};
327 426 : }
328 69590 : auto gauge =
329 69590 : GaugeSharedPtr(new GaugeImpl(name, *this, tag_extracted_name, stat_name_tags, import_mode));
330 69590 : gauges_.insert(gauge.get());
331 : // Add gauge to sinked_gauges_ if it matches the sink predicate.
332 69590 : if (sink_predicates_ != nullptr && sink_predicates_->includeGauge(*gauge)) {
333 0 : auto val = sinked_gauges_.insert(gauge.get());
334 0 : ASSERT(val.second);
335 0 : }
336 69590 : return gauge;
337 70016 : }
338 :
339 : TextReadoutSharedPtr AllocatorImpl::makeTextReadout(StatName name, StatName tag_extracted_name,
340 234 : const StatNameTagVector& stat_name_tags) {
341 234 : Thread::LockGuard lock(mutex_);
342 234 : ASSERT(counters_.find(name) == counters_.end());
343 234 : ASSERT(gauges_.find(name) == gauges_.end());
344 234 : auto iter = text_readouts_.find(name);
345 234 : if (iter != text_readouts_.end()) {
346 0 : return {*iter};
347 0 : }
348 234 : auto text_readout =
349 234 : TextReadoutSharedPtr(new TextReadoutImpl(name, *this, tag_extracted_name, stat_name_tags));
350 234 : text_readouts_.insert(text_readout.get());
351 : // Add text_readout to sinked_text_readouts_ if it matches the sink predicate.
352 234 : if (sink_predicates_ != nullptr && sink_predicates_->includeTextReadout(*text_readout)) {
353 0 : auto val = sinked_text_readouts_.insert(text_readout.get());
354 0 : ASSERT(val.second);
355 0 : }
356 234 : return text_readout;
357 234 : }
358 :
359 0 : bool AllocatorImpl::isMutexLockedForTest() {
360 0 : bool locked = mutex_.tryLock();
361 0 : if (locked) {
362 0 : mutex_.unlock();
363 0 : }
364 0 : return !locked;
365 0 : }
366 :
// Constructs a heap-allocated CounterImpl with refcount zero; ownership is
// transferred to the caller (makeCounter wraps it in a CounterSharedPtr).
Counter* AllocatorImpl::makeCounterInternal(StatName name, StatName tag_extracted_name,
                                            const StatNameTagVector& stat_name_tags) {
  return new CounterImpl(name, *this, tag_extracted_name, stat_name_tags);
}
371 :
372 2252 : void AllocatorImpl::forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
373 2252 : Thread::LockGuard lock(mutex_);
374 2252 : if (f_size != nullptr) {
375 2154 : f_size(counters_.size());
376 2154 : }
377 862200 : for (auto& counter : counters_) {
378 862200 : f_stat(*counter);
379 862200 : }
380 2252 : }
381 :
382 1170 : void AllocatorImpl::forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
383 1170 : Thread::LockGuard lock(mutex_);
384 1170 : if (f_size != nullptr) {
385 1072 : f_size(gauges_.size());
386 1072 : }
387 153437 : for (auto& gauge : gauges_) {
388 153437 : f_stat(*gauge);
389 153437 : }
390 1170 : }
391 :
392 98 : void AllocatorImpl::forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
393 98 : Thread::LockGuard lock(mutex_);
394 98 : if (f_size != nullptr) {
395 98 : f_size(text_readouts_.size());
396 98 : }
397 228 : for (auto& text_readout : text_readouts_) {
398 228 : f_stat(*text_readout);
399 228 : }
400 98 : }
401 :
402 98 : void AllocatorImpl::forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
403 98 : if (sink_predicates_ != nullptr) {
404 0 : Thread::LockGuard lock(mutex_);
405 0 : f_size(sinked_counters_.size());
406 0 : for (auto counter : sinked_counters_) {
407 0 : f_stat(*counter);
408 0 : }
409 98 : } else {
410 98 : forEachCounter(f_size, f_stat);
411 98 : }
412 98 : }
413 :
414 98 : void AllocatorImpl::forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
415 98 : if (sink_predicates_ != nullptr) {
416 0 : Thread::LockGuard lock(mutex_);
417 0 : f_size(sinked_gauges_.size());
418 0 : for (auto gauge : sinked_gauges_) {
419 0 : f_stat(*gauge);
420 0 : }
421 98 : } else {
422 9658 : forEachGauge(f_size, [&f_stat](Gauge& gauge) {
423 9658 : if (!gauge.hidden()) {
424 9658 : f_stat(gauge);
425 9658 : }
426 9658 : });
427 98 : }
428 98 : }
429 :
430 98 : void AllocatorImpl::forEachSinkedTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
431 98 : if (sink_predicates_ != nullptr) {
432 0 : Thread::LockGuard lock(mutex_);
433 0 : f_size(sinked_text_readouts_.size());
434 0 : for (auto text_readout : sinked_text_readouts_) {
435 0 : f_stat(*text_readout);
436 0 : }
437 98 : } else {
438 98 : forEachTextReadout(f_size, f_stat);
439 98 : }
440 98 : }
441 :
442 0 : void AllocatorImpl::setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) {
443 0 : Thread::LockGuard lock(mutex_);
444 0 : ASSERT(sink_predicates_ == nullptr);
445 0 : sink_predicates_ = std::move(sink_predicates);
446 0 : sinked_counters_.clear();
447 0 : sinked_gauges_.clear();
448 0 : sinked_text_readouts_.clear();
449 : // Add counters to the set of sinked counters.
450 0 : for (auto& counter : counters_) {
451 0 : if (sink_predicates_->includeCounter(*counter)) {
452 0 : sinked_counters_.emplace(counter);
453 0 : }
454 0 : }
455 : // Add gauges to the set of sinked gauges.
456 0 : for (auto& gauge : gauges_) {
457 0 : if (sink_predicates_->includeGauge(*gauge)) {
458 0 : sinked_gauges_.insert(gauge);
459 0 : }
460 0 : }
461 : // Add text_readouts to the set of sinked text readouts.
462 0 : for (auto& text_readout : text_readouts_) {
463 0 : if (sink_predicates_->includeTextReadout(*text_readout)) {
464 0 : sinked_text_readouts_.insert(text_readout);
465 0 : }
466 0 : }
467 0 : }
468 :
469 0 : void AllocatorImpl::markCounterForDeletion(const CounterSharedPtr& counter) {
470 0 : Thread::LockGuard lock(mutex_);
471 0 : auto iter = counters_.find(counter->statName());
472 0 : if (iter == counters_.end()) {
473 : // This has already been marked for deletion.
474 0 : return;
475 0 : }
476 0 : ASSERT(counter.get() == *iter);
477 : // Duplicates are ASSERTed in ~AllocatorImpl.
478 0 : deleted_counters_.emplace_back(*iter);
479 0 : counters_.erase(iter);
480 0 : sinked_counters_.erase(counter.get());
481 0 : }
482 :
483 0 : void AllocatorImpl::markGaugeForDeletion(const GaugeSharedPtr& gauge) {
484 0 : Thread::LockGuard lock(mutex_);
485 0 : auto iter = gauges_.find(gauge->statName());
486 0 : if (iter == gauges_.end()) {
487 : // This has already been marked for deletion.
488 0 : return;
489 0 : }
490 0 : ASSERT(gauge.get() == *iter);
491 : // Duplicates are ASSERTed in ~AllocatorImpl.
492 0 : deleted_gauges_.emplace_back(*iter);
493 0 : gauges_.erase(iter);
494 0 : sinked_gauges_.erase(gauge.get());
495 0 : }
496 :
497 0 : void AllocatorImpl::markTextReadoutForDeletion(const TextReadoutSharedPtr& text_readout) {
498 0 : Thread::LockGuard lock(mutex_);
499 0 : auto iter = text_readouts_.find(text_readout->statName());
500 0 : if (iter == text_readouts_.end()) {
501 : // This has already been marked for deletion.
502 0 : return;
503 0 : }
504 0 : ASSERT(text_readout.get() == *iter);
505 : // Duplicates are ASSERTed in ~AllocatorImpl.
506 0 : deleted_text_readouts_.emplace_back(*iter);
507 0 : text_readouts_.erase(iter);
508 0 : sinked_text_readouts_.erase(text_readout.get());
509 0 : }
510 :
511 : } // namespace Stats
512 : } // namespace Envoy
|