1
#include "source/common/stats/allocator.h"
2

            
3
#include <algorithm>
4
#include <cstdint>
5

            
6
#include "envoy/stats/sink.h"
7
#include "envoy/stats/stats.h"
8

            
9
#include "source/common/common/hash.h"
10
#include "source/common/common/lock_guard.h"
11
#include "source/common/common/logger.h"
12
#include "source/common/common/thread.h"
13
#include "source/common/common/thread_annotations.h"
14
#include "source/common/common/utility.h"
15
#include "source/common/stats/metric_impl.h"
16
#include "source/common/stats/stat_merger.h"
17
#include "source/common/stats/symbol_table.h"
18

            
19
#include "absl/container/flat_hash_set.h"
20
#include "absl/strings/string_view.h"
21

            
22
namespace Envoy {
23
namespace Stats {
24

            
25
// Named sync point that tests can hook to coordinate with the instant a
// stat's ref-count reaches zero (fired in StatsSharedImpl::decRefCount).
const char Allocator::DecrementToZeroSyncPoint[] = "decrement-zero";
26

            
27
1837566
Allocator::~Allocator() {
  // By destruction time every live stat should have been released (each one
  // removes itself from these sets when its ref-count hits zero).
  ASSERT(counters_.empty());
  ASSERT(gauges_.empty());

#ifndef NDEBUG
  // Move deleted stats into the sets for the ASSERTs in removeFromSetLockHeld to function.
  // Stats parked via markCounterForDeletion et al. were erased from the live
  // sets but kept alive in deleted_* vectors; when they are finally destroyed
  // (after this dtor body runs), removeFromSetLockHeld expects to find them.
  for (auto& counter : deleted_counters_) {
    auto insertion = counters_.insert(counter.get());
    // Assert that there were no duplicates.
    ASSERT(insertion.second);
  }
  for (auto& gauge : deleted_gauges_) {
    auto insertion = gauges_.insert(gauge.get());
    // Assert that there were no duplicates.
    ASSERT(insertion.second);
  }
  for (auto& text_readout : deleted_text_readouts_) {
    auto insertion = text_readouts_.insert(text_readout.get());
    // Assert that there were no duplicates.
    ASSERT(insertion.second);
  }
#endif
}
50

            
51
#ifndef ENVOY_CONFIG_COVERAGE
52
void Allocator::debugPrint() {
53
  Thread::LockGuard lock(mutex_);
54
  for (Counter* counter : counters_) {
55
    ENVOY_LOG_MISC(info, "counter: {}", symbolTable().toString(counter->statName()));
56
  }
57
  for (Gauge* gauge : gauges_) {
58
    ENVOY_LOG_MISC(info, "gauge: {}", symbolTable().toString(gauge->statName()));
59
  }
60
}
61
#endif
62

            
63
// Counter, Gauge and TextReadout inherit from RefcountInterface and
// Metric. MetricImpl takes care of most of the Metric API, but we need to cover
// symbolTable() here, which we don't store directly, but get it via the alloc,
// which we need in order to clean up the counter and gauge maps in that class
// when they are destroyed.
//
// We implement the RefcountInterface API to avoid weak counter and destructor overhead in
// shared_ptr.
template <class BaseClass> class StatsSharedImpl : public MetricImpl<BaseClass> {
public:
  StatsSharedImpl(StatName name, Allocator& alloc, StatName tag_extracted_name,
                  const StatNameTagVector& stat_name_tags)
      : MetricImpl<BaseClass>(name, tag_extracted_name, stat_name_tags, alloc.symbolTable()),
        alloc_(alloc) {}

  ~StatsSharedImpl() override {
    // MetricImpl must be explicitly cleared() before destruction, otherwise it
    // will not be able to access the SymbolTable& to free the symbols. An RAII
    // alternative would be to store the SymbolTable reference in the
    // MetricImpl, costing 8 bytes per stat.
    this->clear(symbolTable());
  }

  // Metric
  // The symbol table is not stored in this object; it is reached through the
  // allocator to keep per-stat memory small.
  SymbolTable& symbolTable() final { return alloc_.symbolTable(); }
  // Flag accessors are backed by the atomic flags_ bitmap below, so they need
  // no lock.
  bool used() const override { return flags_ & Metric::Flags::Used; }
  void markUnused() override { flags_ &= ~Metric::Flags::Used; }
  bool hidden() const override { return flags_ & Metric::Flags::Hidden; }

  // RefcountInterface
  // Lock-free increment; see the comment on ref_count_ for why this is safe.
  void incRefCount() override { ++ref_count_; }
  bool decRefCount() override {
    // We must, unfortunately, hold the allocator's lock when decrementing the
    // refcount. Otherwise another thread may simultaneously try to allocate the
    // same name'd stat after we decrement it, and we'll wind up with a
    // dtor/update race. To avoid this we must hold the lock until the stat is
    // removed from the map.
    //
    // It might be worth thinking about a race-free way to decrement ref-counts
    // without a lock, for the case where ref_count > 2, and we don't need to
    // destruct anything. But it seems preferable at to be conservative here,
    // as stats will only go out of scope when a scope is destructed (during
    // xDS) or during admin stats operations.
    Thread::LockGuard lock(alloc_.mutex_);
    ASSERT(ref_count_ >= 1);
    if (--ref_count_ == 0) {
      // Allow tests to synchronize with the decrement-to-zero transition.
      alloc_.sync().syncPoint(Allocator::DecrementToZeroSyncPoint);
      removeFromSetLockHeld();
      return true; // Caller is now responsible for destroying the stat.
    }
    return false;
  }
  uint32_t use_count() const override { return ref_count_; }

  /**
   * We must atomically remove the counter/gauges from the allocator's sets when
   * our ref-count decrement hits zero. The counters and gauges are held in
   * distinct sets so we virtualize this removal helper.
   */
  virtual void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) PURE;

protected:
  Allocator& alloc_;

