1
#include "source/common/stats/thread_local_store.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <list>
6
#include <memory>
7
#include <string>
8

            
9
#include "envoy/stats/histogram.h"
10
#include "envoy/stats/sink.h"
11
#include "envoy/stats/stats.h"
12

            
13
#include "source/common/common/lock_guard.h"
14
#include "source/common/runtime/runtime_features.h"
15
#include "source/common/stats/allocator.h"
16
#include "source/common/stats/histogram_impl.h"
17
#include "source/common/stats/stats_matcher_impl.h"
18
#include "source/common/stats/tag_producer_impl.h"
19
#include "source/common/stats/tag_utility.h"
20

            
21
#include "absl/strings/str_join.h"
22

            
23
namespace Envoy {
24
namespace Stats {
25

            
26
// Names of sync points used to coordinate with tests; e.g. ~ScopeImpl pauses on
// DeleteScopeSync to reproduce a prior race condition (see comment there).
const char ThreadLocalStoreImpl::DeleteScopeSync[] = "delete-scope";
const char ThreadLocalStoreImpl::IterateScopeSync[] = "iterate-scope";
const char ThreadLocalStoreImpl::MainDispatcherCleanupSync[] = "main-dispatcher-cleanup";
29

            
30
// Constructs the store with default (accept-all) matcher/tag-producer/histogram
// settings, null stats used as sinks for rejected names, and a default scope
// with an empty prefix.
ThreadLocalStoreImpl::ThreadLocalStoreImpl(Allocator& alloc)
    : alloc_(alloc), tag_producer_(std::make_unique<TagProducerImpl>()),
      stats_matcher_(std::make_unique<StatsMatcherImpl>()),
      histogram_settings_(std::make_unique<HistogramSettingsImpl>()),
      null_counter_(alloc.symbolTable()), null_gauge_(alloc.symbolTable()),
      null_histogram_(alloc.symbolTable()), null_text_readout_(alloc.symbolTable()),
      well_known_tags_(alloc.symbolTable().makeSet("well_known_tags")) {
  // Pre-register the well-known tag names so later tag extraction can resolve
  // them as builtins (see StatNameTagHelper).
  for (const auto& desc : Config::TagNames::get().descriptorVec()) {
    well_known_tags_->rememberBuiltin(desc.name_);
  }
  // The default scope has an empty prefix and is non-evictable.
  StatNameManagedStorage empty("", alloc.symbolTable());
  auto new_scope = std::make_shared<ScopeImpl>(*this, StatName(empty.statName()), false);
  addScope(new_scope);
  default_scope_ = new_scope;
}
45

            
46
10800
// Destruction requires that either threading was never started or
// shutdownThreading() already ran; all scopes and pending cleanup work must be
// gone by the time we get here.
ThreadLocalStoreImpl::~ThreadLocalStoreImpl() {
  ASSERT(shutting_down_ || !threading_ever_initialized_);
  default_scope_.reset();
  ASSERT(scopes_.empty());
  ASSERT(scopes_to_cleanup_.empty());
  ASSERT(central_cache_entries_to_cleanup_.empty());
  ASSERT(histograms_to_cleanup_.empty());
}
54

            
55
10607
// Replaces the histogram settings. The iteration only asserts (in debug
// builds) that no scope has created histograms yet, since already-created
// histograms would not pick up the new settings.
void ThreadLocalStoreImpl::setHistogramSettings(HistogramSettingsConstPtr&& histogram_settings) {
  iterateScopes([this](const ScopeImplSharedPtr& scope)
                    ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) -> bool {
                      assertLocked(*scope);
                      ASSERT(scope->centralCacheLockHeld()->histograms_.empty());
                      return true;
                    });
  histogram_settings_ = std::move(histogram_settings);
}
64

            
65
10644
// Installs a new store-level matcher and retroactively removes any
// already-created stats that the new matcher rejects.
void ThreadLocalStoreImpl::setStatsMatcher(StatsMatcherPtr&& stats_matcher) {
  stats_matcher_ = std::move(stats_matcher);
  if (stats_matcher_->acceptsAll()) {
    return;
  }

  // The Filesystem and potentially other stat-registering objects are
  // constructed prior to the stat-matcher, and those add stats
  // in the default_scope. There should be no requests, so there will
  // be no copies in TLS caches.
  Thread::LockGuard lock(lock_);
  // Remember where the deleted-histogram list ends now, so we can erase only
  // the newly-rejected histograms from histogram_set_ below.
  const uint32_t first_histogram_index = deleted_histograms_.size();
  iterateScopesLockHeld([this](const ScopeImplSharedPtr& scope) ABSL_EXCLUSIVE_LOCKS_REQUIRED(
                            lock_) -> bool {
    assertLocked(*scope);
    // Scopes with their own matcher are unaffected by global matcher changes.
    if (scope->scope_matcher_ != nullptr) {
      return true;
    }
    const CentralCacheEntrySharedPtr& central_cache = scope->centralCacheLockHeld();
    removeRejectedStats<CounterSharedPtr>(central_cache->counters_,
                                          [this](const CounterSharedPtr& counter) mutable {
                                            alloc_.markCounterForDeletion(counter);
                                          });
    removeRejectedStats<GaugeSharedPtr>(
        central_cache->gauges_,
        [this](const GaugeSharedPtr& gauge) mutable { alloc_.markGaugeForDeletion(gauge); });
    // Histograms are retained in deleted_histograms_ rather than marked on the
    // allocator like the other stat types.
    removeRejectedStats(central_cache->histograms_, deleted_histograms_);
    removeRejectedStats<TextReadoutSharedPtr>(
        central_cache->text_readouts_, [this](const TextReadoutSharedPtr& text_readout) mutable {
          alloc_.markTextReadoutForDeletion(text_readout);
        });
    return true;
  });

  // Remove any newly rejected histograms from histogram_set_.
  {
    Thread::LockGuard hist_lock(hist_mutex_);
    for (uint32_t i = first_histogram_index; i < deleted_histograms_.size(); ++i) {
      uint32_t erased = histogram_set_.erase(deleted_histograms_[i].get());
      ASSERT(erased == 1);
      sinked_histograms_.erase(deleted_histograms_[i].get());
    }
  }
}
110

            
111
template <class StatMapClass, class StatListClass>
112
64
void ThreadLocalStoreImpl::removeRejectedStats(StatMapClass& map, StatListClass& list) {
113
64
  StatNameVec remove_list;
114
64
  for (auto& stat : map) {
115
24
    if (rejects(stat.first)) {
116
23
      remove_list.push_back(stat.first);
117
23
    }
118
24
  }
119
64
  for (StatName stat_name : remove_list) {
120
23
    auto iter = map.find(stat_name);
121
23
    ASSERT(iter != map.end());
122
23
    list.push_back(iter->second); // Save SharedPtr to the list to avoid invalidating refs to stat.
123
23
    map.erase(iter);
124
23
  }
125
64
}
126

            
127
template <class StatSharedPtr>
128
void ThreadLocalStoreImpl::removeRejectedStats(
129
192
    StatNameHashMap<StatSharedPtr>& map, std::function<void(const StatSharedPtr&)> f_deletion) {
130
192
  StatNameVec remove_list;
131
192
  for (auto& stat : map) {
132
57
    if (rejects(stat.first)) {
133
18
      remove_list.push_back(stat.first);
134
18
    }
135
57
  }
136
192
  for (StatName stat_name : remove_list) {
137
18
    auto iter = map.find(stat_name);
138
18
    ASSERT(iter != map.end());
139
18
    f_deletion(iter->second);
140
18
    map.erase(iter);
141
18
  }
142
192
}
143

            
144
// Cheap first-pass rejection check, delegated to the configured matcher.
StatsMatcher::FastResult ThreadLocalStoreImpl::fastRejects(StatName stat_name) const {
  const StatsMatcher& matcher = *stats_matcher_;
  return matcher.fastRejects(stat_name);
}
147

            
148
bool ThreadLocalStoreImpl::slowRejects(StatsMatcher::FastResult fast_reject_result,
149
                                       StatName stat_name) const {
150
  return stats_matcher_->slowRejects(fast_reject_result, stat_name);
151
}
152

            
153
271137
std::vector<CounterSharedPtr> ThreadLocalStoreImpl::counters() const {
154
  // Handle de-dup due to overlapping scopes.
155
271137
  std::vector<CounterSharedPtr> ret;
156
271137
  forEachCounter([&ret](std::size_t size) { ret.reserve(size); },
157
86827163
                 [&ret](Counter& counter) { ret.emplace_back(CounterSharedPtr(&counter)); });
158
271137
  return ret;
159
271137
}
160

            
161
// Creates a child scope from a textual name: sanitizes the string into a
// StatName and defers to the StatName overload.
ScopeSharedPtr ThreadLocalStoreImpl::ScopeImpl::createScope(const std::string& name, bool evictable,
                                                            const ScopeStatsLimitSettings& limits,
                                                            StatsMatcherSharedPtr matcher) {
  const std::string sanitized = Utility::sanitizeStatsName(name);
  StatNameManagedStorage storage(sanitized, symbolTable());
  return scopeFromStatName(storage.statName(), evictable, limits, std::move(matcher));
}
167

            
168
// Creates a child scope whose prefix is this scope's prefix joined with
// `name`, registering it with the parent store.
ScopeSharedPtr
ThreadLocalStoreImpl::ScopeImpl::scopeFromStatName(StatName name, bool evictable,
                                                   const ScopeStatsLimitSettings& limits,
                                                   StatsMatcherSharedPtr matcher) {
  SymbolTable::StoragePtr joined = symbolTable().join({prefix_.statName(), name});
  // Use explicit matcher if provided; otherwise inherit scope_matcher_ (which may be null,
  // meaning the store-level matcher is used).
  StatsMatcherSharedPtr child_matcher = matcher ? std::move(matcher) : scope_matcher_;
  auto new_scope = std::make_shared<ScopeImpl>(parent_, StatName(joined.get()), evictable, limits,
                                               std::move(child_matcher));
  parent_.addScope(new_scope);
  return new_scope;
}
181

            
182
254286
void ThreadLocalStoreImpl::addScope(std::shared_ptr<ScopeImpl>& new_scope) {
183
254286
  Thread::LockGuard lock(lock_);
184
254286
  scopes_[new_scope.get()] = std::weak_ptr<ScopeImpl>(new_scope);
185
254286
}
186

            
187
19022
std::vector<GaugeSharedPtr> ThreadLocalStoreImpl::gauges() const {
188
  // Handle de-dup due to overlapping scopes.
189
19022
  std::vector<GaugeSharedPtr> ret;
190
19022
  forEachGauge([&ret](std::size_t size) { ret.reserve(size); },
191
2107573
               [&ret](Gauge& gauge) { ret.emplace_back(GaugeSharedPtr(&gauge)); });
192
19022
  return ret;
193
19022
}
194

            
195
25
std::vector<TextReadoutSharedPtr> ThreadLocalStoreImpl::textReadouts() const {
196
  // Handle de-dup due to overlapping scopes.
197
25
  std::vector<TextReadoutSharedPtr> ret;
198
25
  forEachTextReadout(
199
25
      [&ret](std::size_t size) { ret.reserve(size); },
200
25
      [&ret](TextReadout& text_readout) { ret.emplace_back(TextReadoutSharedPtr(&text_readout)); });
201
25
  return ret;
202
25
}
203

            
204
31697
std::vector<ParentHistogramSharedPtr> ThreadLocalStoreImpl::histograms() const {
205
31697
  std::vector<ParentHistogramSharedPtr> ret;
206
31697
  forEachHistogram([&ret](std::size_t size) mutable { ret.reserve(size); },
207
596251
                   [&ret](ParentHistogram& histogram) mutable {
208
596246
                     ret.emplace_back(ParentHistogramSharedPtr(&histogram));
209
596246
                   });
210
31697
  return ret;
211
31697
}
212

            
213
// Enables cross-thread operation: records the main dispatcher (used for
// posting cleanup work) and allocates a per-worker TlsCache slot.
void ThreadLocalStoreImpl::initializeThreading(Event::Dispatcher& main_thread_dispatcher,
                                               ThreadLocal::Instance& tls) {
  threading_ever_initialized_ = true;
  main_thread_dispatcher_ = &main_thread_dispatcher;
  tls_cache_ = ThreadLocal::TypedSlot<TlsCache>::makeUnique(tls);
  // Each worker thread gets its own empty TlsCache instance.
  tls_cache_->set(
      [](Event::Dispatcher&) -> std::shared_ptr<TlsCache> { return std::make_shared<TlsCache>(); });
  tls_ = tls;
}
222

            
223
10801
// Stops all cross-thread stat activity in preparation for server shutdown.
void ThreadLocalStoreImpl::shutdownThreading() {
  // This will block both future cache fills as well as cache flushes.
  shutting_down_ = true;
  ASSERT(!tls_.has_value() || tls_->isShutdown());

  // We can't call runOnAllThreads here as global threading has already been shutdown. It is okay
  // to simply clear the scopes and central cache entries here as they will be cleaned up during
  // thread local data cleanup in InstanceImpl::shutdownThread().
  {
    Thread::LockGuard lock(lock_);
    scopes_to_cleanup_.clear();
    central_cache_entries_to_cleanup_.clear();
  }

  // Mark every live histogram as shutting-down before dropping our references
  // to them.
  Thread::LockGuard lock(hist_mutex_);
  for (ParentHistogramImpl* histogram : histogram_set_) {
    histogram->setShuttingDown(true);
  }
  histogram_set_.clear();
  sinked_histograms_.clear();
}
244

            
245
11643
// Begins a histogram merge across all worker threads; once every thread has
// snapshotted its thread-local histograms (beginMerge), mergeInternal runs on
// the main thread and invokes `merge_complete_cb`.
void ThreadLocalStoreImpl::mergeHistograms(PostMergeCb merge_complete_cb) {
  if (!shutting_down_) {
    // Only one merge may be in flight at a time.
    ASSERT(!merge_in_progress_);
    merge_in_progress_ = true;
    tls_cache_->runOnAllThreads(
        [](OptRef<TlsCache> tls_cache) {
          for (const auto& id_hist : tls_cache->tls_histogram_cache_) {
            const TlsHistogramSharedPtr& tls_hist = id_hist.second;
            tls_hist->beginMerge();
          }
        },
        [this, merge_complete_cb]() -> void { mergeInternal(merge_complete_cb); });
  } else {
    // If server is shutting down, just call the callback to allow flush to continue.
    merge_complete_cb();
  }
}
262

            
263
1170
// Completion stage of mergeHistograms: folds the per-thread snapshots into
// each parent histogram, notifies the caller, and clears the in-progress flag.
// Skipped entirely once shutdown has started.
void ThreadLocalStoreImpl::mergeInternal(PostMergeCb merge_complete_cb) {
  if (shutting_down_) {
    return;
  }
  forEachHistogram(nullptr, [](ParentHistogram& parent_histogram) { parent_histogram.merge(); });
  merge_complete_cb();
  merge_in_progress_ = false;
}
270

            
271
254286
// Releases the interned rejected-stat names back to the symbol table.
ThreadLocalStoreImpl::CentralCacheEntry::~CentralCacheEntry() {
  // Assert that the symbol-table is valid, so we get good test coverage of
  // the validity of the symbol table at the time this destructor runs. This
  // is because many tests will not populate rejected_stats_.
  ASSERT(symbol_table_.toString(StatNameManagedStorage("Hello.world", symbol_table_).statName()) ==
         "Hello.world");
  rejected_stats_.free(symbol_table_); // NOLINT(clang-analyzer-unix.Malloc)
}
279

            
280
254286
// Unregisters a scope being destroyed and schedules its TLS-cache entries and
// central cache for a deferred, batched, cross-thread cleanup.
void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) {
  Thread::ReleasableLockGuard lock(lock_);
  ASSERT(scopes_.count(scope) == 1);
  scopes_.erase(scope);

  // This method is called directly from the ScopeImpl destructor, but we can't
  // destroy scope->central_cache_ until all the TLS caches are be destroyed, as
  // the TLS caches reference the Counters and Gauges owned by the central
  // cache. We don't want the maps in the TLS caches to bump the
  // reference-count, as decrementing the count requires an allocator lock,
  // which would cause a storm of contention during scope destruction.
  //
  // So instead we have a 2-phase destroy:
  //   1. destroy all the TLS caches
  //   2. destroy the central cache.
  //
  // Since this is called from ScopeImpl's destructor, we must bump the
  // ref-count of the central-cache by copying to a local scoped pointer, and
  // keep that reference alive until all the TLS caches are clear. This is done by keeping a
  // separate vector of shared_ptrs which will be destructed once all threads have completed.

  // This can happen from any thread. We post() back to the main thread which will initiate the
  // cache flush operation.
  if (!shutting_down_ && main_thread_dispatcher_) {
    // Clear scopes in a batch. It's possible that many different scopes will be deleted at
    // the same time, before the main thread gets a chance to run cleanScopesFromCaches. If a new
    // scope is deleted before that post runs, we add it to our list of scopes to clear, and there
    // is no need to issue another post. This greatly reduces the overhead when there are tens of
    // thousands of scopes to clear in a short period. i.e.: VHDS updates with tens of thousands of
    // VirtualHosts.
    bool need_post = scopes_to_cleanup_.empty();
    scopes_to_cleanup_.push_back(scope->scope_id_);
    assertLocked(*scope);
    central_cache_entries_to_cleanup_.push_back(scope->centralCacheLockHeld());
    // Release the lock before posting; the posted lambda re-enters locking
    // machinery in clearScopesFromCaches.
    lock.release();

    if (need_post) {
      main_thread_dispatcher_->post([this]() {
        sync_.syncPoint(MainDispatcherCleanupSync);
        clearScopesFromCaches();
      });
    }
  }
}
324

            
325
197488
// Schedules removal of a destroyed histogram's TLS-cache entries, batching
// multiple deletions into a single main-thread post.
void ThreadLocalStoreImpl::releaseHistogramCrossThread(uint64_t histogram_id) {
  // This can happen from any thread. We post() back to the main thread which will initiate the
  // cache flush operation.
  if (!shutting_down_ && main_thread_dispatcher_) {
    // It's possible that many different histograms will be deleted at the same
    // time, before the main thread gets a chance to run
    // clearHistogramsFromCaches. If a new histogram is deleted before that
    // post runs, we add it to our list of histograms to clear, and there's no
    // need to issue another post.
    bool need_post = false;
    {
      Thread::LockGuard lock(hist_mutex_);
      need_post = histograms_to_cleanup_.empty();
      histograms_to_cleanup_.push_back(histogram_id);
    }
    if (need_post) {
      main_thread_dispatcher_->post([this]() { clearHistogramsFromCaches(); });
    }
  }
}
345

            
346
// Returns this thread's cache entry for `scope_id`, default-constructing an
// empty entry on first access.
ThreadLocalStoreImpl::TlsCacheEntry&
ThreadLocalStoreImpl::TlsCache::insertScope(uint64_t scope_id) {
  TlsCacheEntry& entry = scope_cache_[scope_id];
  return entry;
}
350

            
351
43864
void ThreadLocalStoreImpl::TlsCache::eraseScopes(const std::vector<uint64_t>& scope_ids) {
352
355201
  for (uint64_t scope_id : scope_ids) {
353
355201
    scope_cache_.erase(scope_id);
354
355201
  }
355
43864
}
356

            
357
1424
void ThreadLocalStoreImpl::TlsCache::eraseHistograms(const std::vector<uint64_t>& histograms) {
358
  // This is called for every histogram in every thread, even though the
359
  // histogram may not have been cached in each thread yet. So we don't
360
  // want to check whether the erase() call erased anything.
361
4078
  for (uint64_t histogram_id : histograms) {
362
4078
    tls_histogram_cache_.erase(histogram_id);
363
4078
  }
364
1424
}
365

            
366
14568
// Main-thread handler that flushes deleted scopes out of every worker's TLS
// cache, then (and only then) releases the corresponding central caches.
void ThreadLocalStoreImpl::clearScopesFromCaches() {
  // If we are shutting down we no longer perform cache flushes as workers may be shutting down
  // at the same time.
  if (!shutting_down_) {
    // Perform a cache flush on all threads.

    // Capture all the pending scope ids in a local, clearing the list held in
    // this. Once this occurs, if a new scope is deleted, a new post will be
    // required.
    auto scope_ids = std::make_shared<std::vector<uint64_t>>();
    // Capture all the central cache entries for scopes we're deleting. These will be freed after
    // all threads have completed.
    auto central_caches = std::make_shared<std::vector<CentralCacheEntrySharedPtr>>();
    {
      Thread::LockGuard lock(lock_);
      *scope_ids = std::move(scopes_to_cleanup_);
      scopes_to_cleanup_.clear();
      *central_caches = std::move(central_cache_entries_to_cleanup_);
      central_cache_entries_to_cleanup_.clear();
    }

    // The completion lambda captures central_caches so the central cache
    // entries stay alive until every thread has erased its TLS references
    // (phase 2 of the 2-phase destroy described in releaseScopeCrossThread).
    tls_cache_->runOnAllThreads(
        [scope_ids](OptRef<TlsCache> tls_cache) { tls_cache->eraseScopes(*scope_ids); },
        [central_caches]() { /* Holds onto central_caches until all tls caches are clear */ });
  }
}
392

            
393
710
// Main-thread handler that flushes deleted histograms out of every worker's
// TLS cache.
void ThreadLocalStoreImpl::clearHistogramsFromCaches() {
  // If we are shutting down we no longer perform cache flushes as workers may be shutting down
  // at the same time.
  if (!shutting_down_) {
    // Move the histograms pending cleanup into a local variable. Future histogram deletions will be
    // batched until the next time this function is called.
    auto histograms = std::make_shared<std::vector<uint64_t>>();
    {
      Thread::LockGuard lock(hist_mutex_);
      histograms->swap(histograms_to_cleanup_);
    }

    tls_cache_->runOnAllThreads(
        [histograms](OptRef<TlsCache> tls_cache) { tls_cache->eraseHistograms(*histograms); });
  }
}
409

            
410
// Constructs a scope with a unique id (used as the TLS-cache key), its own
// central cache, and an interned copy of the prefix.
ThreadLocalStoreImpl::ScopeImpl::ScopeImpl(ThreadLocalStoreImpl& parent, StatName prefix,
                                           bool evictable, const ScopeStatsLimitSettings& limits,
                                           StatsMatcherSharedPtr scope_matcher)
    : scope_id_(parent.next_scope_id_++), parent_(parent), evictable_(evictable), limits_(limits),
      scope_matcher_(std::move(scope_matcher)), prefix_(prefix, parent.alloc_.symbolTable()),
      central_cache_(new CentralCacheEntry(parent.alloc_.symbolTable())) {
  // Lazily create the overflow counters if this scope carries stat limits.
  parent_.ensureOverflowStats(limits_);
}
418

            
419
254286
// Unregisters the scope from the parent store (scheduling cross-thread cache
// cleanup) and releases the interned prefix.
ThreadLocalStoreImpl::ScopeImpl::~ScopeImpl() {
  // Helps reproduce a previous race condition by pausing here in tests while we
  // loop over scopes. 'this' will not have been removed from the scopes_ table
  // yet, so we need to be careful.
  parent_.sync_.syncPoint(DeleteScopeSync);

  // Note that scope iteration is thread-safe due to the lock held in
  // releaseScopeCrossThread. For more details see the comment in
  // `ThreadLocalStoreImpl::iterHelper`, and the lock it takes prior to the loop.
  parent_.releaseScopeCrossThread(this);
  prefix_.free(symbolTable());
}
431

            
432
// Helper for managing the potential truncation of tags from the metric names and
// converting them to StatName. Making the tag extraction optional within this class simplifies the
// RAII ergonomics (as opposed to making the construction of this object conditional).
//
// The StatNameTagVector returned by this object will be valid as long as this object is in scope
// and the provided stat_name_tags are valid.
//
// When tag extraction is not done, this class is just a passthrough for the provided name/tags.
class StatNameTagHelper {
public:
  StatNameTagHelper(ThreadLocalStoreImpl& tls, StatName name,
                    const absl::optional<StatNameTagVector>& stat_name_tags)
      : pool_(tls.symbolTable()), stat_name_tags_(stat_name_tags.value_or(StatNameTagVector())) {
    if (!stat_name_tags) {
      // No explicit tags: run tag extraction over the full stat name.
      TagVector tags;
      tag_extracted_name_ =
          pool_.add(tls.tagProducer().produceTags(tls.symbolTable().toString(name), tags));
      StatName empty;
      for (const auto& tag : tags) {
        // Prefer the pre-interned well-known tag name; fall back to interning
        // the extracted name into this helper's pool.
        StatName tag_name = tls.wellKnownTags().getBuiltin(tag.name_, empty);
        if (tag_name.empty()) {
          tag_name = pool_.add(tag.name_);
        }
        stat_name_tags_.emplace_back(tag_name, pool_.add(tag.value_));
      }
    } else {
      // Explicit tags provided: pass the name and tags through unchanged.
      tag_extracted_name_ = name;
    }
  }

