1
#include "source/common/stats/allocator.h"
2

            
3
#include <algorithm>
4
#include <cstdint>
5

            
6
#include "envoy/stats/sink.h"
7
#include "envoy/stats/stats.h"
8

            
9
#include "source/common/common/hash.h"
10
#include "source/common/common/lock_guard.h"
11
#include "source/common/common/logger.h"
12
#include "source/common/common/thread.h"
13
#include "source/common/common/thread_annotations.h"
14
#include "source/common/common/utility.h"
15
#include "source/common/stats/metric_impl.h"
16
#include "source/common/stats/stat_merger.h"
17
#include "source/common/stats/symbol_table.h"
18

            
19
#include "absl/container/flat_hash_set.h"
20
#include "absl/strings/string_view.h"
21

            
22
namespace Envoy {
23
namespace Stats {
24

            
25
// Name of the sync point signalled in decRefCount() at the moment a stat's
// reference count drops to zero (see StatsSharedImpl::decRefCount below),
// allowing external code to coordinate with that transition.
const char Allocator::DecrementToZeroSyncPoint[] = "decrement-zero";
26

            
27
1836793
// By destruction time every stat must already have been released by its
// owners: the live sets hold raw pointers, so a leftover entry would dangle.
Allocator::~Allocator() {
  ASSERT(counters_.empty());
  ASSERT(gauges_.empty());

#ifndef NDEBUG
  // Debug builds re-seed the live sets with the stats previously marked for
  // deletion, so the duplicate-detection ASSERTs in removeFromSetLockHeld
  // still function as those stats are finally torn down. Each insertion must
  // be new — a duplicate would indicate a double-deleted stat.
  for (const auto& deleted : deleted_counters_) {
    const bool inserted = counters_.insert(deleted.get()).second;
    ASSERT(inserted);
  }
  for (const auto& deleted : deleted_gauges_) {
    const bool inserted = gauges_.insert(deleted.get()).second;
    ASSERT(inserted);
  }
  for (const auto& deleted : deleted_text_readouts_) {
    const bool inserted = text_readouts_.insert(deleted.get()).second;
    ASSERT(inserted);
  }
#endif
}
50

            
51
#ifndef ENVOY_CONFIG_COVERAGE
52
void Allocator::debugPrint() {
53
  Thread::LockGuard lock(mutex_);
54
  for (Counter* counter : counters_) {
55
    ENVOY_LOG_MISC(info, "counter: {}", symbolTable().toString(counter->statName()));
56
  }
57
  for (Gauge* gauge : gauges_) {
58
    ENVOY_LOG_MISC(info, "gauge: {}", symbolTable().toString(gauge->statName()));
59
  }
60
}
61
#endif
62

            
63
// Counter, Gauge and TextReadout inherit from RefcountInterface and
// Metric. MetricImpl takes care of most of the Metric API, but we need to cover
// symbolTable() here, which we don't store directly, but get it via the alloc,
// which we need in order to clean up the counter and gauge maps in that class
// when they are destroyed.
//
// We implement the RefcountInterface API to avoid weak counter and destructor overhead in
// shared_ptr.
template <class BaseClass> class StatsSharedImpl : public MetricImpl<BaseClass> {
public:
  // Name/tag bookkeeping is delegated to MetricImpl; this class adds only the
  // back-reference to the owning Allocator and the intrusive ref-count/flags.
  StatsSharedImpl(StatName name, Allocator& alloc, StatName tag_extracted_name,
                  const StatNameTagVector& stat_name_tags)
      : MetricImpl<BaseClass>(name, tag_extracted_name, stat_name_tags, alloc.symbolTable()),
        alloc_(alloc) {}

  ~StatsSharedImpl() override {
    // MetricImpl must be explicitly cleared() before destruction, otherwise it
    // will not be able to access the SymbolTable& to free the symbols. An RAII
    // alternative would be to store the SymbolTable reference in the
    // MetricImpl, costing 8 bytes per stat.
    this->clear(symbolTable());
  }

  // Metric
  SymbolTable& symbolTable() final { return alloc_.symbolTable(); }
  bool used() const override { return flags_ & Metric::Flags::Used; }
  void markUnused() override { flags_ &= ~Metric::Flags::Used; }
  bool hidden() const override { return flags_ & Metric::Flags::Hidden; }

  // RefcountInterface
  // Lock-free increment; see the comment on ref_count_ below for why this is
  // safe without holding alloc_.mutex_.
  void incRefCount() override { ++ref_count_; }
  // Returns true when the count reaches zero, signaling the caller to delete
  // this stat. The stat is atomically removed from the allocator's sets under
  // the same lock, so a concurrent makeCounter/makeGauge for the same name
  // cannot observe a half-destroyed stat.
  bool decRefCount() override {
    // We must, unfortunately, hold the allocator's lock when decrementing the
    // refcount. Otherwise another thread may simultaneously try to allocate the
    // same name'd stat after we decrement it, and we'll wind up with a
    // dtor/update race. To avoid this we must hold the lock until the stat is
    // removed from the map.
    //
    // It might be worth thinking about a race-free way to decrement ref-counts
    // without a lock, for the case where ref_count > 2, and we don't need to
    // destruct anything. But it seems preferable at to be conservative here,
    // as stats will only go out of scope when a scope is destructed (during
    // xDS) or during admin stats operations.
    Thread::LockGuard lock(alloc_.mutex_);
    ASSERT(ref_count_ >= 1);
    if (--ref_count_ == 0) {
      // Named sync point (see Allocator::DecrementToZeroSyncPoint) signalled
      // at the zero transition for external coordination.
      alloc_.sync().syncPoint(Allocator::DecrementToZeroSyncPoint);
      removeFromSetLockHeld();
      return true;
    }
    return false;
  }
  uint32_t use_count() const override { return ref_count_; }

  /**
   * We must atomically remove the counter/gauges from the allocator's sets when
   * our ref-count decrement hits zero. The counters and gauges are held in
   * distinct sets so we virtualize this removal helper.
   */
  virtual void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) PURE;

protected:
  Allocator& alloc_;