  // ref_count_ can be incremented as an atomic, without taking a new lock, as
  // the critical 0->1 transition occurs in makeCounter and makeGauge, which
  // already hold the lock. Increment also occurs when copying shared pointers,
  // but these are always in transition to ref-count 2 or higher, and thus
  // cannot race with a decrement to zero.
  //
  // However, we must hold alloc_.mutex_ when decrementing ref_count_ so that
  // when it hits zero we can atomically remove it from alloc_.counters_ or
  // alloc_.gauges_. We leave it atomic to avoid taking the lock on increment.
  std::atomic<uint32_t> ref_count_{0};

  // Bitmap of Metric::Flags bits (Used, Hidden, etc.); atomic so flag reads
  // and writes need no lock.
  std::atomic<uint16_t> flags_{0};
};
140

            
141
// Monotonic counter backed by two atomics: a running total (value_) and the
// delta accumulated since the last latch() (pending_increment_).
class CounterImpl : public StatsSharedImpl<Counter> {
public:
  CounterImpl(StatName name, Allocator& alloc, StatName tag_extracted_name,
              const StatNameTagVector& stat_name_tags)
      : StatsSharedImpl(name, alloc, tag_extracted_name, stat_name_tags) {}

  // Erases this counter from the allocator's sets; alloc_.mutex_ must be held.
  void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override {
    const size_t count = alloc_.counters_.erase(statName());
    ASSERT(count == 1);
    alloc_.sinked_counters_.erase(this);
  }