  const StatNameTagVector& statNameTags() const { return stat_name_tags_; }
  StatName tagExtractedName() const { return tag_extracted_name_; }

private:
  // Owns any StatNames interned during extraction; must outlive users of
  // statNameTags()/tagExtractedName().
  StatNamePool pool_;
  StatNameTagVector stat_name_tags_;
  StatName tag_extracted_name_;
};
470

            
471
// Returns true if `name` is rejected by `matcher`, memoizing the rejection in
// `central_rejected_stats` (and, when provided, `tls_rejected_stats`) so
// subsequent lookups skip the slow matcher path.
bool ThreadLocalStoreImpl::checkAndRememberRejection(StatName name,
                                                     StatsMatcher::FastResult fast_reject_result,
                                                     StatNameStorageSet& central_rejected_stats,
                                                     StatNameHashSet* tls_rejected_stats,
                                                     const StatsMatcher& matcher) {
  if (matcher.acceptsAll()) {
    return false;
  }

  // First consult the central memo of already-rejected names.
  auto iter = central_rejected_stats.find(name);
  const StatNameStorage* rejected_name = nullptr;
  if (iter != central_rejected_stats.end()) {
    rejected_name = &(*iter);
  } else {
    // Not memoized yet: run the slow matcher check and record a rejection.
    if (matcher.slowRejects(fast_reject_result, name)) {
      auto insertion = central_rejected_stats.insert(StatNameStorage(name, symbolTable()));
      const StatNameStorage& rejected_name_ref = *(insertion.first);
      rejected_name = &rejected_name_ref;
    }
  }
  if (rejected_name != nullptr) {
    // Propagate the rejection to the thread-local memo so the fast path in
    // safeMakeStat can short-circuit without taking the store lock.
    if (tls_rejected_stats != nullptr) {
      tls_rejected_stats->insert(rejected_name->statName());
    }
    return true;
  }
  return false;
}
499

            
500
// Core stat lookup/creation path shared by counters, gauges, and text
// readouts: consult the TLS rejection memo and TLS cache, then the central
// cache (under lock), creating the stat via `make_stat` if absent and not
// rejected. Returns `null_stat` for rejected or over-limit stats.
template <class StatType>
StatType& ThreadLocalStoreImpl::ScopeImpl::safeMakeStat(
    StatName full_stat_name, StatName name_no_tags,
    const absl::optional<StatNameTagVector>& stat_name_tags,
    StatNameHashMap<RefcountPtr<StatType>>& central_cache_map,
    StatsMatcher::FastResult fast_reject_result, StatNameStorageSet& central_rejected_stats,
    MakeStatFn<StatType> make_stat, StatRefMap<StatType>* tls_cache,
    StatNameHashSet* tls_rejected_stats, StatType& null_stat) {

  // Fast path: this thread already knows the name is rejected.
  if (tls_rejected_stats != nullptr &&
      tls_rejected_stats->find(full_stat_name) != tls_rejected_stats->end()) {
    return null_stat;
  }

  // If we have a valid cache entry, return it.
  if (tls_cache) {
    auto pos = tls_cache->find(full_stat_name);
    if (pos != tls_cache->end()) {
      return pos->second;
    }
  }

  // We must now look in the central store so we must be locked. We grab a reference to the
  // central store location. It might contain nothing. In this case, we allocate a new stat.
  Thread::LockGuard lock(parent_.lock_);
  auto iter = central_cache_map.find(full_stat_name);
  RefcountPtr<StatType>* central_ref = nullptr;
  if (iter != central_cache_map.end()) {
    central_ref = &(iter->second);
  } else if (parent_.checkAndRememberRejection(full_stat_name, fast_reject_result,
                                               central_rejected_stats, tls_rejected_stats,
                                               effectiveMatcher())) {
    return null_stat;
  } else {
    // Stat creation here. Check limits.
    if constexpr (std::is_same_v<StatType, Counter>) {
      if (limits_.max_counters != 0 && central_cache_map.size() >= limits_.max_counters) {
        parent_.counters_overflow_->inc();
        return null_stat;
      }
    } else if constexpr (std::is_same_v<StatType, Gauge>) {
      if (limits_.max_gauges != 0 && central_cache_map.size() >= limits_.max_gauges) {
        parent_.gauges_overflow_->inc();
        return null_stat;
      }
    } else {
      // TextReadouts are currently not limited, but we must ensure they are the only
      // other type being handled. This static_assert will trigger a compilation error
      // if a new StatType is introduced in the future, forcing the developer to
      // explicitly decide how to handle its limits.
      static_assert(std::is_same_v<StatType, TextReadout>, "Unexpected StatType");
    }
    StatNameTagHelper tag_helper(parent_, name_no_tags, stat_name_tags);

    RefcountPtr<StatType> stat = make_stat(
        parent_.alloc_, full_stat_name, tag_helper.tagExtractedName(), tag_helper.statNameTags());
    ASSERT(stat != nullptr);
    central_ref = &central_cache_map[stat->statName()];
    *central_ref = stat;
  }

  // If we have a TLS cache, insert the stat.
  StatType& ret = **central_ref;
  if (tls_cache) {
    tls_cache->insert(std::make_pair(ret.statName(), std::reference_wrapper<StatType>(ret)));
  }

  // Finally we return the reference.
  return ret;
}
570

            
571
// Looks up (or creates) a counter named by this scope's prefix joined with
// `name` and any tags, returning the null counter when rejected by the
// effective matcher.
Counter& ThreadLocalStoreImpl::ScopeImpl::counterFromStatNameWithTags(
    const StatName& name, StatNameTagVectorOptConstRef stat_name_tags) {
  if (scopeRejectsAll()) {
    return parent_.null_counter_;
  }

  // Determine the final name based on the prefix and the passed name.
  TagUtility::TagStatNameJoiner joiner(prefix_.statName(), name, stat_name_tags, symbolTable());
  Stats::StatName final_stat_name = joiner.nameWithTags();

  // Cheap matcher pre-check; a definitive rejection avoids all cache work.
  StatsMatcher::FastResult fast_reject_result = scopeFastRejects(final_stat_name);
  if (fast_reject_result == StatsMatcher::FastResult::Rejects) {
    return parent_.null_counter_;
  }

  // We now find the TLS cache. This might remain null if we don't have TLS
  // initialized currently.
  StatRefMap<Counter>* tls_cache = nullptr;
  StatNameHashSet* tls_rejected_stats = nullptr;
  if (!parent_.shutting_down_ && parent_.tls_cache_) {
    TlsCacheEntry& entry = parent_.tlsCache().insertScope(this->scope_id_);
    tls_cache = &entry.counters_;
    tls_rejected_stats = &entry.rejected_stats_;
  }

  const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
  return safeMakeStat<Counter>(
      final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->counters_,
      fast_reject_result, central_cache->rejected_stats_,
      [](Allocator& allocator, StatName name, StatName tag_extracted_name,
         const StatNameTagVector& tags) -> CounterSharedPtr {
        return allocator.makeCounter(name, tag_extracted_name, tags);
      },
      tls_cache, tls_rejected_stats, parent_.null_counter_);
}
606

            
607
485165
void ThreadLocalStoreImpl::deliverHistogramToSinks(const Histogram& histogram, uint64_t value) {
608
  // Thread local deliveries must be blocked outright for histograms and timers during shutdown.
609
  // This is because the sinks may end up trying to create new connections via the thread local
610
  // cluster manager which may already be destroyed (there is no way to sequence this because the
611
  // cluster manager destroying can create deliveries). We special case this explicitly to avoid
612
  // having to implement a shutdown() method (or similar) on every TLS object.
613
485165
  if (shutting_down_) {
614
944
    return;
615
944
  }
616

            
617
484221
  for (Sink& sink : timer_sinks_) {
618
9964
    sink.onHistogramComplete(histogram, value);
619
9964
  }
620
484221
}
621

            
622
// Returns the gauge addressed by this scope's prefix + `name` (+ optional
// tags), creating it on first use via safeMakeStat(). The requested
// import_mode is merged into the returned gauge even when it already existed.
Gauge& ThreadLocalStoreImpl::ScopeImpl::gaugeFromStatNameWithTags(
    const StatName& name, StatNameTagVectorOptConstRef stat_name_tags,
    Gauge::ImportMode import_mode) {
  // If a gauge is "hidden" it should not be rejected as these are used for deferred stats.
  if (scopeRejectsAll() && import_mode != Gauge::ImportMode::HiddenAccumulate) {
    return parent_.null_gauge_;
  }

  // See comments in counter(). There is no super clean way (via templates or otherwise) to
  // share this code so I'm leaving it largely duplicated for now.
  TagUtility::TagStatNameJoiner joiner(prefix_.statName(), name, stat_name_tags, symbolTable());
  StatName final_stat_name = joiner.nameWithTags();

  // Hidden gauges bypass matcher evaluation entirely and are always accepted.
  StatsMatcher::FastResult fast_reject_result;
  if (import_mode != Gauge::ImportMode::HiddenAccumulate) {
    fast_reject_result = scopeFastRejects(final_stat_name);
  } else {
    fast_reject_result = StatsMatcher::FastResult::Matches;
  }
  if (fast_reject_result == StatsMatcher::FastResult::Rejects) {
    return parent_.null_gauge_;
  }

  // Locate the TLS cache if TLS is initialized and we are not shutting down.
  StatRefMap<Gauge>* tls_cache = nullptr;
  StatNameHashSet* tls_rejected_stats = nullptr;
  if (!parent_.shutting_down_ && parent_.tls_cache_) {
    TlsCacheEntry& entry = parent_.tlsCache().insertScope(this->scope_id_);
    tls_cache = &entry.gauges_;
    tls_rejected_stats = &entry.rejected_stats_;
  }

  const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
  Gauge& gauge = safeMakeStat<Gauge>(
      final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->gauges_,
      fast_reject_result, central_cache->rejected_stats_,
      // Factory invoked only when the gauge must actually be allocated.
      [import_mode](Allocator& allocator, StatName name, StatName tag_extracted_name,
                    const StatNameTagVector& tags) -> GaugeSharedPtr {
        return allocator.makeGauge(name, tag_extracted_name, tags, import_mode);
      },
      tls_cache, tls_rejected_stats, parent_.null_gauge_);
  // Applied on every call, not just on creation, so a pre-existing gauge can
  // adopt the caller's import mode.
  gauge.mergeImportMode(import_mode);
  return gauge;
}
665

            
666
// Returns the parent histogram addressed by this scope's prefix + `name`
// (+ optional tags), creating it on first use. Unlike counters/gauges this
// does not go through safeMakeStat(): histograms need both the store lock
// (central cache) and hist_mutex_ (global histogram set), taken in that order.
Histogram& ThreadLocalStoreImpl::ScopeImpl::histogramFromStatNameWithTags(
    const StatName& name, StatNameTagVectorOptConstRef stat_name_tags, Histogram::Unit unit) {
  // See safety analysis comment in counterFromStatNameWithTags above.

  if (scopeRejectsAll()) {
    return parent_.null_histogram_;
  }

  // See comments in counter(). There is no super clean way (via templates or otherwise) to
  // share this code so I'm leaving it largely duplicated for now.
  TagUtility::TagStatNameJoiner joiner(prefix_.statName(), name, stat_name_tags, symbolTable());
  StatName final_stat_name = joiner.nameWithTags();

  StatsMatcher::FastResult fast_reject_result = scopeFastRejects(final_stat_name);
  if (fast_reject_result == StatsMatcher::FastResult::Rejects) {
    return parent_.null_histogram_;
  }

  // Lock-free fast path: serve a hit (or a remembered rejection) straight from
  // the thread-local cache.
  StatNameHashMap<ParentHistogramSharedPtr>* tls_cache = nullptr;
  StatNameHashSet* tls_rejected_stats = nullptr;
  if (!parent_.shutting_down_ && parent_.tls_cache_) {
    TlsCacheEntry& entry = parent_.tlsCache().insertScope(this->scope_id_);
    tls_cache = &entry.parent_histograms_;
    auto iter = tls_cache->find(final_stat_name);
    if (iter != tls_cache->end()) {
      return *iter->second;
    }
    tls_rejected_stats = &entry.rejected_stats_;
    if (tls_rejected_stats->find(final_stat_name) != tls_rejected_stats->end()) {
      return parent_.null_histogram_;
    }
  }

  // Slow path: consult (and possibly populate) the central cache under lock.
  Thread::LockGuard lock(parent_.lock_);
  const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
  auto iter = central_cache->histograms_.find(final_stat_name);
  ParentHistogramImplSharedPtr* central_ref = nullptr;
  if (iter != central_cache->histograms_.end()) {
    central_ref = &iter->second;
  } else if (parent_.checkAndRememberRejection(final_stat_name, fast_reject_result,
                                               central_cache->rejected_stats_, tls_rejected_stats,
                                               effectiveMatcher())) {
    // The full matcher rejected the name; the rejection has been cached.
    return parent_.null_histogram_;
  } else {
    StatNameTagHelper tag_helper(parent_, joiner.tagExtractedName(), stat_name_tags);

    // Per-name histogram configuration (bucket boundaries, optional bin count).
    ConstSupportedBuckets* buckets = nullptr;
    const auto string_stat_name = symbolTable().toString(final_stat_name);
    buckets = &parent_.histogram_settings_->buckets(string_stat_name);
    const auto bins = parent_.histogram_settings_->bins(string_stat_name);

    RefcountPtr<ParentHistogramImpl> stat;
    {
      // hist_mutex_ guards the store-wide histogram_set_, which may already
      // hold this name if another scope created the same histogram.
      Thread::LockGuard lock(parent_.hist_mutex_);
      auto iter = parent_.histogram_set_.find(final_stat_name);
      if (iter != parent_.histogram_set_.end()) {
        stat = RefcountPtr<ParentHistogramImpl>(*iter);
      } else {
        // Enforce the optional per-scope histogram cap before allocating.
        if (limits_.max_histograms != 0 &&
            central_cache->histograms_.size() >= limits_.max_histograms) {
          parent_.histograms_overflow_->inc();
          return parent_.null_histogram_;
        }
        stat = new ParentHistogramImpl(final_stat_name, unit, parent_,
                                       tag_helper.tagExtractedName(), tag_helper.statNameTags(),
                                       *buckets, bins, parent_.next_histogram_id_++);
        if (!parent_.shutting_down_) {
          parent_.histogram_set_.insert(stat.get());
          if (parent_.sink_predicates_.has_value() &&
              parent_.sink_predicates_->includeHistogram(*stat)) {
            parent_.sinked_histograms_.insert(stat.get());
          }
        }
      }
    }

    central_ref = &central_cache->histograms_[stat->statName()];
    *central_ref = stat;
  }

  // Populate the TLS cache so subsequent lookups on this thread are lock-free.
  if (tls_cache != nullptr) {
    tls_cache->insert(std::make_pair((*central_ref)->statName(), *central_ref));
  }
  return **central_ref;
}
751

            
752
// Returns the text readout addressed by this scope's prefix + `name`
// (+ optional tags), creating it on first use via safeMakeStat(). Mirrors
// counterFromStatNameWithTags().
TextReadout& ThreadLocalStoreImpl::ScopeImpl::textReadoutFromStatNameWithTags(
    const StatName& name, StatNameTagVectorOptConstRef stat_name_tags) {
  if (scopeRejectsAll()) {
    return parent_.null_text_readout_;
  }

  // Determine the final name based on the prefix and the passed name.
  TagUtility::TagStatNameJoiner joiner(prefix_.statName(), name, stat_name_tags, symbolTable());
  Stats::StatName final_stat_name = joiner.nameWithTags();

  StatsMatcher::FastResult fast_reject_result = scopeFastRejects(final_stat_name);
  if (fast_reject_result == StatsMatcher::FastResult::Rejects) {
    return parent_.null_text_readout_;
  }

  // We now find the TLS cache. This might remain null if we don't have TLS
  // initialized currently.
  StatRefMap<TextReadout>* tls_cache = nullptr;
  StatNameHashSet* tls_rejected_stats = nullptr;
  if (!parent_.shutting_down_ && parent_.tls_cache_) {
    TlsCacheEntry& entry = parent_.tlsCache().insertScope(this->scope_id_);
    tls_cache = &entry.text_readouts_;
    tls_rejected_stats = &entry.rejected_stats_;
  }

  const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
  return safeMakeStat<TextReadout>(
      final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->text_readouts_,
      fast_reject_result, central_cache->rejected_stats_,
      // Factory invoked only when the text readout must actually be allocated.
      [](Allocator& allocator, StatName name, StatName tag_extracted_name,
         const StatNameTagVector& tags) -> TextReadoutSharedPtr {
        return allocator.makeTextReadout(name, tag_extracted_name, tags);
      },
      tls_cache, tls_rejected_stats, parent_.null_text_readout_);
}
787

            
788
7
// Looks up an already-created counter by fully-elaborated name; returns
// nullopt when absent (never creates). Takes the store lock for the map read.
CounterOptConstRef ThreadLocalStoreImpl::ScopeImpl::findCounter(StatName name) const {
  Thread::LockGuard lock(parent_.lock_);
  return findStatLockHeld<Counter>(name, central_cache_->counters_);
}
792

            
793
63
// Looks up an already-created gauge by fully-elaborated name; returns nullopt
// when absent (never creates). Takes the store lock for the map read.
GaugeOptConstRef ThreadLocalStoreImpl::ScopeImpl::findGauge(StatName name) const {
  Thread::LockGuard lock(parent_.lock_);
  return findStatLockHeld<Gauge>(name, central_cache_->gauges_);
}
797

            
798
5
// Looks up an already-created histogram by fully-elaborated name; returns
// nullopt when absent (never creates). Takes the store lock, then delegates.
HistogramOptConstRef ThreadLocalStoreImpl::ScopeImpl::findHistogram(StatName name) const {
  Thread::LockGuard lock(parent_.lock_);
  return findHistogramLockHeld(name);
}
802

            
803
// Lock-held variant of findHistogram(): scans this scope's central histogram
// map and returns a const reference to the histogram, or nullopt when the
// name has not been created in this scope.
HistogramOptConstRef ThreadLocalStoreImpl::ScopeImpl::findHistogramLockHeld(StatName name) const
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_.lock_) {
  const auto& histogram_map = central_cache_->histograms_;
  const auto it = histogram_map.find(name);
  if (it != histogram_map.end()) {
    // Pin a refcount while forming the reference; the underlying object is
    // owned by the central cache entry.
    RefcountPtr<Histogram> found(it->second);
    return std::cref(*found);
  }
  return absl::nullopt;
}
813

            
814
2
// Looks up an already-created text readout by fully-elaborated name; returns
// nullopt when absent (never creates). Takes the store lock for the map read.
TextReadoutOptConstRef ThreadLocalStoreImpl::ScopeImpl::findTextReadout(StatName name) const {
  Thread::LockGuard lock(parent_.lock_);
  return findStatLockHeld<TextReadout>(name, central_cache_->text_readouts_);
}
818

            
819
485161
// Returns the calling thread's ThreadLocalHistogram for the given parent
// histogram id, allocating and registering a new one on first use. When TLS
// is unavailable (early startup or shutdown) a fresh uncached instance is
// returned; it is still registered with the parent so merges see its data.
Histogram& ThreadLocalStoreImpl::tlsHistogram(ParentHistogramImpl& parent, uint64_t id) {
  // tlsHistogram() is generally not called for a histogram that is rejected by
  // the matcher, so no further rejection-checking is needed at this level.
  // TlsHistogram inherits its reject/accept status from ParentHistogram.

  // See comments in counterFromStatName() which explains the logic here.

  TlsHistogramSharedPtr* tls_histogram = nullptr;
  if (!shutting_down_ && tls_cache_) {
    tls_histogram = &(tlsCache().tls_histogram_cache_[id]);
    if (*tls_histogram != nullptr) {
      return **tls_histogram;
    }
  }

  StatNameTagHelper tag_helper(*this, parent.statName(), absl::nullopt);

  TlsHistogramSharedPtr hist_tls_ptr(
      new ThreadLocalHistogramImpl(parent.statName(), parent.unit(), tag_helper.tagExtractedName(),
                                   tag_helper.statNameTags(), symbolTable(), parent.bins()));

  // Register with the parent so merge() folds this thread's values in.
  parent.addTlsHistogram(hist_tls_ptr);

  // Cache the new histogram in the TLS slot (if one was obtained above).
  if (tls_histogram != nullptr) {
    *tls_histogram = hist_tls_ptr;
  }

  return *hist_tls_ptr;
}
848

            
849
// Per-thread histogram buffer. Two underlying hist structures are kept:
// recordValue() writes into histograms_[current_active_] while merge() drains
// the other one, so the recording thread and the merging thread do not
// contend on the same structure.
ThreadLocalHistogramImpl::ThreadLocalHistogramImpl(StatName name, Histogram::Unit unit,
                                                   StatName tag_extracted_name,
                                                   const StatNameTagVector& stat_name_tags,
                                                   SymbolTable& symbol_table,
                                                   absl::optional<uint32_t> bins)
    : HistogramImplHelper(name, tag_extracted_name, stat_name_tags, symbol_table), unit_(unit),
      used_(false), created_thread_id_(std::this_thread::get_id()), symbol_table_(symbol_table) {
  // A fixed bin count (when configured) is applied to both buffers.
  histograms_[0] = bins ? hist_alloc_nbins(bins.value()) : hist_alloc();
  histograms_[1] = bins ? hist_alloc_nbins(bins.value()) : hist_alloc();
}
859

            
860
95923
// Releases the metric name storage and both underlying hist buffers.
ThreadLocalHistogramImpl::~ThreadLocalHistogramImpl() {
  MetricImpl::clear(symbol_table_);
  hist_free(histograms_[0]);
  hist_free(histograms_[1]);
}
865

            
866
485161
// Records one value into the currently-active buffer. Only legal on the
// thread that created this histogram (enforced by the ASSERT); that is what
// makes the unsynchronized write to histograms_[current_active_] safe.
void ThreadLocalHistogramImpl::recordValue(uint64_t value) {
  ASSERT(std::this_thread::get_id() == created_thread_id_);
  hist_insert_intscale(histograms_[current_active_], value, 0, 1);
  used_ = true;
}
871

            
872
6272
// Folds this thread's inactive buffer into `target`, then clears it so it is
// empty when it becomes the active buffer again.
void ThreadLocalHistogramImpl::merge(histogram_t* target) {
  histogram_t*& inactive = histograms_[otherHistogramIndex()];
  hist_accumulate(target, &inactive, 1);
  hist_clear(inactive);
}
877

            
878
// Store-wide histogram aggregating all thread-local histograms of the same
// name. Maintains two hist buffers: interval (cleared each merge cycle) and
// cumulative (monotonically accumulated), each with its own statistics view.
// `id` is the store-assigned slot used by the per-thread histogram cache.
ParentHistogramImpl::ParentHistogramImpl(StatName name, Histogram::Unit unit,
                                         ThreadLocalStoreImpl& thread_local_store,
                                         StatName tag_extracted_name,
                                         const StatNameTagVector& stat_name_tags,
                                         ConstSupportedBuckets& supported_buckets,
                                         absl::optional<uint32_t> bins, uint64_t id)
    : MetricImpl(name, tag_extracted_name, stat_name_tags, thread_local_store.symbolTable()),
      unit_(unit), bins_(bins), thread_local_store_(thread_local_store),
      interval_histogram_(hist_alloc()), cumulative_histogram_(hist_alloc()),
      interval_statistics_(interval_histogram_, unit, supported_buckets),
      cumulative_statistics_(cumulative_histogram_, unit, supported_buckets), id_(id) {}