  // ref_count_ can be incremented as an atomic, without taking a new lock, as
  // the critical 0->1 transition occurs in makeCounter and makeGauge, which
  // already hold the lock. Increment also occurs when copying shared pointers,
  // but these are always in transition to ref-count 2 or higher, and thus
  // cannot race with a decrement to zero.
  //
  // However, we must hold alloc_.mutex_ when decrementing ref_count_ so that
  // when it hits zero we can atomically remove it from alloc_.counters_ or
  // alloc_.gauges_. We leave it atomic to avoid taking the lock on increment.
  std::atomic<uint32_t> ref_count_{0};

  // Bit-set of Metric::Flags values (Used, Hidden, and the gauge import-mode
  // bits LogicAccumulate/NeverImport).
  std::atomic<uint16_t> flags_{0};
};
140

            
141
// Concrete counter: a monotonically increasing value plus a "pending" delta
// that sinks can latch-and-reset independently of the total.
class CounterImpl : public StatsSharedImpl<Counter> {
public:
  CounterImpl(StatName name, Allocator& alloc, StatName tag_extracted_name,
              const StatNameTagVector& stat_name_tags)
      : StatsSharedImpl(name, alloc, tag_extracted_name, stat_name_tags) {}

  // Removes this counter from both the live set and the sinked set. Called
  // with alloc_.mutex_ held, from the zero-ref-count transition.
  void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override {
    const size_t count = alloc_.counters_.erase(statName());
    ASSERT(count == 1);
    alloc_.sinked_counters_.erase(this);
  }