  // Stats::Counter
  void add(uint64_t amount) override {
    // Note that a reader may see a new value but an old pending_increment_ or
    // used(). From a system perspective this should be eventually consistent.
    value_ += amount;
    pending_increment_ += amount;
    flags_ |= Flags::Used;
  }
  void inc() override { add(1); }
  // Returns the amount added since the previous latch() and resets the
  // pending delta to zero in one atomic exchange.
  uint64_t latch() override { return pending_increment_.exchange(0); }
  // Clears the running total only; any un-latched pending_increment_ is left
  // intact.
  void reset() override { value_ = 0; }
  uint64_t value() const override { return value_; }

private:
  std::atomic<uint64_t> value_{0};
  std::atomic<uint64_t> pending_increment_{0};
};
170

            
171
// Gauge whose reported value() is the sum of a locally-updated child_value_
// and a parent_value_ that can be installed via setParentValue (used by the
// stat-merging machinery; see mergeImportMode). The import mode is encoded in
// the flags_ bitmap inherited from StatsSharedImpl.
class GaugeImpl : public StatsSharedImpl<Gauge> {
public:
  GaugeImpl(StatName name, Allocator& alloc, StatName tag_extracted_name,
            const StatNameTagVector& stat_name_tags, ImportMode import_mode)
      : StatsSharedImpl(name, alloc, tag_extracted_name, stat_name_tags) {
    // Translate the requested import mode into flag bits.
    switch (import_mode) {
    case ImportMode::Accumulate:
      flags_ |= Flags::LogicAccumulate;
      break;
    case ImportMode::NeverImport:
      flags_ |= Flags::NeverImport;
      break;
    case ImportMode::Uninitialized:
      // Note that we don't clear any flag bits for import_mode==Uninitialized,
      // as we may have an established import_mode when this stat was created in
      // an alternate scope. See
      // https://github.com/envoyproxy/envoy/issues/7227.
      break;
    case ImportMode::HiddenAccumulate:
      flags_ |= Flags::Hidden;
      flags_ |= Flags::LogicAccumulate;
      break;
    }
  }

  // Erases this gauge from the allocator's sets; alloc_.mutex_ must be held.
  void removeFromSetLockHeld() override ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) {
    const size_t count = alloc_.gauges_.erase(statName());
    ASSERT(count == 1);
    alloc_.sinked_gauges_.erase(this);
  }

  // Stats::Gauge
  void add(uint64_t amount) override {
    child_value_ += amount;
    flags_ |= Flags::Used;
  }
  void dec() override { sub(1); }
  void inc() override { add(1); }
  void set(uint64_t value) override {
    child_value_ = value;
    flags_ |= Flags::Used;
  }
  void sub(uint64_t amount) override {
    // Underflow and subtraction-before-first-use are programming errors.
    ASSERT(child_value_ >= amount);
    ASSERT(used() || amount == 0);
    child_value_ -= amount;
  }
  // Reported value combines the local and parent contributions.
  uint64_t value() const override { return child_value_ + parent_value_; }

  // TODO(diazalan): Rename importMode and to more generic name
  // Decodes the import mode from flag bits. NeverImport wins over the
  // accumulate bits; Hidden+LogicAccumulate together mean HiddenAccumulate.
  ImportMode importMode() const override {
    if (flags_ & Flags::NeverImport) {
      return ImportMode::NeverImport;
    } else if ((flags_ & Flags::Hidden) && (flags_ & Flags::LogicAccumulate)) {
      return ImportMode::HiddenAccumulate;
    } else if (flags_ & Flags::LogicAccumulate) {
      return ImportMode::Accumulate;
    }
    return ImportMode::Uninitialized;
  }

  // TODO(diazalan): Rename mergeImportMode and to more generic name
  // Reconciles an incoming import mode with the current one. Only an
  // Uninitialized gauge may be upgraded to a concrete mode (ASSERTed below).
  void mergeImportMode(ImportMode import_mode) override {
    ImportMode current = importMode();
    if (current == import_mode) {
      return;
    }

    switch (import_mode) {
    case ImportMode::Uninitialized:
      // mergeImportNode(ImportMode::Uninitialized) is called when merging an
      // existing stat with importMode() == Accumulate or NeverImport.
      break;
    case ImportMode::Accumulate:
      ASSERT(current == ImportMode::Uninitialized);
      flags_ |= Flags::LogicAccumulate;
      break;
    case ImportMode::NeverImport:
      ASSERT(current == ImportMode::Uninitialized);
      // A previous revision of Envoy may have transferred a gauge that it
      // thought was Accumulate. But the new version thinks it's NeverImport, so
      // we clear the accumulated value.
      parent_value_ = 0;
      flags_ &= ~Flags::Used;
      flags_ |= Flags::NeverImport;
      break;
    case ImportMode::HiddenAccumulate:
      ASSERT(current == ImportMode::Uninitialized);
      flags_ |= Flags::Hidden;
      flags_ |= Flags::LogicAccumulate;
      break;
    }
  }