889

            
890
197488
// Releases this histogram's TLS cache slot across all worker threads, then
// frees name storage and both hist buffers. The refcount must already be zero.
ParentHistogramImpl::~ParentHistogramImpl() {
  thread_local_store_.releaseHistogramCrossThread(id_);
  ASSERT(ref_count_ == 0);
  MetricImpl::clear(thread_local_store_.symbolTable());
  hist_free(interval_histogram_);
  hist_free(cumulative_histogram_);
}
897

            
898
1364908
// Adds one strong reference; paired with decRefCount() (intrusive refcounting).
void ParentHistogramImpl::incRefCount() { ++ref_count_; }
899

            
900
1364908
// Drops one strong reference; returns true when the count reached zero and
// the caller should destroy the object. The normal path routes through the
// store so the decrement happens under hist_mutex_; the shutdown path must
// not touch the store at all (see comments below).
bool ParentHistogramImpl::decRefCount() {
  bool ret;
  if (shutting_down_) {
    // When shutting down, we cannot reference thread_local_store_, as
    // histograms can outlive the store. So we decrement the ref-count without
    // the stores' lock. We will not be removing the object from the store's
    // histogram map in this scenario, as the set was cleared during shutdown,
    // and will not be repopulated in histogramFromStatNameWithTags after
    // initiating shutdown.
    ret = --ref_count_ == 0;
  } else {
    // We delegate to the Store object to decrement the ref-count so it can hold
    // the lock to the map. If we don't hold a lock, another thread may
    // simultaneously try to allocate the same name'd histogram after we
    // decrement it, and we'll wind up with a dtor/update race. To avoid this we
    // must hold the lock until the histogram is removed from the map.
    //
    // See also StatsSharedImpl::decRefCount() in allocator.cc, which has
    // the same issue.
    ret = thread_local_store_.decHistogramRefCount(*this, ref_count_);
  }
  return ret;
}
923

            
924
// Decrements a parent histogram's refcount under hist_mutex_ and, when it
// hits zero (and we are not shutting down), removes the histogram from the
// store-wide set so no new lookup can resurrect it. Returns true when the
// caller owns the final reference and should destroy the histogram.
bool ThreadLocalStoreImpl::decHistogramRefCount(ParentHistogramImpl& hist,
                                                std::atomic<uint32_t>& ref_count) {
  // We must hold the store's histogram lock when decrementing the
  // refcount. Otherwise another thread may simultaneously try to allocate the
  // same name'd stat after we decrement it, and we'll wind up with a
  // dtor/update race. To avoid this we must hold the lock until the stat is
  // removed from the map.
  Thread::LockGuard lock(hist_mutex_);
  ASSERT(ref_count >= 1);
  if (--ref_count == 0) {
    if (!shutting_down_) {
      const size_t count = histogram_set_.erase(hist.statName());
      ASSERT(shutting_down_ || count == 1);
      sinked_histograms_.erase(&hist);
    }
    return true;
  }
  return false;
}
943

            
944
370310
// Symbol table access delegates to the owning store.
SymbolTable& ParentHistogramImpl::symbolTable() { return thread_local_store_.symbolTable(); }
945

            
946
471386
// Unit of measurement fixed at construction time.
Histogram::Unit ParentHistogramImpl::unit() const { return unit_; }
947

            
948
485161
// Records a value by writing into the calling thread's thread-local histogram
// (no cross-thread synchronization) and then notifying the store's sinks.
void ParentHistogramImpl::recordValue(uint64_t value) {
  Histogram& tls_histogram = thread_local_store_.tlsHistogram(*this, id_);
  tls_histogram.recordValue(value);
  thread_local_store_.deliverHistogramToSinks(*this, value);
}
953

            
954
680
bool ParentHistogramImpl::used() const {
  // Consider ParentHistogram used only if it has ever been merged.
  return merged_;
}
958

            
959
2
void ParentHistogramImpl::markUnused() {
960
2
  merged_ = false;
961
2
  Thread::LockGuard lock(merge_lock_);
962
2
  for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) {
963
2
    tls_histogram->markUnused();
964
2
  }