  // Stats::Counter
  void add(uint64_t amount) override {
    // Note that a reader may see a new value but an old pending_increment_ or
    // used(). From a system perspective this should be eventually consistent.
    value_ += amount;
    pending_increment_ += amount;
    flags_ |= Flags::Used;
  }
  void inc() override { add(1); }
  // Returns the amount accumulated since the previous latch() and zeroes it;
  // the running total in value_ is unaffected.
  uint64_t latch() override { return pending_increment_.exchange(0); }
  void reset() override { value_ = 0; }
  uint64_t value() const override { return value_; }

private:
  // Running total; only reset() moves it backwards.
  std::atomic<uint64_t> value_{0};
  // Delta accumulated since the last latch().
  std::atomic<uint64_t> pending_increment_{0};
};
170

            
171
// Concrete gauge. The visible value() is the sum of a locally-mutated
// child_value_ and a parent_value_ installed via setParentValue() (used when
// accumulating values across process boundaries, per the import-mode flags).
class GaugeImpl : public StatsSharedImpl<Gauge> {
public:
  GaugeImpl(StatName name, Allocator& alloc, StatName tag_extracted_name,
            const StatNameTagVector& stat_name_tags, ImportMode import_mode)
      : StatsSharedImpl(name, alloc, tag_extracted_name, stat_name_tags) {
    // The import mode is encoded into flags_ bits so that importMode() can be
    // reconstructed later without storing the enum separately.
    switch (import_mode) {
    case ImportMode::Accumulate:
      flags_ |= Flags::LogicAccumulate;
      break;
    case ImportMode::NeverImport:
      flags_ |= Flags::NeverImport;
      break;
    case ImportMode::Uninitialized:
      // Note that we don't clear any flag bits for import_mode==Uninitialized,
      // as we may have an established import_mode when this stat was created in
      // an alternate scope. See
      // https://github.com/envoyproxy/envoy/issues/7227.
      break;
    case ImportMode::HiddenAccumulate:
      flags_ |= Flags::Hidden;
      flags_ |= Flags::LogicAccumulate;
      break;
    }
  }

  // Removes this gauge from both the live set and the sinked set. Called with
  // alloc_.mutex_ held, from the zero-ref-count transition.
  void removeFromSetLockHeld() override ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) {
    const size_t count = alloc_.gauges_.erase(statName());
    ASSERT(count == 1);
    alloc_.sinked_gauges_.erase(this);
  }

  // Stats::Gauge
  void add(uint64_t amount) override {
    child_value_ += amount;
    flags_ |= Flags::Used;
  }
  void dec() override { sub(1); }
  void inc() override { add(1); }
  void set(uint64_t value) override {
    child_value_ = value;
    flags_ |= Flags::Used;
  }
  // Note: sub() does not set the Used flag; decrementing an unused gauge by a
  // nonzero amount is a programming error, caught by the second ASSERT.
  void sub(uint64_t amount) override {
    ASSERT(child_value_ >= amount);
    ASSERT(used() || amount == 0);
    child_value_ -= amount;
  }
  uint64_t value() const override { return child_value_ + parent_value_; }

  // Decodes the flags_ bits back into an ImportMode. NeverImport wins over the
  // accumulate bits; Hidden+LogicAccumulate maps to HiddenAccumulate.
  // TODO(diazalan): Rename importMode and to more generic name
  ImportMode importMode() const override {
    if (flags_ & Flags::NeverImport) {
      return ImportMode::NeverImport;
    } else if ((flags_ & Flags::Hidden) && (flags_ & Flags::LogicAccumulate)) {
      return ImportMode::HiddenAccumulate;
    } else if (flags_ & Flags::LogicAccumulate) {
      return ImportMode::Accumulate;
    }
    return ImportMode::Uninitialized;
  }

  // Merges an incoming import mode into this gauge's established one. Only an
  // Uninitialized gauge may transition to a concrete mode (ASSERTed below).
  // TODO(diazalan): Rename mergeImportMode and to more generic name
  void mergeImportMode(ImportMode import_mode) override {
    ImportMode current = importMode();
    if (current == import_mode) {
      return;
    }

    switch (import_mode) {
    case ImportMode::Uninitialized:
      // mergeImportNode(ImportMode::Uninitialized) is called when merging an
      // existing stat with importMode() == Accumulate or NeverImport.
      break;
    case ImportMode::Accumulate:
      ASSERT(current == ImportMode::Uninitialized);
      flags_ |= Flags::LogicAccumulate;
      break;
    case ImportMode::NeverImport:
      ASSERT(current == ImportMode::Uninitialized);
      // A previous revision of Envoy may have transferred a gauge that it
      // thought was Accumulate. But the new version thinks it's NeverImport, so
      // we clear the accumulated value.
      parent_value_ = 0;
      flags_ &= ~Flags::Used;
      flags_ |= Flags::NeverImport;
      break;
    case ImportMode::HiddenAccumulate:
      ASSERT(current == ImportMode::Uninitialized);
      flags_ |= Flags::Hidden;
      flags_ |= Flags::LogicAccumulate;
      break;
    }
  }