  void setParentValue(uint64_t value) override { parent_value_ = value; }

private:
  std::atomic<uint64_t> parent_value_{0};
  std::atomic<uint64_t> child_value_{0};
};
271

            
272
// String-valued stat. Unlike counters and gauges the payload (a std::string)
// cannot be updated atomically, so reads and writes are serialized by a
// per-stat absl::Mutex.
class TextReadoutImpl : public StatsSharedImpl<TextReadout> {
public:
  TextReadoutImpl(StatName name, Allocator& alloc, StatName tag_extracted_name,
                  const StatNameTagVector& stat_name_tags)
      : StatsSharedImpl(name, alloc, tag_extracted_name, stat_name_tags) {}

  // Erases this text readout from the allocator's sets; alloc_.mutex_ must be
  // held.
  void removeFromSetLockHeld() ABSL_EXCLUSIVE_LOCKS_REQUIRED(alloc_.mutex_) override {
    const size_t count = alloc_.text_readouts_.erase(statName());
    ASSERT(count == 1);
    alloc_.sinked_text_readouts_.erase(this);
  }

  // Stats::TextReadout
  void set(absl::string_view value) override {
    // Make the copy before taking the lock to keep the critical section short.
    std::string value_copy(value);
    // Lock via the Mutex* constructor, which exists in every Abseil release
    // (the Mutex& overload is only available in recent versions).
    absl::MutexLock lock(&mutex_);
    value_ = std::move(value_copy);
    flags_ |= Flags::Used;
  }
  std::string value() const override {
    absl::MutexLock lock(&mutex_);
    return value_;
  }

private:
  // Guards value_; mutable so the const value() accessor can lock it.
  mutable absl::Mutex mutex_;
  std::string value_ ABSL_GUARDED_BY(mutex_);
};
300

            
301
// Returns the existing counter registered under `name`, or allocates a new
// one. The whole find-or-create sequence runs under mutex_ so the 0->1
// ref-count transition cannot race with a concurrent decRefCount.
CounterSharedPtr Allocator::makeCounter(StatName name, StatName tag_extracted_name,
                                        const StatNameTagVector& stat_name_tags) {
  Thread::LockGuard lock(mutex_);
  // A stat name must not be reused across stat types.
  ASSERT(gauges_.find(name) == gauges_.end());
  ASSERT(text_readouts_.find(name) == text_readouts_.end());
  const auto existing = counters_.find(name);
  if (existing != counters_.end()) {
    // Share the previously allocated counter.
    return {*existing};
  }
  CounterSharedPtr counter(makeCounterInternal(name, tag_extracted_name, stat_name_tags));
  counters_.insert(counter.get());
  // Add counter to sinked_counters_ if it matches the sink predicate.
  if (sink_predicates_ != nullptr && sink_predicates_->includeCounter(*counter)) {
    const auto inserted = sinked_counters_.insert(counter.get());
    ASSERT(inserted.second);
  }
  return counter;
}
319

            
320
// Returns the existing gauge registered under `name`, or allocates a new one
// with the requested import mode. Runs entirely under mutex_ so the 0->1
// ref-count transition cannot race with a concurrent decRefCount.
GaugeSharedPtr Allocator::makeGauge(StatName name, StatName tag_extracted_name,
                                    const StatNameTagVector& stat_name_tags,
                                    Gauge::ImportMode import_mode) {
  Thread::LockGuard lock(mutex_);
  // A stat name must not be reused across stat types.
  ASSERT(counters_.find(name) == counters_.end());
  ASSERT(text_readouts_.find(name) == text_readouts_.end());
  const auto existing = gauges_.find(name);
  if (existing != gauges_.end()) {
    // Share the previously allocated gauge.
    return {*existing};
  }
  GaugeSharedPtr gauge(new GaugeImpl(name, *this, tag_extracted_name, stat_name_tags, import_mode));
  gauges_.insert(gauge.get());
  // Add gauge to sinked_gauges_ if it matches the sink predicate.
  if (sink_predicates_ != nullptr && sink_predicates_->includeGauge(*gauge)) {
    const auto inserted = sinked_gauges_.insert(gauge.get());
    ASSERT(inserted.second);
  }
  return gauge;
}
340

            
341
// Returns the existing text readout registered under `name`, or allocates a
// new one. Runs entirely under mutex_ so the 0->1 ref-count transition cannot
// race with a concurrent decRefCount.
TextReadoutSharedPtr Allocator::makeTextReadout(StatName name, StatName tag_extracted_name,
                                                const StatNameTagVector& stat_name_tags) {
  Thread::LockGuard lock(mutex_);
  // A stat name must not be reused across stat types.
  ASSERT(counters_.find(name) == counters_.end());
  ASSERT(gauges_.find(name) == gauges_.end());
  const auto existing = text_readouts_.find(name);
  if (existing != text_readouts_.end()) {
    // Share the previously allocated text readout.
    return {*existing};
  }
  TextReadoutSharedPtr text_readout(
      new TextReadoutImpl(name, *this, tag_extracted_name, stat_name_tags));
  text_readouts_.insert(text_readout.get());
  // Add text_readout to sinked_text_readouts_ if it matches the sink predicate.
  if (sink_predicates_ != nullptr && sink_predicates_->includeTextReadout(*text_readout)) {
    const auto inserted = sinked_text_readouts_.insert(text_readout.get());
    ASSERT(inserted.second);
  }
  return text_readout;
}
360

            
361
2
bool Allocator::isMutexLockedForTest() {
362
2
  bool locked = mutex_.tryLock();
363
2
  if (locked) {
364
1
    mutex_.unlock();
365
1
  }
366
2
  return !locked;
367
2
}
368

            
369
// Heap-allocates a fresh CounterImpl; called from makeCounter (with mutex_
// held), which wraps the result in a CounterSharedPtr that owns it.
Counter* Allocator::makeCounterInternal(StatName name, StatName tag_extracted_name,
                                        const StatNameTagVector& stat_name_tags) {
  auto* counter = new CounterImpl(name, *this, tag_extracted_name, stat_name_tags);
  return counter;
}
373

            
374
282971
// Invokes f_size (if provided) with the number of counters, then f_stat for
// every counter, all under mutex_ so the set cannot change mid-iteration.
void Allocator::forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
  Thread::LockGuard lock(mutex_);
  if (f_size != nullptr) {
    f_size(counters_.size());
  }
  for (Counter* c : counters_) {
    f_stat(*c);
  }
}
383

            
384
30857
// Invokes f_size (if provided) with the number of gauges, then f_stat for
// every gauge, all under mutex_ so the set cannot change mid-iteration.
void Allocator::forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
  Thread::LockGuard lock(mutex_);
  if (f_size != nullptr) {
    f_size(gauges_.size());
  }
  for (Gauge* g : gauges_) {
    f_stat(*g);
  }
}
393

            
394
11855
// Invokes f_size (if provided) with the number of text readouts, then f_stat
// for each one, all under mutex_ so the set cannot change mid-iteration.
void Allocator::forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
  Thread::LockGuard lock(mutex_);
  if (f_size != nullptr) {
    f_size(text_readouts_.size());
  }
  for (TextReadout* t : text_readouts_) {
    f_stat(*t);
  }
}
403

            
404
11827
// Iterates over the counters selected for sinks. With no sink predicates
// installed, every counter is considered sinked.
void Allocator::forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
  if (sink_predicates_ == nullptr) {
    forEachCounter(f_size, f_stat);
    return;
  }
  Thread::LockGuard lock(mutex_);
  f_size(sinked_counters_.size());
  for (Counter* c : sinked_counters_) {
    f_stat(*c);
  }
}
415

            
416
11829
// Iterates over the gauges selected for sinks. With no sink predicates
// installed, every non-hidden gauge is considered sinked.
void Allocator::forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
  if (sink_predicates_ == nullptr) {
    // Hidden gauges are excluded from the default (predicate-free) output.
    forEachGauge(f_size, [&f_stat](Gauge& g) {
      if (g.hidden()) {
        return;
      }
      f_stat(g);
    });
    return;
  }
  Thread::LockGuard lock(mutex_);
  f_size(sinked_gauges_.size());
  for (Gauge* g : sinked_gauges_) {
    f_stat(*g);
  }
}
431

            
432
11827
// Iterates over the text readouts selected for sinks. With no sink predicates
// installed, every text readout is considered sinked.
void Allocator::forEachSinkedTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
  if (sink_predicates_ == nullptr) {
    forEachTextReadout(f_size, f_stat);
    return;
  }
  Thread::LockGuard lock(mutex_);
  f_size(sinked_text_readouts_.size());
  for (TextReadout* t : sinked_text_readouts_) {
    f_stat(*t);
  }
}
443

            
444
7
void Allocator::setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) {
445
7
  Thread::LockGuard lock(mutex_);
446
7
  ASSERT(sink_predicates_ == nullptr);
447
7
  sink_predicates_ = std::move(sink_predicates);
448
7
  sinked_counters_.clear();
449
7
  sinked_gauges_.clear();
450
7
  sinked_text_readouts_.clear();
451
  // Add counters to the set of sinked counters.
452
7
  for (auto& counter : counters_) {
453
    if (sink_predicates_->includeCounter(*counter)) {
454
      sinked_counters_.emplace(counter);
455
    }
456
  }
457
  // Add gauges to the set of sinked gauges.
458
7
  for (auto& gauge : gauges_) {
459
    if (sink_predicates_->includeGauge(*gauge)) {
460
      sinked_gauges_.insert(gauge);
461
    }
462
  }
463
  // Add text_readouts to the set of sinked text readouts.
464
7
  for (auto& text_readout : text_readouts_) {
465
    if (sink_predicates_->includeTextReadout(*text_readout)) {
466
      sinked_text_readouts_.insert(text_readout);
467
    }
468
  }
469
7
}
470

            
471
14
void Allocator::markCounterForDeletion(const CounterSharedPtr& counter) {
472
14
  Thread::LockGuard lock(mutex_);
473
14
  auto iter = counters_.find(counter->statName());
474
14
  if (iter == counters_.end()) {
475
    // This has already been marked for deletion.
476
    return;
477
  }
478
14
  ASSERT(counter.get() == *iter);
479
  // Duplicates are ASSERTed in ~Allocator. These might occur if there was
480
  // a race bug in reference counting, causing a stat to be double-deleted.
481
14
  deleted_counters_.emplace_back(*iter);
482
14
  counters_.erase(iter);
483
14
  sinked_counters_.erase(counter.get());
484
14
}
485

            
486
6
void Allocator::markGaugeForDeletion(const GaugeSharedPtr& gauge) {
487
6
  Thread::LockGuard lock(mutex_);
488
6
  auto iter = gauges_.find(gauge->statName());
489
6
  if (iter == gauges_.end()) {
490
    // This has already been marked for deletion.
491
    return;
492
  }
493
6
  ASSERT(gauge.get() == *iter);
494
  // Duplicates are ASSERTed in ~Allocator.
495
6
  deleted_gauges_.emplace_back(*iter);
496
6
  gauges_.erase(iter);
497
6
  sinked_gauges_.erase(gauge.get());
498
6
}
499

            
500
4
void Allocator::markTextReadoutForDeletion(const TextReadoutSharedPtr& text_readout) {
501
4
  Thread::LockGuard lock(mutex_);
502
4
  auto iter = text_readouts_.find(text_readout->statName());
503
4
  if (iter == text_readouts_.end()) {
504
    // This has already been marked for deletion.
505
    return;
506
  }
507
4
  ASSERT(text_readout.get() == *iter);
508
  // Duplicates are ASSERTed in ~Allocator.
509
4
  deleted_text_readouts_.emplace_back(*iter);
510
4
  text_readouts_.erase(iter);
511
4
  sinked_text_readouts_.erase(text_readout.get());
512
4
}
513

            
514
} // namespace Stats
515
} // namespace Envoy