965
2
}
966

            
967
665
// Parent histograms are never hidden (there is no hidden import mode for them).
bool ParentHistogramImpl::hidden() const { return false; }
968

            
969
17911
// Folds all thread-local histograms into the interval histogram, accumulates
// the interval into the cumulative histogram, and refreshes both statistics
// views. Skipped entirely when the histogram has never been used.
void ParentHistogramImpl::merge() {
  Thread::ReleasableLockGuard lock(merge_lock_);
  if (merged_ || usedLockHeld()) {
    // Interval stats cover only the values recorded since the previous merge.
    hist_clear(interval_histogram_);
    // Here we could copy all the pointers to TLS histograms in the tls_histogram_ list,
    // then release the lock before we do the actual merge. However it is not a big deal
    // because the tls_histogram merge is not that expensive as it is a single histogram
    // merge and adding TLS histograms is rare.
    for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) {
      tls_histogram->merge(interval_histogram_);
    }
    // Since TLS merge is done, we can release the lock here.
    lock.release();
    hist_accumulate(cumulative_histogram_, &interval_histogram_, 1);
    cumulative_statistics_.refresh(cumulative_histogram_);
    interval_statistics_.refresh(interval_histogram_);
    merged_ = true;
  }
}
988

            
989
7
std::string ParentHistogramImpl::quantileSummary() const {
990
7
  if (used()) {
991
7
    std::vector<std::string> summary;
992
7
    const std::vector<double>& supported_quantiles_ref = interval_statistics_.supportedQuantiles();
993
7
    summary.reserve(supported_quantiles_ref.size());
994
77
    for (size_t i = 0; i < supported_quantiles_ref.size(); ++i) {
995
70
      summary.push_back(fmt::format("P{:g}({},{})", 100 * supported_quantiles_ref[i],
996
70
                                    interval_statistics_.computedQuantiles()[i],
997
70
                                    cumulative_statistics_.computedQuantiles()[i]));
998
70
    }
999
7
    return absl::StrJoin(summary, " ");
7
  } else {
    return {"No recorded values"};
  }
7
}
7
std::string ParentHistogramImpl::bucketSummary() const {
7
  if (used()) {
6
    std::vector<std::string> bucket_summary;
6
    ConstSupportedBuckets& supported_buckets = interval_statistics_.supportedBuckets();
6
    bucket_summary.reserve(supported_buckets.size());
120
    for (size_t i = 0; i < supported_buckets.size(); ++i) {
114
      bucket_summary.push_back(fmt::format("B{:g}({},{})", supported_buckets[i],
114
                                           interval_statistics_.computedBuckets()[i],
114
                                           cumulative_statistics_.computedBuckets()[i]));
114
    }
6
    return absl::StrJoin(bucket_summary, " ");
6
  } else {
1
    return {"No recorded values"};
1
  }
7
}
std::vector<Stats::ParentHistogram::Bucket>
23
ParentHistogramImpl::detailedlBucketsHelper(const histogram_t& histogram) const {
23
  const uint32_t num_buckets = hist_num_buckets(&histogram);
23
  std::vector<Stats::ParentHistogram::Bucket> buckets(num_buckets);
23
  hist_bucket_t hist_bucket;
336
  for (uint32_t i = 0; i < num_buckets; ++i) {
313
    ParentHistogram::Bucket& bucket = buckets[i];
313
    hist_bucket_idx_bucket(&histogram, i, &hist_bucket, &bucket.count_);
313
    bucket.lower_bound_ = hist_bucket_to_double(hist_bucket);
313
    bucket.width_ = hist_bucket_to_double_bin_width(hist_bucket);
313
    if (unit_ == Histogram::Unit::Percent) {
9
      constexpr double percent_scale = 1.0 / Histogram::PercentScale;
9
      bucket.lower_bound_ *= percent_scale;
9
      bucket.width_ *= percent_scale;
9
    }
313
  }
23
  return buckets;
23
}
147
uint64_t ParentHistogramImpl::cumulativeCountLessThanOrEqualToValue(double value) const {
  // `hist_approx_count_below` is slightly misnamed. It's documentation states:
  //
  // Returns the number of values in buckets that are entirely lower than or equal to threshold.
147
  const double raw_value =
147
      (unit_ == Histogram::Unit::Percent) ? (value * Histogram::PercentScale) : value;
147
  return hist_approx_count_below(cumulative_histogram_, raw_value);
147
}
95923
// Registers a newly-created thread-local histogram so merge() will fold its
// values in. merge_lock_ guards the tls_histograms_ list.
void ParentHistogramImpl::addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr) {
  Thread::LockGuard lock(merge_lock_);
  tls_histograms_.emplace_back(hist_ptr);
}
14986
bool ParentHistogramImpl::usedLockHeld() const {
14986
  for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) {
3308
    if (tls_histogram->used()) {
3308
      return true;
3308
    }
3308
  }