  void setParentValue(uint64_t value) override { parent_value_ = value; }

private:
  // Contribution installed externally via setParentValue(); added to
  // child_value_ in value().
  std::atomic<uint64_t> parent_value_{0};
  // Contribution mutated by the local add/sub/set operations.
  std::atomic<uint64_t> child_value_{0};
};
271

            
272
// Concrete text readout: a string-valued stat. Unlike counters/gauges the
// value is not atomic, so reads and writes are serialized by a private mutex.
class TextReadoutImpl : public StatsSharedImpl<TextReadout> {
public:
  TextReadoutImpl(StatName name, Allocator& alloc, StatName tag_extracted_name,
                  const StatNameTagVector& stat_name_tags)
      : StatsSharedImpl(name, alloc, tag_extracted_name, stat_name_tags) {}

  // Removes this readout from both the live set and the sinked set. Called
  // with alloc_.mutex_ held, from the zero-ref-count transition.
  void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override {
    const size_t count = alloc_.text_readouts_.erase(statName());
    ASSERT(count == 1);
    alloc_.sinked_text_readouts_.erase(this);
  }

  // Stats::TextReadout
  void set(absl::string_view value) override {
    // Copy the incoming view before taking the lock, keeping the critical
    // section down to a string move plus a flag update.
    std::string value_copy(value);
    // NOTE(review): absl::MutexLock taking a Mutex& (rather than Mutex*)
    // requires a recent Abseil release — confirm against the pinned version.
    absl::MutexLock lock(mutex_);
    value_ = std::move(value_copy);
    flags_ |= Flags::Used;
  }
  std::string value() const override {
    absl::MutexLock lock(mutex_);
    return value_;
  }

private:
  // Guards value_; mutable so that the const value() accessor can lock it.
  mutable absl::Mutex mutex_;
  std::string value_ ABSL_GUARDED_BY(mutex_);
};
300

            
301
// Returns the existing counter for `name`, or allocates a new one. A name may
// identify at most one stat type, which the leading ASSERTs enforce.
CounterSharedPtr Allocator::makeCounter(StatName name, StatName tag_extracted_name,
                                        const StatNameTagVector& stat_name_tags) {
  Thread::LockGuard lock(mutex_);
  ASSERT(gauges_.find(name) == gauges_.end());
  ASSERT(text_readouts_.find(name) == text_readouts_.end());
  auto existing = counters_.find(name);
  if (existing != counters_.end()) {
    return {*existing};
  }
  CounterSharedPtr counter(makeCounterInternal(name, tag_extracted_name, stat_name_tags));
  counters_.insert(counter.get());
  // Newly created counters are tracked for sink flushing when they match the
  // configured predicate.
  if (sink_predicates_ != nullptr && sink_predicates_->includeCounter(*counter)) {
    auto insert_result = sinked_counters_.insert(counter.get());
    ASSERT(insert_result.second);
  }
  return counter;
}
319

            
320
// Returns the existing gauge for `name`, or allocates a new one with the given
// import mode. A name may identify at most one stat type (ASSERTed).
GaugeSharedPtr Allocator::makeGauge(StatName name, StatName tag_extracted_name,
                                    const StatNameTagVector& stat_name_tags,
                                    Gauge::ImportMode import_mode) {
  Thread::LockGuard lock(mutex_);
  ASSERT(counters_.find(name) == counters_.end());
  ASSERT(text_readouts_.find(name) == text_readouts_.end());
  auto existing = gauges_.find(name);
  if (existing != gauges_.end()) {
    return {*existing};
  }
  GaugeSharedPtr gauge(new GaugeImpl(name, *this, tag_extracted_name, stat_name_tags, import_mode));
  gauges_.insert(gauge.get());
  // Newly created gauges are tracked for sink flushing when they match the
  // configured predicate.
  if (sink_predicates_ != nullptr && sink_predicates_->includeGauge(*gauge)) {
    auto insert_result = sinked_gauges_.insert(gauge.get());
    ASSERT(insert_result.second);
  }
  return gauge;
}
340

            
341
// Returns the existing text readout for `name`, or allocates a new one. A
// name may identify at most one stat type (ASSERTed).
TextReadoutSharedPtr Allocator::makeTextReadout(StatName name, StatName tag_extracted_name,
                                                const StatNameTagVector& stat_name_tags) {
  Thread::LockGuard lock(mutex_);
  ASSERT(counters_.find(name) == counters_.end());
  ASSERT(gauges_.find(name) == gauges_.end());
  auto existing = text_readouts_.find(name);
  if (existing != text_readouts_.end()) {
    return {*existing};
  }
  TextReadoutSharedPtr text_readout(
      new TextReadoutImpl(name, *this, tag_extracted_name, stat_name_tags));
  text_readouts_.insert(text_readout.get());
  // Newly created readouts are tracked for sink flushing when they match the
  // configured predicate.
  if (sink_predicates_ != nullptr && sink_predicates_->includeTextReadout(*text_readout)) {
    auto insert_result = sinked_text_readouts_.insert(text_readout.get());
    ASSERT(insert_result.second);
  }
  return text_readout;
}
360

            
361
2
bool Allocator::isMutexLockedForTest() {
362
2
  bool locked = mutex_.tryLock();
363
2
  if (locked) {
364
1
    mutex_.unlock();
365
1
  }
366
2
  return !locked;
367
2
}
368

            
369
// Allocates the concrete CounterImpl; the caller takes ownership of the
// returned pointer (makeCounter wraps it in a CounterSharedPtr).
Counter* Allocator::makeCounterInternal(StatName name, StatName tag_extracted_name,
                                        const StatNameTagVector& stat_name_tags) {
  Counter* counter = new CounterImpl(name, *this, tag_extracted_name, stat_name_tags);
  return counter;
}
373

            
374
294197
// Invokes f_stat on every live counter, reporting the total count through the
// optional f_size first. mutex_ is held for the whole iteration, so the
// callbacks must not re-enter the allocator.
void Allocator::forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
  Thread::LockGuard lock(mutex_);
  if (f_size != nullptr) {
    f_size(counters_.size());
  }
  for (Counter* counter : counters_) {
    f_stat(*counter);
  }
}
383

            
384
32190
// Invokes f_stat on every live gauge, reporting the total count through the
// optional f_size first. mutex_ is held for the whole iteration, so the
// callbacks must not re-enter the allocator.
void Allocator::forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
  Thread::LockGuard lock(mutex_);
  if (f_size != nullptr) {
    f_size(gauges_.size());
  }
  for (Gauge* gauge : gauges_) {
    f_stat(*gauge);
  }
}
393

            
394
11859
// Invokes f_stat on every live text readout, reporting the total count through
// the optional f_size first. mutex_ is held for the whole iteration, so the
// callbacks must not re-enter the allocator.
void Allocator::forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
  Thread::LockGuard lock(mutex_);
  if (f_size != nullptr) {
    f_size(text_readouts_.size());
  }
  for (TextReadout* text_readout : text_readouts_) {
    f_stat(*text_readout);
  }
}
403

            
404
11831
// Iterates over the counters destined for sinks. With predicates installed,
// that is exactly sinked_counters_; otherwise every counter is included.
void Allocator::forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
  if (sink_predicates_ != nullptr) {
    Thread::LockGuard lock(mutex_);
    // Guard f_size for consistency with forEachCounter, whose contract treats
    // f_size as optional; previously a null f_size crashed only on this path.
    if (f_size != nullptr) {
      f_size(sinked_counters_.size());
    }
    for (auto counter : sinked_counters_) {
      f_stat(*counter);
    }
  } else {
    forEachCounter(f_size, f_stat);
  }
}
415

            
416
11833
// Iterates over the gauges destined for sinks. With predicates installed, that
// is exactly sinked_gauges_; otherwise every non-hidden gauge is included.
// NOTE: on the no-predicate path f_size reports the unfiltered gauge count
// even though hidden gauges are skipped by the wrapper below.
void Allocator::forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
  if (sink_predicates_ != nullptr) {
    Thread::LockGuard lock(mutex_);
    // Guard f_size for consistency with forEachGauge, whose contract treats
    // f_size as optional; previously a null f_size crashed only on this path.
    if (f_size != nullptr) {
      f_size(sinked_gauges_.size());
    }
    for (auto gauge : sinked_gauges_) {
      f_stat(*gauge);
    }
  } else {
    forEachGauge(f_size, [&f_stat](Gauge& gauge) {
      if (!gauge.hidden()) {
        f_stat(gauge);
      }
    });
  }
}
431

            
432
11831
// Iterates over the text readouts destined for sinks. With predicates
// installed, that is exactly sinked_text_readouts_; otherwise every readout.
void Allocator::forEachSinkedTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
  if (sink_predicates_ != nullptr) {
    Thread::LockGuard lock(mutex_);
    // Guard f_size for consistency with forEachTextReadout, whose contract
    // treats f_size as optional; previously a null f_size crashed only here.
    if (f_size != nullptr) {
      f_size(sinked_text_readouts_.size());
    }
    for (auto text_readout : sinked_text_readouts_) {
      f_stat(*text_readout);
    }
  } else {
    forEachTextReadout(f_size, f_stat);
  }
}
443

            
444
7
void Allocator::setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) {
445
7
  Thread::LockGuard lock(mutex_);
446
7
  ASSERT(sink_predicates_ == nullptr);
447
7
  sink_predicates_ = std::move(sink_predicates);
448
7
  sinked_counters_.clear();
449
7
  sinked_gauges_.clear();
450
7
  sinked_text_readouts_.clear();
451
  // Add counters to the set of sinked counters.
452
7
  for (auto& counter : counters_) {
453
    if (sink_predicates_->includeCounter(*counter)) {
454
      sinked_counters_.emplace(counter);
455
    }
456
  }
457
  // Add gauges to the set of sinked gauges.
458
7
  for (auto& gauge : gauges_) {
459
    if (sink_predicates_->includeGauge(*gauge)) {
460
      sinked_gauges_.insert(gauge);
461
    }
462
  }
463
  // Add text_readouts to the set of sinked text readouts.
464
7
  for (auto& text_readout : text_readouts_) {
465
    if (sink_predicates_->includeTextReadout(*text_readout)) {
466
      sinked_text_readouts_.insert(text_readout);
467
    }
468
  }
469
7
}
470

            
471
14
void Allocator::markCounterForDeletion(const CounterSharedPtr& counter) {
472
14
  Thread::LockGuard lock(mutex_);
473
14
  auto iter = counters_.find(counter->statName());
474
14
  if (iter == counters_.end()) {
475
    // This has already been marked for deletion.
476
    return;
477
  }
478
14
  ASSERT(counter.get() == *iter);
479
  // Duplicates are ASSERTed in ~Allocator. These might occur if there was
480
  // a race bug in reference counting, causing a stat to be double-deleted.
481
14
  deleted_counters_.emplace_back(*iter);
482
14
  counters_.erase(iter);
483
14
  sinked_counters_.erase(counter.get());
484
14
}
485

            
486
6
void Allocator::markGaugeForDeletion(const GaugeSharedPtr& gauge) {
487
6
  Thread::LockGuard lock(mutex_);
488
6
  auto iter = gauges_.find(gauge->statName());
489
6
  if (iter == gauges_.end()) {
490
    // This has already been marked for deletion.
491
    return;
492
  }
493
6
  ASSERT(gauge.get() == *iter);
494
  // Duplicates are ASSERTed in ~Allocator.
495
6
  deleted_gauges_.emplace_back(*iter);
496
6
  gauges_.erase(iter);
497
6
  sinked_gauges_.erase(gauge.get());
498
6
}
499

            
500
4
void Allocator::markTextReadoutForDeletion(const TextReadoutSharedPtr& text_readout) {
501
4
  Thread::LockGuard lock(mutex_);
502
4
  auto iter = text_readouts_.find(text_readout->statName());
503
4
  if (iter == text_readouts_.end()) {
504
    // This has already been marked for deletion.
505
    return;
506
  }
507
4
  ASSERT(text_readout.get() == *iter);
508
  // Duplicates are ASSERTed in ~Allocator.
509
4
  deleted_text_readouts_.emplace_back(*iter);
510
4
  text_readouts_.erase(iter);
511
4
  sinked_text_readouts_.erase(text_readout.get());
512
4
}
513

            
514
} // namespace Stats
515
} // namespace Envoy