11678
  return false;
14986
}
271143
// Counter iteration is delegated to the allocator, which owns all counters.
void ThreadLocalStoreImpl::forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
  alloc_.forEachCounter(f_size, f_stat);
}
19028
// Gauge iteration is delegated to the allocator, which owns all gauges.
void ThreadLocalStoreImpl::forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
  alloc_.forEachGauge(f_size, f_stat);
}
27
// Text-readout iteration is delegated to the allocator, which owns them all.
void ThreadLocalStoreImpl::forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
  alloc_.forEachTextReadout(f_size, f_stat);
}
44697
// Iterates all parent histograms under hist_mutex_, which guards the
// store-wide histogram_set_. f_size (if provided) is called first with the
// total count, then f_stat once per histogram.
void ThreadLocalStoreImpl::forEachHistogram(SizeFn f_size, StatFn<ParentHistogram> f_stat) const {
  Thread::LockGuard lock(hist_mutex_);
  if (f_size != nullptr) {
    f_size(histogram_set_.size());
  }
  for (ParentHistogramImpl* histogram : histogram_set_) {
    f_stat(*histogram);
  }
}
// Iterates all live scopes. A snapshot of shared pointers is taken first so
// the f_scope callbacks run without the store lock and the scopes cannot be
// destroyed mid-iteration.
void ThreadLocalStoreImpl::forEachScope(std::function<void(std::size_t)> f_size,
                                        StatFn<const Scope> f_scope) const {
  std::vector<ScopeSharedPtr> scopes;
  iterateScopes([&scopes](const ScopeImplSharedPtr& scope) -> bool {
    scopes.push_back(scope);
    return true;
  });
  if (f_size != nullptr) {
    f_size(scopes.size());
  }
  for (const ScopeSharedPtr& scope : scopes) {
    f_scope(*scope);
  }
}
namespace {
// Holds the owning shared pointers for stats evicted from one scope's central
// cache, keeping their storage alive until the per-thread caches referencing
// them have been purged (see ThreadLocalStoreImpl::evictUnused below).
struct MetricBag {
  explicit MetricBag(uint64_t scope_id) : scope_id_(scope_id) {}
  // Id of the scope whose central cache these stats came from.
  const uint64_t scope_id_;
  StatNameHashMap<CounterSharedPtr> counters_;
  StatNameHashMap<GaugeSharedPtr> gauges_;
  StatNameHashMap<ParentHistogramImplSharedPtr> histograms_;
  StatNameHashMap<TextReadoutSharedPtr> text_readouts_;
  // True when no stats of any type were collected for this scope.
  bool empty() const {
    return counters_.empty() && gauges_.empty() && histograms_.empty() && text_readouts_.empty();
  }
};
} // namespace
6
// Main-thread-only sweep that removes unused stats from evictable scopes.
// Three phases: (1) under the store lock, move unused stats out of each
// evictable scope's central cache into MetricBags (used stats are instead
// re-armed via markUnused()); (2) on every worker thread, erase the evicted
// names from that thread's TLS cache; (3) back on the main thread, drop the
// MetricBags, destroying the stats.
void ThreadLocalStoreImpl::evictUnused() {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  // If we are shutting down, we no longer perform eviction as workers may be shutting down
  // and not able to complete their work.
  if (shutting_down_ || !tls_cache_) {
    return;
  }
  auto evicted_metrics = std::make_shared<std::vector<MetricBag>>();
  {
    Thread::LockGuard lock(lock_);
    iterateScopesLockHeld([evicted_metrics](const ScopeImplSharedPtr& scope) -> bool {
      if (scope->evictable_) {
        MetricBag metrics(scope->scope_id_);
        CentralCacheEntrySharedPtr& central_cache = scope->centralCacheMutableNoThreadAnalysis();
        // Builds an erase_if predicate that moves unused entries into
        // `unused_metrics` (returning true erases them from the central map)
        // and resets the used-flag on the rest for the next sweep.
        auto filter_unused = []<typename T>(StatNameHashMap<T>& unused_metrics) {
          return [&unused_metrics](std::pair<StatName, T> kv) {
            const auto& [name, metric] = kv;
            if (metric->used()) {
              metric->markUnused();
              return false;
            } else {
              unused_metrics.try_emplace(name, metric);
              return true;
            }
          };
        };
        absl::erase_if(central_cache->counters_, filter_unused(metrics.counters_));
        absl::erase_if(central_cache->gauges_, filter_unused(metrics.gauges_));
        absl::erase_if(central_cache->text_readouts_, filter_unused(metrics.text_readouts_));
        absl::erase_if(central_cache->histograms_, filter_unused(metrics.histograms_));
        if (!metrics.empty()) {
          evicted_metrics->push_back(std::move(metrics));
        }
      }
      return true;
    });
  }
  // At this point, central caches no longer return the evicted stats, but we
  // need to keep the storage for the evicted stats until after the thread
  // local caches are cleared.
  if (!evicted_metrics->empty()) {
    tls_cache_->runOnAllThreads(
        // Phase 2: purge the evicted names from each worker's TLS cache.
        [evicted_metrics](OptRef<TlsCache> tls_cache) {
          for (const auto& metrics : *evicted_metrics) {
            TlsCacheEntry& entry = tls_cache->insertScope(metrics.scope_id_);
            absl::erase_if(entry.counters_,
                           [&](std::pair<StatName, std::reference_wrapper<Counter>> kv) {
                             return metrics.counters_.contains(kv.first);
                           });
            absl::erase_if(entry.gauges_,
                           [&](std::pair<StatName, std::reference_wrapper<Gauge>> kv) {
                             return metrics.gauges_.contains(kv.first);
                           });
            absl::erase_if(entry.text_readouts_,
                           [&](std::pair<StatName, std::reference_wrapper<TextReadout>> kv) {
                             return metrics.text_readouts_.contains(kv.first);
                           });
            absl::erase_if(entry.parent_histograms_,
                           [&](std::pair<StatName, ParentHistogramSharedPtr> kv) {
                             return metrics.histograms_.contains(kv.first);
                           });
          }
        },
        // Phase 3: completion callback, run after all workers finish.
        [evicted_metrics]() {
          // We want to delete stale stats on the main thread since stat
          // destructors lock the stats allocator. Note that we might have
          // received fresh values on the stale cache-local stats after deleting them from the
          // central cache. Eventually, we might also want to defer the deletion further in the
          // allocator until the values are flushed to the sinks.
          size_t scopes = 0, counters = 0, gauges = 0, readouts = 0, histograms = 0;
          for (const auto& metrics : *evicted_metrics) {
            scopes += 1;
            counters += metrics.counters_.size();
            gauges += metrics.gauges_.size();
            readouts += metrics.text_readouts_.size();
            histograms += metrics.histograms_.size();
          }
          ENVOY_LOG(debug,
                    "deleted stale {} counters, {} gauges, {} text readouts, {} histograms from "
                    "{} scopes",
                    counters, gauges, readouts, histograms, scopes);
        });
  }
}
// Invokes `fn` for every scope still alive, with the store lock held by the
// caller. Returns false if `fn` aborted the iteration by returning false,
// true otherwise.
bool ThreadLocalStoreImpl::iterateScopesLockHeld(
    const std::function<bool(const ScopeImplSharedPtr&)> fn) const
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
  for (auto& iter : scopes_) {
    // Test-only hook to exercise races between iteration and scope deletion.
    sync_.syncPoint(ThreadLocalStoreImpl::IterateScopeSync);
    // We keep the scopes as a map from Scope* to weak_ptr<Scope> so that if,
    // during the iteration, the last reference to a ScopeSharedPtr is dropped,
    // we can test for that here by attempting to lock the weak pointer, and
    // skip those that are nullptr.
    const std::weak_ptr<ScopeImpl>& scope = iter.second;
    const ScopeImplSharedPtr& locked = scope.lock();
    if (locked != nullptr && !fn(locked)) {
      return false;
    }
  }
  return true;
}
11825
// Sinked-counter iteration is delegated to the allocator.
void ThreadLocalStoreImpl::forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
  alloc_.forEachSinkedCounter(f_size, f_stat);
}
11825
// Sinked-gauge iteration is delegated to the allocator.
void ThreadLocalStoreImpl::forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
  alloc_.forEachSinkedGauge(f_size, f_stat);
}
// Sinked-text-readout iteration is delegated to the allocator.
void ThreadLocalStoreImpl::forEachSinkedTextReadout(SizeFn f_size,
                                                    StatFn<TextReadout> f_stat) const {
  alloc_.forEachSinkedTextReadout(f_size, f_stat);
}
// Iterates histograms selected for sink flushing. Histograms (unlike other
// stat types) are filtered here in the store: when sink predicates are
// configured, only the pre-filtered sinked_histograms_ set is visited;
// otherwise every histogram is.
void ThreadLocalStoreImpl::forEachSinkedHistogram(SizeFn f_size,
                                                  StatFn<ParentHistogram> f_stat) const {
  if (sink_predicates_.has_value()) {
    Thread::LockGuard lock(hist_mutex_);
    if (f_size != nullptr) {
      f_size(sinked_histograms_.size());
    }
    for (auto histogram : sinked_histograms_) {
      f_stat(*histogram);
    }
  } else {
    forEachHistogram(f_size, f_stat);
  }
}
3
// Installs sink predicates on both this store and the allocator, then
// rebuilds the sinked_histograms_ set by re-testing every existing histogram
// against the new predicate.
void ThreadLocalStoreImpl::setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) {
  ASSERT(sink_predicates != nullptr);
  // Defensive re-check so release builds (where ASSERT may compile out) do
  // not dereference a null predicate.
  if (sink_predicates != nullptr) {
    // Keep a reference for histogram filtering; ownership moves to the allocator.
    sink_predicates_.emplace(*sink_predicates);
    alloc_.setSinkPredicates(std::move(sink_predicates));
    // Add histograms to the set of sinked histograms.
    Thread::LockGuard lock(hist_mutex_);
    sinked_histograms_.clear();
    for (auto& histogram : histogram_set_) {
      if (sink_predicates_->includeHistogram(*histogram)) {
        sinked_histograms_.insert(histogram);
      }
    }
  }
}
// StatName overload: converts the name to a string and delegates to the
// string_view overload below.
void ThreadLocalStoreImpl::extractAndAppendTags(StatName name, StatNamePool& pool,
                                                StatNameTagVector& stat_tags) {
  extractAndAppendTags(symbolTable().toString(name), pool, stat_tags);
}
void ThreadLocalStoreImpl::extractAndAppendTags(absl::string_view name, StatNamePool& pool,
4
                                                StatNameTagVector& stat_tags) {
4
  TagVector tags;
4
  tagProducer().produceTags(name, tags);
4
  for (const auto& tag : tags) {
3
    stat_tags.emplace_back(pool.add(tag.name_), pool.add(tag.value_));
3
  }
4
}
254286
// Lazily creates the server.stats_overflow.{counter,gauge,histogram} counters
// for whichever stat types have a nonzero limit configured. Fast-exits
// without locking in the common case where no limits are set.
void ThreadLocalStoreImpl::ensureOverflowStats(const ScopeStatsLimitSettings& limits) {
  const bool need_counter_overflow_stat = limits.max_counters != 0;
  const bool need_gauge_overflow_stat = limits.max_gauges != 0;
  const bool need_histogram_overflow_stat = limits.max_histograms != 0;
  if (!need_counter_overflow_stat && !need_gauge_overflow_stat && !need_histogram_overflow_stat) {
    return;
  }
  // lock_ guards the lazily-initialized overflow counter members.
  Thread::LockGuard lock(lock_);
  if (need_counter_overflow_stat && counters_overflow_ == nullptr) {
    StatNamePool pool(symbolTable());
    StatName name = pool.add("server.stats_overflow.counter");
    counters_overflow_ = alloc_.makeCounter(name, name, {});
  }
  if (need_gauge_overflow_stat && gauges_overflow_ == nullptr) {
    StatNamePool pool(symbolTable());
    StatName name = pool.add("server.stats_overflow.gauge");
    gauges_overflow_ = alloc_.makeCounter(name, name, {});
  }
  if (need_histogram_overflow_stat && histograms_overflow_ == nullptr) {
    StatNamePool pool(symbolTable());
    StatName name = pool.add("server.stats_overflow.histogram");
    histograms_overflow_ = alloc_.makeCounter(name, name, {});
  }
}
} // namespace Stats
} // namespace Envoy