Line data Source code
1 : #include "source/common/stats/thread_local_store.h"
2 :
3 : #include <chrono>
4 : #include <cstdint>
5 : #include <list>
6 : #include <memory>
7 : #include <string>
8 :
9 : #include "envoy/stats/allocator.h"
10 : #include "envoy/stats/histogram.h"
11 : #include "envoy/stats/sink.h"
12 : #include "envoy/stats/stats.h"
13 :
14 : #include "source/common/common/lock_guard.h"
15 : #include "source/common/runtime/runtime_features.h"
16 : #include "source/common/stats/histogram_impl.h"
17 : #include "source/common/stats/stats_matcher_impl.h"
18 : #include "source/common/stats/tag_producer_impl.h"
19 : #include "source/common/stats/tag_utility.h"
20 :
21 : #include "absl/strings/str_join.h"
22 :
23 : namespace Envoy {
24 : namespace Stats {
25 :
26 : const char ThreadLocalStoreImpl::DeleteScopeSync[] = "delete-scope";
27 : const char ThreadLocalStoreImpl::IterateScopeSync[] = "iterate-scope";
28 : const char ThreadLocalStoreImpl::MainDispatcherCleanupSync[] = "main-dispatcher-cleanup";
29 :
30 : ThreadLocalStoreImpl::ThreadLocalStoreImpl(Allocator& alloc)
31 : : alloc_(alloc), tag_producer_(std::make_unique<TagProducerImpl>()),
32 : stats_matcher_(std::make_unique<StatsMatcherImpl>()),
33 : histogram_settings_(std::make_unique<HistogramSettingsImpl>()),
34 : null_counter_(alloc.symbolTable()), null_gauge_(alloc.symbolTable()),
35 : null_histogram_(alloc.symbolTable()), null_text_readout_(alloc.symbolTable()),
36 98 : well_known_tags_(alloc.symbolTable().makeSet("well_known_tags")) {
37 1372 : for (const auto& desc : Config::TagNames::get().descriptorVec()) {
38 1372 : well_known_tags_->rememberBuiltin(desc.name_);
39 1372 : }
40 98 : StatNameManagedStorage empty("", alloc.symbolTable());
41 98 : auto new_scope = std::make_shared<ScopeImpl>(*this, StatName(empty.statName()));
42 98 : addScope(new_scope);
43 98 : default_scope_ = new_scope;
44 98 : }
45 :
46 98 : ThreadLocalStoreImpl::~ThreadLocalStoreImpl() {
47 98 : ASSERT(shutting_down_ || !threading_ever_initialized_);
48 98 : default_scope_.reset();
49 98 : ASSERT(scopes_.empty());
50 98 : ASSERT(scopes_to_cleanup_.empty());
51 98 : ASSERT(central_cache_entries_to_cleanup_.empty());
52 98 : ASSERT(histograms_to_cleanup_.empty());
53 98 : }
54 :
55 98 : void ThreadLocalStoreImpl::setHistogramSettings(HistogramSettingsConstPtr&& histogram_settings) {
56 196 : iterateScopes([](const ScopeImplSharedPtr& scope) -> bool {
57 196 : ASSERT(scope->centralCacheLockHeld()->histograms_.empty());
58 196 : return true;
59 196 : });
60 98 : histogram_settings_ = std::move(histogram_settings);
61 98 : }
62 :
63 98 : void ThreadLocalStoreImpl::setStatsMatcher(StatsMatcherPtr&& stats_matcher) {
64 98 : stats_matcher_ = std::move(stats_matcher);
65 98 : if (stats_matcher_->acceptsAll()) {
66 98 : return;
67 98 : }
68 :
69 : // The Filesystem and potentially other stat-registering objects are
70 : // constructed prior to the stat-matcher, and those add stats
71 : // in the default_scope. There should be no requests, so there will
72 : // be no copies in TLS caches.
73 0 : Thread::LockGuard lock(lock_);
74 0 : const uint32_t first_histogram_index = deleted_histograms_.size();
75 0 : iterateScopesLockHeld([this](const ScopeImplSharedPtr& scope) ABSL_EXCLUSIVE_LOCKS_REQUIRED(
76 0 : lock_) -> bool {
77 0 : const CentralCacheEntrySharedPtr& central_cache = scope->centralCacheLockHeld();
78 0 : removeRejectedStats<CounterSharedPtr>(central_cache->counters_,
79 0 : [this](const CounterSharedPtr& counter) mutable {
80 0 : alloc_.markCounterForDeletion(counter);
81 0 : });
82 0 : removeRejectedStats<GaugeSharedPtr>(
83 0 : central_cache->gauges_,
84 0 : [this](const GaugeSharedPtr& gauge) mutable { alloc_.markGaugeForDeletion(gauge); });
85 0 : removeRejectedStats(central_cache->histograms_, deleted_histograms_);
86 0 : removeRejectedStats<TextReadoutSharedPtr>(
87 0 : central_cache->text_readouts_, [this](const TextReadoutSharedPtr& text_readout) mutable {
88 0 : alloc_.markTextReadoutForDeletion(text_readout);
89 0 : });
90 0 : return true;
91 0 : });
92 :
93 : // Remove any newly rejected histograms from histogram_set_.
94 0 : {
95 0 : Thread::LockGuard hist_lock(hist_mutex_);
96 0 : for (uint32_t i = first_histogram_index; i < deleted_histograms_.size(); ++i) {
97 0 : uint32_t erased = histogram_set_.erase(deleted_histograms_[i].get());
98 0 : ASSERT(erased == 1);
99 0 : sinked_histograms_.erase(deleted_histograms_[i].get());
100 0 : }
101 0 : }
102 0 : }
103 :
104 : template <class StatMapClass, class StatListClass>
105 0 : void ThreadLocalStoreImpl::removeRejectedStats(StatMapClass& map, StatListClass& list) {
106 0 : StatNameVec remove_list;
107 0 : for (auto& stat : map) {
108 0 : if (rejects(stat.first)) {
109 0 : remove_list.push_back(stat.first);
110 0 : }
111 0 : }
112 0 : for (StatName stat_name : remove_list) {
113 0 : auto iter = map.find(stat_name);
114 0 : ASSERT(iter != map.end());
115 0 : list.push_back(iter->second); // Save SharedPtr to the list to avoid invalidating refs to stat.
116 0 : map.erase(iter);
117 0 : }
118 0 : }
119 :
120 : template <class StatSharedPtr>
121 : void ThreadLocalStoreImpl::removeRejectedStats(
122 0 : StatNameHashMap<StatSharedPtr>& map, std::function<void(const StatSharedPtr&)> f_deletion) {
123 0 : StatNameVec remove_list;
124 0 : for (auto& stat : map) {
125 0 : if (rejects(stat.first)) {
126 0 : remove_list.push_back(stat.first);
127 0 : }
128 0 : }
129 0 : for (StatName stat_name : remove_list) {
130 0 : auto iter = map.find(stat_name);
131 0 : ASSERT(iter != map.end());
132 0 : f_deletion(iter->second);
133 0 : map.erase(iter);
134 0 : }
135 0 : }
136 :
137 53227 : StatsMatcher::FastResult ThreadLocalStoreImpl::fastRejects(StatName stat_name) const {
138 53227 : return stats_matcher_->fastRejects(stat_name);
139 53227 : }
140 :
141 : bool ThreadLocalStoreImpl::slowRejects(StatsMatcher::FastResult fast_reject_result,
142 0 : StatName stat_name) const {
143 0 : return stats_matcher_->slowRejects(fast_reject_result, stat_name);
144 0 : }
145 :
146 2056 : std::vector<CounterSharedPtr> ThreadLocalStoreImpl::counters() const {
147 : // Handle de-dup due to overlapping scopes.
148 2056 : std::vector<CounterSharedPtr> ret;
149 2056 : forEachCounter([&ret](std::size_t size) { ret.reserve(size); },
150 798052 : [&ret](Counter& counter) { ret.emplace_back(CounterSharedPtr(&counter)); });
151 2056 : return ret;
152 2056 : }
153 :
154 1239 : ScopeSharedPtr ThreadLocalStoreImpl::ScopeImpl::createScope(const std::string& name) {
155 1239 : StatNameManagedStorage stat_name_storage(Utility::sanitizeStatsName(name), symbolTable());
156 1239 : return scopeFromStatName(stat_name_storage.statName());
157 1239 : }
158 :
159 1345 : ScopeSharedPtr ThreadLocalStoreImpl::ScopeImpl::scopeFromStatName(StatName name) {
160 1345 : SymbolTable::StoragePtr joined = symbolTable().join({prefix_.statName(), name});
161 1345 : auto new_scope = std::make_shared<ScopeImpl>(parent_, StatName(joined.get()));
162 1345 : parent_.addScope(new_scope);
163 1345 : return new_scope;
164 1345 : }
165 :
166 1443 : void ThreadLocalStoreImpl::addScope(std::shared_ptr<ScopeImpl>& new_scope) {
167 1443 : Thread::LockGuard lock(lock_);
168 1443 : scopes_[new_scope.get()] = std::weak_ptr<ScopeImpl>(new_scope);
169 1443 : }
170 :
171 974 : std::vector<GaugeSharedPtr> ThreadLocalStoreImpl::gauges() const {
172 : // Handle de-dup due to overlapping scopes.
173 974 : std::vector<GaugeSharedPtr> ret;
174 974 : forEachGauge([&ret](std::size_t size) { ret.reserve(size); },
175 135488 : [&ret](Gauge& gauge) { ret.emplace_back(GaugeSharedPtr(&gauge)); });
176 974 : return ret;
177 974 : }
178 :
179 0 : std::vector<TextReadoutSharedPtr> ThreadLocalStoreImpl::textReadouts() const {
180 : // Handle de-dup due to overlapping scopes.
181 0 : std::vector<TextReadoutSharedPtr> ret;
182 0 : forEachTextReadout(
183 0 : [&ret](std::size_t size) { ret.reserve(size); },
184 0 : [&ret](TextReadout& text_readout) { ret.emplace_back(TextReadoutSharedPtr(&text_readout)); });
185 0 : return ret;
186 0 : }
187 :
188 0 : std::vector<ParentHistogramSharedPtr> ThreadLocalStoreImpl::histograms() const {
189 0 : std::vector<ParentHistogramSharedPtr> ret;
190 0 : forEachHistogram([&ret](std::size_t size) mutable { ret.reserve(size); },
191 0 : [&ret](ParentHistogram& histogram) mutable {
192 0 : ret.emplace_back(ParentHistogramSharedPtr(&histogram));
193 0 : });
194 0 : return ret;
195 0 : }
196 :
197 : void ThreadLocalStoreImpl::initializeThreading(Event::Dispatcher& main_thread_dispatcher,
198 98 : ThreadLocal::Instance& tls) {
199 98 : threading_ever_initialized_ = true;
200 98 : main_thread_dispatcher_ = &main_thread_dispatcher;
201 98 : tls_cache_ = ThreadLocal::TypedSlot<TlsCache>::makeUnique(tls);
202 98 : tls_cache_->set(
203 192 : [](Event::Dispatcher&) -> std::shared_ptr<TlsCache> { return std::make_shared<TlsCache>(); });
204 98 : tls_ = tls;
205 98 : }
206 :
207 98 : void ThreadLocalStoreImpl::shutdownThreading() {
208 : // This will block both future cache fills as well as cache flushes.
209 98 : shutting_down_ = true;
210 98 : ASSERT(!tls_.has_value() || tls_->isShutdown());
211 :
212 : // We can't call runOnAllThreads here as global threading has already been shutdown. It is okay
213 : // to simply clear the scopes and central cache entries here as they will be cleaned up during
214 : // thread local data cleanup in InstanceImpl::shutdownThread().
215 98 : {
216 98 : Thread::LockGuard lock(lock_);
217 98 : scopes_to_cleanup_.clear();
218 98 : central_cache_entries_to_cleanup_.clear();
219 98 : }
220 :
221 98 : Thread::LockGuard lock(hist_mutex_);
222 1539 : for (ParentHistogramImpl* histogram : histogram_set_) {
223 1539 : histogram->setShuttingDown(true);
224 1539 : }
225 98 : histogram_set_.clear();
226 98 : sinked_histograms_.clear();
227 98 : }
228 :
229 94 : void ThreadLocalStoreImpl::mergeHistograms(PostMergeCb merge_complete_cb) {
230 94 : if (!shutting_down_) {
231 0 : ASSERT(!merge_in_progress_);
232 0 : merge_in_progress_ = true;
233 0 : tls_cache_->runOnAllThreads(
234 0 : [](OptRef<TlsCache> tls_cache) {
235 0 : for (const auto& id_hist : tls_cache->tls_histogram_cache_) {
236 0 : const TlsHistogramSharedPtr& tls_hist = id_hist.second;
237 0 : tls_hist->beginMerge();
238 0 : }
239 0 : },
240 0 : [this, merge_complete_cb]() -> void { mergeInternal(merge_complete_cb); });
241 94 : } else {
242 : // If server is shutting down, just call the callback to allow flush to continue.
243 94 : merge_complete_cb();
244 94 : }
245 94 : }
246 :
247 0 : void ThreadLocalStoreImpl::mergeInternal(PostMergeCb merge_complete_cb) {
248 0 : if (!shutting_down_) {
249 0 : forEachHistogram(nullptr, [](ParentHistogram& histogram) { histogram.merge(); });
250 0 : merge_complete_cb();
251 0 : merge_in_progress_ = false;
252 0 : }
253 0 : }
254 :
255 1443 : ThreadLocalStoreImpl::CentralCacheEntry::~CentralCacheEntry() {
256 : // Assert that the symbol-table is valid, so we get good test coverage of
257 : // the validity of the symbol table at the time this destructor runs. This
258 : // is because many tests will not populate rejected_stats_.
259 1443 : ASSERT(symbol_table_.toString(StatNameManagedStorage("Hello.world", symbol_table_).statName()) ==
260 1443 : "Hello.world");
261 1443 : rejected_stats_.free(symbol_table_); // NOLINT(clang-analyzer-unix.Malloc)
262 1443 : }
263 :
264 1443 : void ThreadLocalStoreImpl::releaseScopeCrossThread(ScopeImpl* scope) {
265 1443 : Thread::ReleasableLockGuard lock(lock_);
266 1443 : ASSERT(scopes_.count(scope) == 1);
267 1443 : scopes_.erase(scope);
268 :
269 : // This method is called directly from the ScopeImpl destructor, but we can't
270 : // destroy scope->central_cache_ until all the TLS caches are be destroyed, as
271 : // the TLS caches reference the Counters and Gauges owned by the central
272 : // cache. We don't want the maps in the TLS caches to bump the
273 : // reference-count, as decrementing the count requires an allocator lock,
274 : // which would cause a storm of contention during scope destruction.
275 : //
276 : // So instead we have a 2-phase destroy:
277 : // 1. destroy all the TLS caches
278 : // 2. destroy the central cache.
279 : //
280 : // Since this is called from ScopeImpl's destructor, we must bump the
281 : // ref-count of the central-cache by copying to a local scoped pointer, and
282 : // keep that reference alive until all the TLS caches are clear. This is done by keeping a
283 : // separate vector of shared_ptrs which will be destructed once all threads have completed.
284 :
285 : // This can happen from any thread. We post() back to the main thread which will initiate the
286 : // cache flush operation.
287 1443 : if (!shutting_down_ && main_thread_dispatcher_) {
288 : // Clear scopes in a batch. It's possible that many different scopes will be deleted at
289 : // the same time, before the main thread gets a chance to run cleanScopesFromCaches. If a new
290 : // scope is deleted before that post runs, we add it to our list of scopes to clear, and there
291 : // is no need to issue another post. This greatly reduces the overhead when there are tens of
292 : // thousands of scopes to clear in a short period. i.e.: VHDS updates with tens of thousands of
293 : // VirtualHosts.
294 32 : bool need_post = scopes_to_cleanup_.empty();
295 32 : scopes_to_cleanup_.push_back(scope->scope_id_);
296 32 : central_cache_entries_to_cleanup_.push_back(scope->centralCacheLockHeld());
297 32 : lock.release();
298 :
299 32 : if (need_post) {
300 10 : main_thread_dispatcher_->post([this]() {
301 10 : sync_.syncPoint(MainDispatcherCleanupSync);
302 10 : clearScopesFromCaches();
303 10 : });
304 10 : }
305 32 : }
306 1443 : }
307 :
308 1543 : void ThreadLocalStoreImpl::releaseHistogramCrossThread(uint64_t histogram_id) {
309 : // This can happen from any thread. We post() back to the main thread which will initiate the
310 : // cache flush operation.
311 1543 : if (!shutting_down_ && main_thread_dispatcher_) {
312 : // It's possible that many different histograms will be deleted at the same
313 : // time, before the main thread gets a chance to run
314 : // clearHistogramsFromCaches. If a new histogram is deleted before that
315 : // post runs, we add it to our list of histograms to clear, and there's no
316 : // need to issue another post.
317 4 : bool need_post = false;
318 4 : {
319 4 : Thread::LockGuard lock(hist_mutex_);
320 4 : need_post = histograms_to_cleanup_.empty();
321 4 : histograms_to_cleanup_.push_back(histogram_id);
322 4 : }
323 4 : if (need_post) {
324 4 : main_thread_dispatcher_->post([this]() { clearHistogramsFromCaches(); });
325 4 : }
326 4 : }
327 1543 : }
328 :
329 : ThreadLocalStoreImpl::TlsCacheEntry&
330 43231 : ThreadLocalStoreImpl::TlsCache::insertScope(uint64_t scope_id) {
331 43231 : return scope_cache_[scope_id];
332 43231 : }
333 :
334 20 : void ThreadLocalStoreImpl::TlsCache::eraseScopes(const std::vector<uint64_t>& scope_ids) {
335 64 : for (uint64_t scope_id : scope_ids) {
336 64 : scope_cache_.erase(scope_id);
337 64 : }
338 20 : }
339 :
340 8 : void ThreadLocalStoreImpl::TlsCache::eraseHistograms(const std::vector<uint64_t>& histograms) {
341 : // This is called for every histogram in every thread, even though the
342 : // histogram may not have been cached in each thread yet. So we don't
343 : // want to check whether the erase() call erased anything.
344 8 : for (uint64_t histogram_id : histograms) {
345 8 : tls_histogram_cache_.erase(histogram_id);
346 8 : }
347 8 : }
348 :
349 10 : void ThreadLocalStoreImpl::clearScopesFromCaches() {
350 : // If we are shutting down we no longer perform cache flushes as workers may be shutting down
351 : // at the same time.
352 10 : if (!shutting_down_) {
353 : // Perform a cache flush on all threads.
354 :
355 : // Capture all the pending scope ids in a local, clearing the list held in
356 : // this. Once this occurs, if a new scope is deleted, a new post will be
357 : // required.
358 10 : auto scope_ids = std::make_shared<std::vector<uint64_t>>();
359 : // Capture all the central cache entries for scopes we're deleting. These will be freed after
360 : // all threads have completed.
361 10 : auto central_caches = std::make_shared<std::vector<CentralCacheEntrySharedPtr>>();
362 10 : {
363 10 : Thread::LockGuard lock(lock_);
364 10 : *scope_ids = std::move(scopes_to_cleanup_);
365 10 : scopes_to_cleanup_.clear();
366 10 : *central_caches = std::move(central_cache_entries_to_cleanup_);
367 10 : central_cache_entries_to_cleanup_.clear();
368 10 : }
369 :
370 10 : tls_cache_->runOnAllThreads(
371 20 : [scope_ids](OptRef<TlsCache> tls_cache) { tls_cache->eraseScopes(*scope_ids); },
372 10 : [central_caches]() { /* Holds onto central_caches until all tls caches are clear */ });
373 10 : }
374 10 : }
375 :
376 4 : void ThreadLocalStoreImpl::clearHistogramsFromCaches() {
377 : // If we are shutting down we no longer perform cache flushes as workers may be shutting down
378 : // at the same time.
379 4 : if (!shutting_down_) {
380 : // Move the histograms pending cleanup into a local variable. Future histogram deletions will be
381 : // batched until the next time this function is called.
382 4 : auto histograms = std::make_shared<std::vector<uint64_t>>();
383 4 : {
384 4 : Thread::LockGuard lock(hist_mutex_);
385 4 : histograms->swap(histograms_to_cleanup_);
386 4 : }
387 :
388 4 : tls_cache_->runOnAllThreads(
389 8 : [histograms](OptRef<TlsCache> tls_cache) { tls_cache->eraseHistograms(*histograms); });
390 4 : }
391 4 : }
392 :
393 : ThreadLocalStoreImpl::ScopeImpl::ScopeImpl(ThreadLocalStoreImpl& parent, StatName prefix)
394 : : scope_id_(parent.next_scope_id_++), parent_(parent),
395 : prefix_(prefix, parent.alloc_.symbolTable()),
396 1443 : central_cache_(new CentralCacheEntry(parent.alloc_.symbolTable())) {}
397 :
398 1443 : ThreadLocalStoreImpl::ScopeImpl::~ScopeImpl() {
399 : // Helps reproduce a previous race condition by pausing here in tests while we
400 : // loop over scopes. 'this' will not have been removed from the scopes_ table
401 : // yet, so we need to be careful.
402 1443 : parent_.sync_.syncPoint(DeleteScopeSync);
403 :
404 : // Note that scope iteration is thread-safe due to the lock held in
405 : // releaseScopeCrossThread. For more details see the comment in
406 : // `ThreadLocalStoreImpl::iterHelper`, and the lock it takes prior to the loop.
407 1443 : parent_.releaseScopeCrossThread(this);
408 1443 : prefix_.free(symbolTable());
409 1443 : }
410 :
411 : // Helper for managing the potential truncation of tags from the metric names and
412 : // converting them to StatName. Making the tag extraction optional within this class simplifies the
413 : // RAII ergonomics (as opposed to making the construction of this object conditional).
414 : //
415 : // The StatNameTagVector returned by this object will be valid as long as this object is in scope
416 : // and the provided stat_name_tags are valid.
417 : //
418 : // When tag extraction is not done, this class is just a passthrough for the provided name/tags.
419 : class StatNameTagHelper {
420 : public:
421 : StatNameTagHelper(ThreadLocalStoreImpl& tls, StatName name,
422 : const absl::optional<StatNameTagVector>& stat_name_tags)
423 52335 : : pool_(tls.symbolTable()), stat_name_tags_(stat_name_tags.value_or(StatNameTagVector())) {
424 52335 : if (!stat_name_tags) {
425 52335 : TagVector tags;
426 52335 : tag_extracted_name_ =
427 52335 : pool_.add(tls.tagProducer().produceTags(tls.symbolTable().toString(name), tags));
428 52335 : StatName empty;
429 52335 : for (const auto& tag : tags) {
430 45705 : StatName tag_name = tls.wellKnownTags().getBuiltin(tag.name_, empty);
431 45705 : if (tag_name.empty()) {
432 18465 : tag_name = pool_.add(tag.name_);
433 18465 : }
434 45705 : stat_name_tags_.emplace_back(tag_name, pool_.add(tag.value_));
435 45705 : }
436 52335 : } else {
437 0 : tag_extracted_name_ = name;
438 0 : }
439 52335 : }
440 :
441 52219 : const StatNameTagVector& statNameTags() const { return stat_name_tags_; }
442 52219 : StatName tagExtractedName() const { return tag_extracted_name_; }
443 :
444 : private:
445 : StatNamePool pool_;
446 : StatNameTagVector stat_name_tags_;
447 : StatName tag_extracted_name_;
448 : };
449 :
450 : bool ThreadLocalStoreImpl::checkAndRememberRejection(StatName name,
451 : StatsMatcher::FastResult fast_reject_result,
452 : StatNameStorageSet& central_rejected_stats,
453 51285 : StatNameHashSet* tls_rejected_stats) {
454 51285 : if (stats_matcher_->acceptsAll()) {
455 51285 : return false;
456 51285 : }
457 :
458 0 : auto iter = central_rejected_stats.find(name);
459 0 : const StatNameStorage* rejected_name = nullptr;
460 0 : if (iter != central_rejected_stats.end()) {
461 0 : rejected_name = &(*iter);
462 0 : } else {
463 0 : if (slowRejects(fast_reject_result, name)) {
464 0 : auto insertion = central_rejected_stats.insert(StatNameStorage(name, symbolTable()));
465 0 : const StatNameStorage& rejected_name_ref = *(insertion.first);
466 0 : rejected_name = &rejected_name_ref;
467 0 : }
468 0 : }
469 0 : if (rejected_name != nullptr) {
470 0 : if (tls_rejected_stats != nullptr) {
471 0 : tls_rejected_stats->insert(rejected_name->statName());
472 0 : }
473 0 : return true;
474 0 : }
475 0 : return false;
476 0 : }
477 :
478 : template <class StatType>
479 : StatType& ThreadLocalStoreImpl::ScopeImpl::safeMakeStat(
480 : StatName full_stat_name, StatName name_no_tags,
481 : const absl::optional<StatNameTagVector>& stat_name_tags,
482 : StatNameHashMap<RefcountPtr<StatType>>& central_cache_map,
483 : StatsMatcher::FastResult fast_reject_result, StatNameStorageSet& central_rejected_stats,
484 : MakeStatFn<StatType> make_stat, StatRefMap<StatType>* tls_cache,
485 51406 : StatNameHashSet* tls_rejected_stats, StatType& null_stat) {
486 :
487 51406 : if (tls_rejected_stats != nullptr &&
488 51406 : tls_rejected_stats->find(full_stat_name) != tls_rejected_stats->end()) {
489 0 : return null_stat;
490 0 : }
491 :
492 : // If we have a valid cache entry, return it.
493 51406 : if (tls_cache) {
494 41802 : auto pos = tls_cache->find(full_stat_name);
495 41802 : if (pos != tls_cache->end()) {
496 1746 : return pos->second;
497 1746 : }
498 41802 : }
499 :
500 : // We must now look in the central store so we must be locked. We grab a reference to the
501 : // central store location. It might contain nothing. In this case, we allocate a new stat.
502 49660 : Thread::LockGuard lock(parent_.lock_);
503 49660 : auto iter = central_cache_map.find(full_stat_name);
504 49660 : RefcountPtr<StatType>* central_ref = nullptr;
505 49660 : if (iter != central_cache_map.end()) {
506 34 : central_ref = &(iter->second);
507 49626 : } else if (parent_.checkAndRememberRejection(full_stat_name, fast_reject_result,
508 49626 : central_rejected_stats, tls_rejected_stats)) {
509 0 : return null_stat;
510 49626 : } else {
511 49626 : StatNameTagHelper tag_helper(parent_, name_no_tags, stat_name_tags);
512 :
513 49626 : RefcountPtr<StatType> stat = make_stat(
514 49626 : parent_.alloc_, full_stat_name, tag_helper.tagExtractedName(), tag_helper.statNameTags());
515 49626 : ASSERT(stat != nullptr);
516 49626 : central_ref = ¢ral_cache_map[stat->statName()];
517 49626 : *central_ref = stat;
518 49626 : }
519 :
520 : // If we have a TLS cache, insert the stat.
521 49660 : StatType& ret = **central_ref;
522 49660 : if (tls_cache) {
523 40056 : tls_cache->insert(std::make_pair(ret.statName(), std::reference_wrapper<StatType>(ret)));
524 40056 : }
525 :
526 : // Finally we return the reference.
527 49660 : return ret;
528 49660 : }
529 :
530 : Counter& ThreadLocalStoreImpl::ScopeImpl::counterFromStatNameWithTags(
531 41050 : const StatName& name, StatNameTagVectorOptConstRef stat_name_tags) {
532 41050 : if (parent_.rejectsAll()) {
533 0 : return parent_.null_counter_;
534 0 : }
535 :
536 : // Determine the final name based on the prefix and the passed name.
537 41050 : TagUtility::TagStatNameJoiner joiner(prefix_.statName(), name, stat_name_tags, symbolTable());
538 41050 : Stats::StatName final_stat_name = joiner.nameWithTags();
539 :
540 41050 : StatsMatcher::FastResult fast_reject_result = parent_.fastRejects(final_stat_name);
541 41050 : if (fast_reject_result == StatsMatcher::FastResult::Rejects) {
542 0 : return parent_.null_counter_;
543 0 : }
544 :
545 : // We now find the TLS cache. This might remain null if we don't have TLS
546 : // initialized currently.
547 41050 : StatRefMap<Counter>* tls_cache = nullptr;
548 41050 : StatNameHashSet* tls_rejected_stats = nullptr;
549 41050 : if (!parent_.shutting_down_ && parent_.tls_cache_) {
550 34386 : TlsCacheEntry& entry = parent_.tlsCache().insertScope(this->scope_id_);
551 34386 : tls_cache = &entry.counters_;
552 34386 : tls_rejected_stats = &entry.rejected_stats_;
553 34386 : }
554 :
555 41050 : const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
556 41050 : return safeMakeStat<Counter>(
557 41050 : final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->counters_,
558 41050 : fast_reject_result, central_cache->rejected_stats_,
559 41050 : [](Allocator& allocator, StatName name, StatName tag_extracted_name,
560 41050 : const StatNameTagVector& tags) -> CounterSharedPtr {
561 39298 : return allocator.makeCounter(name, tag_extracted_name, tags);
562 39298 : },
563 41050 : tls_cache, tls_rejected_stats, parent_.null_counter_);
564 41050 : }
565 :
566 4025 : void ThreadLocalStoreImpl::deliverHistogramToSinks(const Histogram& histogram, uint64_t value) {
567 : // Thread local deliveries must be blocked outright for histograms and timers during shutdown.
568 : // This is because the sinks may end up trying to create new connections via the thread local
569 : // cluster manager which may already be destroyed (there is no way to sequence this because the
570 : // cluster manager destroying can create deliveries). We special case this explicitly to avoid
571 : // having to implement a shutdown() method (or similar) on every TLS object.
572 4025 : if (shutting_down_) {
573 0 : return;
574 0 : }
575 :
576 4025 : for (Sink& sink : timer_sinks_) {
577 0 : sink.onHistogramComplete(histogram, value);
578 0 : }
579 4025 : }
580 :
581 : Gauge& ThreadLocalStoreImpl::ScopeImpl::gaugeFromStatNameWithTags(
582 : const StatName& name, StatNameTagVectorOptConstRef stat_name_tags,
583 10124 : Gauge::ImportMode import_mode) {
584 : // If a gauge is "hidden" it should not be rejected as these are used for deferred stats.
585 10124 : if (parent_.rejectsAll() && import_mode != Gauge::ImportMode::HiddenAccumulate) {
586 0 : return parent_.null_gauge_;
587 0 : }
588 :
589 : // See comments in counter(). There is no super clean way (via templates or otherwise) to
590 : // share this code so I'm leaving it largely duplicated for now.
591 10124 : TagUtility::TagStatNameJoiner joiner(prefix_.statName(), name, stat_name_tags, symbolTable());
592 10124 : StatName final_stat_name = joiner.nameWithTags();
593 :
594 10124 : StatsMatcher::FastResult fast_reject_result;
595 10124 : if (import_mode != Gauge::ImportMode::HiddenAccumulate) {
596 10124 : fast_reject_result = parent_.fastRejects(final_stat_name);
597 10124 : } else {
598 0 : fast_reject_result = StatsMatcher::FastResult::Matches;
599 0 : }
600 10124 : if (fast_reject_result == StatsMatcher::FastResult::Rejects) {
601 0 : return parent_.null_gauge_;
602 0 : }
603 :
604 10124 : StatRefMap<Gauge>* tls_cache = nullptr;
605 10124 : StatNameHashSet* tls_rejected_stats = nullptr;
606 10124 : if (!parent_.shutting_down_ && parent_.tls_cache_) {
607 7184 : TlsCacheEntry& entry = parent_.tlsCache().insertScope(this->scope_id_);
608 7184 : tls_cache = &entry.gauges_;
609 7184 : tls_rejected_stats = &entry.rejected_stats_;
610 7184 : }
611 :
612 10124 : const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
613 10124 : Gauge& gauge = safeMakeStat<Gauge>(
614 10124 : final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->gauges_,
615 10124 : fast_reject_result, central_cache->rejected_stats_,
616 10124 : [import_mode](Allocator& allocator, StatName name, StatName tag_extracted_name,
617 10124 : const StatNameTagVector& tags) -> GaugeSharedPtr {
618 10096 : return allocator.makeGauge(name, tag_extracted_name, tags, import_mode);
619 10096 : },
620 10124 : tls_cache, tls_rejected_stats, parent_.null_gauge_);
621 10124 : gauge.mergeImportMode(import_mode);
622 10124 : return gauge;
623 10124 : }
624 :
625 : Histogram& ThreadLocalStoreImpl::ScopeImpl::histogramFromStatNameWithTags(
626 1821 : const StatName& name, StatNameTagVectorOptConstRef stat_name_tags, Histogram::Unit unit) {
627 : // See safety analysis comment in counterFromStatNameWithTags above.
628 :
629 1821 : if (parent_.rejectsAll()) {
630 0 : return parent_.null_histogram_;
631 0 : }
632 :
633 : // See comments in counter(). There is no super clean way (via templates or otherwise) to
634 : // share this code so I'm leaving it largely duplicated for now.
635 1821 : TagUtility::TagStatNameJoiner joiner(prefix_.statName(), name, stat_name_tags, symbolTable());
636 1821 : StatName final_stat_name = joiner.nameWithTags();
637 :
638 1821 : StatsMatcher::FastResult fast_reject_result = parent_.fastRejects(final_stat_name);
639 1821 : if (fast_reject_result == StatsMatcher::FastResult::Rejects) {
640 0 : return parent_.null_histogram_;
641 0 : }
642 :
643 1821 : StatNameHashMap<ParentHistogramSharedPtr>* tls_cache = nullptr;
644 1821 : StatNameHashSet* tls_rejected_stats = nullptr;
645 1821 : if (!parent_.shutting_down_ && parent_.tls_cache_) {
646 1429 : TlsCacheEntry& entry = parent_.tlsCache().insertScope(this->scope_id_);
647 1429 : tls_cache = &entry.parent_histograms_;
648 1429 : auto iter = tls_cache->find(final_stat_name);
649 1429 : if (iter != tls_cache->end()) {
650 162 : return *iter->second;
651 162 : }
652 1267 : tls_rejected_stats = &entry.rejected_stats_;
653 1267 : if (tls_rejected_stats->find(final_stat_name) != tls_rejected_stats->end()) {
654 0 : return parent_.null_histogram_;
655 0 : }
656 1267 : }
657 :
658 1659 : Thread::LockGuard lock(parent_.lock_);
659 1659 : const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
660 1659 : auto iter = central_cache->histograms_.find(final_stat_name);
661 1659 : ParentHistogramImplSharedPtr* central_ref = nullptr;
662 1659 : if (iter != central_cache->histograms_.end()) {
663 0 : central_ref = &iter->second;
664 1659 : } else if (parent_.checkAndRememberRejection(final_stat_name, fast_reject_result,
665 1659 : central_cache->rejected_stats_,
666 1659 : tls_rejected_stats)) {
667 0 : return parent_.null_histogram_;
668 1659 : } else {
669 1659 : StatNameTagHelper tag_helper(parent_, joiner.tagExtractedName(), stat_name_tags);
670 :
671 1659 : ConstSupportedBuckets* buckets = nullptr;
672 1659 : buckets = &parent_.histogram_settings_->buckets(symbolTable().toString(final_stat_name));
673 :
674 1659 : RefcountPtr<ParentHistogramImpl> stat;
675 1659 : {
676 1659 : Thread::LockGuard lock(parent_.hist_mutex_);
677 1659 : auto iter = parent_.histogram_set_.find(final_stat_name);
678 1659 : if (iter != parent_.histogram_set_.end()) {
679 116 : stat = RefcountPtr<ParentHistogramImpl>(*iter);
680 1543 : } else {
681 1543 : stat = new ParentHistogramImpl(final_stat_name, unit, parent_,
682 1543 : tag_helper.tagExtractedName(), tag_helper.statNameTags(),
683 1543 : *buckets, parent_.next_histogram_id_++);
684 1543 : if (!parent_.shutting_down_) {
685 1543 : parent_.histogram_set_.insert(stat.get());
686 1543 : if (parent_.sink_predicates_.has_value() &&
687 1543 : parent_.sink_predicates_->includeHistogram(*stat)) {
688 0 : parent_.sinked_histograms_.insert(stat.get());
689 0 : }
690 1543 : }
691 1543 : }
692 1659 : }
693 :
694 1659 : central_ref = ¢ral_cache->histograms_[stat->statName()];
695 1659 : *central_ref = stat;
696 1659 : }
697 :
698 1659 : if (tls_cache != nullptr) {
699 1267 : tls_cache->insert(std::make_pair((*central_ref)->statName(), *central_ref));
700 1267 : }
701 1659 : return **central_ref;
702 1659 : }
703 :
704 : TextReadout& ThreadLocalStoreImpl::ScopeImpl::textReadoutFromStatNameWithTags(
705 232 : const StatName& name, StatNameTagVectorOptConstRef stat_name_tags) {
706 232 : if (parent_.rejectsAll()) {
707 0 : return parent_.null_text_readout_;
708 0 : }
709 :
710 : // Determine the final name based on the prefix and the passed name.
711 232 : TagUtility::TagStatNameJoiner joiner(prefix_.statName(), name, stat_name_tags, symbolTable());
712 232 : Stats::StatName final_stat_name = joiner.nameWithTags();
713 :
714 232 : StatsMatcher::FastResult fast_reject_result = parent_.fastRejects(final_stat_name);
715 232 : if (fast_reject_result == StatsMatcher::FastResult::Rejects) {
716 0 : return parent_.null_text_readout_;
717 0 : }
718 :
719 : // We now find the TLS cache. This might remain null if we don't have TLS
720 : // initialized currently.
721 232 : StatRefMap<TextReadout>* tls_cache = nullptr;
722 232 : StatNameHashSet* tls_rejected_stats = nullptr;
723 232 : if (!parent_.shutting_down_ && parent_.tls_cache_) {
724 232 : TlsCacheEntry& entry = parent_.tlsCache().insertScope(this->scope_id_);
725 232 : tls_cache = &entry.text_readouts_;
726 232 : tls_rejected_stats = &entry.rejected_stats_;
727 232 : }
728 :
729 232 : const CentralCacheEntrySharedPtr& central_cache = centralCacheNoThreadAnalysis();
730 232 : return safeMakeStat<TextReadout>(
731 232 : final_stat_name, joiner.tagExtractedName(), stat_name_tags, central_cache->text_readouts_,
732 232 : fast_reject_result, central_cache->rejected_stats_,
733 232 : [](Allocator& allocator, StatName name, StatName tag_extracted_name,
734 232 : const StatNameTagVector& tags) -> TextReadoutSharedPtr {
735 232 : return allocator.makeTextReadout(name, tag_extracted_name, tags);
736 232 : },
737 232 : tls_cache, tls_rejected_stats, parent_.null_text_readout_);
738 232 : }
739 :
740 0 : CounterOptConstRef ThreadLocalStoreImpl::ScopeImpl::findCounter(StatName name) const {
741 0 : Thread::LockGuard lock(parent_.lock_);
742 0 : return findStatLockHeld<Counter>(name, central_cache_->counters_);
743 0 : }
744 :
745 0 : GaugeOptConstRef ThreadLocalStoreImpl::ScopeImpl::findGauge(StatName name) const {
746 0 : Thread::LockGuard lock(parent_.lock_);
747 0 : return findStatLockHeld<Gauge>(name, central_cache_->gauges_);
748 0 : }
749 :
750 0 : HistogramOptConstRef ThreadLocalStoreImpl::ScopeImpl::findHistogram(StatName name) const {
751 0 : Thread::LockGuard lock(parent_.lock_);
752 0 : return findHistogramLockHeld(name);
753 0 : }
754 :
755 : HistogramOptConstRef ThreadLocalStoreImpl::ScopeImpl::findHistogramLockHeld(StatName name) const
756 0 : ABSL_EXCLUSIVE_LOCKS_REQUIRED(parent_.lock_) {
757 0 : auto iter = central_cache_->histograms_.find(name);
758 0 : if (iter == central_cache_->histograms_.end()) {
759 0 : return absl::nullopt;
760 0 : }
761 :
762 0 : RefcountPtr<Histogram> histogram_ref(iter->second);
763 0 : return std::cref(*histogram_ref);
764 0 : }
765 :
766 0 : TextReadoutOptConstRef ThreadLocalStoreImpl::ScopeImpl::findTextReadout(StatName name) const {
767 0 : Thread::LockGuard lock(parent_.lock_);
768 0 : return findStatLockHeld<TextReadout>(name, central_cache_->text_readouts_);
769 0 : }
770 :
771 4025 : Histogram& ThreadLocalStoreImpl::tlsHistogram(ParentHistogramImpl& parent, uint64_t id) {
772 : // tlsHistogram() is generally not called for a histogram that is rejected by
773 : // the matcher, so no further rejection-checking is needed at this level.
774 : // TlsHistogram inherits its reject/accept status from ParentHistogram.
775 :
776 : // See comments in counterFromStatName() which explains the logic here.
777 :
778 4025 : TlsHistogramSharedPtr* tls_histogram = nullptr;
779 4025 : if (!shutting_down_ && tls_cache_) {
780 4025 : tls_histogram = &(tlsCache().tls_histogram_cache_[id]);
781 4025 : if (*tls_histogram != nullptr) {
782 2975 : return **tls_histogram;
783 2975 : }
784 4025 : }
785 :
786 1050 : StatNameTagHelper tag_helper(*this, parent.statName(), absl::nullopt);
787 :
788 1050 : TlsHistogramSharedPtr hist_tls_ptr(
789 1050 : new ThreadLocalHistogramImpl(parent.statName(), parent.unit(), tag_helper.tagExtractedName(),
790 1050 : tag_helper.statNameTags(), symbolTable()));
791 :
792 1050 : parent.addTlsHistogram(hist_tls_ptr);
793 :
794 1050 : if (tls_histogram != nullptr) {
795 1050 : *tls_histogram = hist_tls_ptr;
796 1050 : }
797 :
798 1050 : return *hist_tls_ptr;
799 4025 : }
800 :
801 : ThreadLocalHistogramImpl::ThreadLocalHistogramImpl(StatName name, Histogram::Unit unit,
802 : StatName tag_extracted_name,
803 : const StatNameTagVector& stat_name_tags,
804 : SymbolTable& symbol_table)
805 : : HistogramImplHelper(name, tag_extracted_name, stat_name_tags, symbol_table), unit_(unit),
806 1050 : used_(false), created_thread_id_(std::this_thread::get_id()), symbol_table_(symbol_table) {
807 1050 : histograms_[0] = hist_alloc();
808 1050 : histograms_[1] = hist_alloc();
809 1050 : }
810 :
811 1050 : ThreadLocalHistogramImpl::~ThreadLocalHistogramImpl() {
812 1050 : MetricImpl::clear(symbol_table_);
813 1050 : hist_free(histograms_[0]);
814 1050 : hist_free(histograms_[1]);
815 1050 : }
816 :
817 4025 : void ThreadLocalHistogramImpl::recordValue(uint64_t value) {
818 4025 : ASSERT(std::this_thread::get_id() == created_thread_id_);
819 4025 : hist_insert_intscale(histograms_[current_active_], value, 0, 1);
820 4025 : used_ = true;
821 4025 : }
822 :
823 0 : void ThreadLocalHistogramImpl::merge(histogram_t* target) {
824 0 : histogram_t** other_histogram = &histograms_[otherHistogramIndex()];
825 0 : hist_accumulate(target, other_histogram, 1);
826 0 : hist_clear(*other_histogram);
827 0 : }
828 :
829 : ParentHistogramImpl::ParentHistogramImpl(StatName name, Histogram::Unit unit,
830 : ThreadLocalStoreImpl& thread_local_store,
831 : StatName tag_extracted_name,
832 : const StatNameTagVector& stat_name_tags,
833 : ConstSupportedBuckets& supported_buckets, uint64_t id)
834 : : MetricImpl(name, tag_extracted_name, stat_name_tags, thread_local_store.symbolTable()),
835 : unit_(unit), thread_local_store_(thread_local_store), interval_histogram_(hist_alloc()),
836 : cumulative_histogram_(hist_alloc()),
837 : interval_statistics_(interval_histogram_, unit, supported_buckets),
838 1543 : cumulative_statistics_(cumulative_histogram_, unit, supported_buckets), id_(id) {}
839 :
840 1543 : ParentHistogramImpl::~ParentHistogramImpl() {
841 1543 : thread_local_store_.releaseHistogramCrossThread(id_);
842 1543 : ASSERT(ref_count_ == 0);
843 1543 : MetricImpl::clear(thread_local_store_.symbolTable());
844 1543 : hist_free(interval_histogram_);
845 1543 : hist_free(cumulative_histogram_);
846 1543 : }
847 :
848 5852 : void ParentHistogramImpl::incRefCount() { ++ref_count_; }
849 :
850 5852 : bool ParentHistogramImpl::decRefCount() {
851 5852 : bool ret;
852 5852 : if (shutting_down_) {
853 : // When shutting down, we cannot reference thread_local_store_, as
854 : // histograms can outlive the store. So we decrement the ref-count without
855 : // the stores' lock. We will not be removing the object from the store's
856 : // histogram map in this scenario, as the set was cleared during shutdown,
857 : // and will not be repopulated in histogramFromStatNameWithTags after
858 : // initiating shutdown.
859 2878 : ret = --ref_count_ == 0;
860 2974 : } else {
861 : // We delegate to the Store object to decrement the ref-count so it can hold
862 : // the lock to the map. If we don't hold a lock, another thread may
863 : // simultaneously try to allocate the same name'd histogram after we
864 : // decrement it, and we'll wind up with a dtor/update race. To avoid this we
865 : // must hold the lock until the histogram is removed from the map.
866 : //
867 : // See also StatsSharedImpl::decRefCount() in allocator_impl.cc, which has
868 : // the same issue.
869 2974 : ret = thread_local_store_.decHistogramRefCount(*this, ref_count_);
870 2974 : }
871 5852 : return ret;
872 5852 : }
873 :
874 : bool ThreadLocalStoreImpl::decHistogramRefCount(ParentHistogramImpl& hist,
875 2974 : std::atomic<uint32_t>& ref_count) {
876 : // We must hold the store's histogram lock when decrementing the
877 : // refcount. Otherwise another thread may simultaneously try to allocate the
878 : // same name'd stat after we decrement it, and we'll wind up with a
879 : // dtor/update race. To avoid this we must hold the lock until the stat is
880 : // removed from the map.
881 2974 : Thread::LockGuard lock(hist_mutex_);
882 2974 : ASSERT(ref_count >= 1);
883 2974 : if (--ref_count == 0) {
884 4 : if (!shutting_down_) {
885 4 : const size_t count = histogram_set_.erase(hist.statName());
886 4 : ASSERT(shutting_down_ || count == 1);
887 4 : sinked_histograms_.erase(&hist);
888 4 : }
889 4 : return true;
890 4 : }
891 2970 : return false;
892 2974 : }
893 :
894 1256 : SymbolTable& ParentHistogramImpl::symbolTable() { return thread_local_store_.symbolTable(); }
895 :
896 6568 : Histogram::Unit ParentHistogramImpl::unit() const { return unit_; }
897 :
898 4025 : void ParentHistogramImpl::recordValue(uint64_t value) {
899 4025 : Histogram& tls_histogram = thread_local_store_.tlsHistogram(*this, id_);
900 4025 : tls_histogram.recordValue(value);
901 4025 : thread_local_store_.deliverHistogramToSinks(*this, value);
902 4025 : }
903 :
904 0 : bool ParentHistogramImpl::used() const {
905 : // Consider ParentHistogram used only if has ever been merged.
906 0 : return merged_;
907 0 : }
908 :
909 0 : bool ParentHistogramImpl::hidden() const { return false; }
910 :
911 0 : void ParentHistogramImpl::merge() {
912 0 : Thread::ReleasableLockGuard lock(merge_lock_);
913 0 : if (merged_ || usedLockHeld()) {
914 0 : hist_clear(interval_histogram_);
915 : // Here we could copy all the pointers to TLS histograms in the tls_histogram_ list,
916 : // then release the lock before we do the actual merge. However it is not a big deal
917 : // because the tls_histogram merge is not that expensive as it is a single histogram
918 : // merge and adding TLS histograms is rare.
919 0 : for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) {
920 0 : tls_histogram->merge(interval_histogram_);
921 0 : }
922 : // Since TLS merge is done, we can release the lock here.
923 0 : lock.release();
924 0 : hist_accumulate(cumulative_histogram_, &interval_histogram_, 1);
925 0 : cumulative_statistics_.refresh(cumulative_histogram_);
926 0 : interval_statistics_.refresh(interval_histogram_);
927 0 : merged_ = true;
928 0 : }
929 0 : }
930 :
931 0 : std::string ParentHistogramImpl::quantileSummary() const {
932 0 : if (used()) {
933 0 : std::vector<std::string> summary;
934 0 : const std::vector<double>& supported_quantiles_ref = interval_statistics_.supportedQuantiles();
935 0 : summary.reserve(supported_quantiles_ref.size());
936 0 : for (size_t i = 0; i < supported_quantiles_ref.size(); ++i) {
937 0 : summary.push_back(fmt::format("P{:g}({},{})", 100 * supported_quantiles_ref[i],
938 0 : interval_statistics_.computedQuantiles()[i],
939 0 : cumulative_statistics_.computedQuantiles()[i]));
940 0 : }
941 0 : return absl::StrJoin(summary, " ");
942 0 : } else {
943 0 : return {"No recorded values"};
944 0 : }
945 0 : }
946 :
947 0 : std::string ParentHistogramImpl::bucketSummary() const {
948 0 : if (used()) {
949 0 : std::vector<std::string> bucket_summary;
950 0 : ConstSupportedBuckets& supported_buckets = interval_statistics_.supportedBuckets();
951 0 : bucket_summary.reserve(supported_buckets.size());
952 0 : for (size_t i = 0; i < supported_buckets.size(); ++i) {
953 0 : bucket_summary.push_back(fmt::format("B{:g}({},{})", supported_buckets[i],
954 0 : interval_statistics_.computedBuckets()[i],
955 0 : cumulative_statistics_.computedBuckets()[i]));
956 0 : }
957 0 : return absl::StrJoin(bucket_summary, " ");
958 0 : } else {
959 0 : return {"No recorded values"};
960 0 : }
961 0 : }
962 :
963 : std::vector<Stats::ParentHistogram::Bucket>
964 0 : ParentHistogramImpl::detailedlBucketsHelper(const histogram_t& histogram) {
965 0 : const uint32_t num_buckets = hist_num_buckets(&histogram);
966 0 : std::vector<Stats::ParentHistogram::Bucket> buckets(num_buckets);
967 0 : hist_bucket_t hist_bucket;
968 0 : for (uint32_t i = 0; i < num_buckets; ++i) {
969 0 : ParentHistogram::Bucket& bucket = buckets[i];
970 0 : hist_bucket_idx_bucket(&histogram, i, &hist_bucket, &bucket.count_);
971 0 : bucket.lower_bound_ = hist_bucket_to_double(hist_bucket);
972 0 : bucket.width_ = hist_bucket_to_double_bin_width(hist_bucket);
973 0 : }
974 0 : return buckets;
975 0 : }
976 :
977 1050 : void ParentHistogramImpl::addTlsHistogram(const TlsHistogramSharedPtr& hist_ptr) {
978 1050 : Thread::LockGuard lock(merge_lock_);
979 1050 : tls_histograms_.emplace_back(hist_ptr);
980 1050 : }
981 :
982 0 : bool ParentHistogramImpl::usedLockHeld() const {
983 0 : for (const TlsHistogramSharedPtr& tls_histogram : tls_histograms_) {
984 0 : if (tls_histogram->used()) {
985 0 : return true;
986 0 : }
987 0 : }
988 0 : return false;
989 0 : }
990 :
991 2154 : void ThreadLocalStoreImpl::forEachCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
992 2154 : alloc_.forEachCounter(f_size, f_stat);
993 2154 : }
994 :
995 1072 : void ThreadLocalStoreImpl::forEachGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
996 1072 : alloc_.forEachGauge(f_size, f_stat);
997 1072 : }
998 :
999 0 : void ThreadLocalStoreImpl::forEachTextReadout(SizeFn f_size, StatFn<TextReadout> f_stat) const {
1000 0 : alloc_.forEachTextReadout(f_size, f_stat);
1001 0 : }
1002 :
1003 196 : void ThreadLocalStoreImpl::forEachHistogram(SizeFn f_size, StatFn<ParentHistogram> f_stat) const {
1004 196 : Thread::LockGuard lock(hist_mutex_);
1005 196 : if (f_size != nullptr) {
1006 98 : f_size(histogram_set_.size());
1007 98 : }
1008 1256 : for (ParentHistogramImpl* histogram : histogram_set_) {
1009 1256 : f_stat(*histogram);
1010 1256 : }
1011 196 : }
1012 :
1013 : void ThreadLocalStoreImpl::forEachScope(std::function<void(std::size_t)> f_size,
1014 0 : StatFn<const Scope> f_scope) const {
1015 0 : std::vector<ScopeSharedPtr> scopes;
1016 0 : iterateScopes([&scopes](const ScopeImplSharedPtr& scope) -> bool {
1017 0 : scopes.push_back(scope);
1018 0 : return true;
1019 0 : });
1020 :
1021 0 : if (f_size != nullptr) {
1022 0 : f_size(scopes.size());
1023 0 : }
1024 0 : for (const ScopeSharedPtr& scope : scopes) {
1025 0 : f_scope(*scope);
1026 0 : }
1027 0 : }
1028 :
1029 : bool ThreadLocalStoreImpl::iterateScopesLockHeld(
1030 : const std::function<bool(const ScopeImplSharedPtr&)> fn) const
1031 98 : ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1032 196 : for (auto& iter : scopes_) {
1033 196 : sync_.syncPoint(ThreadLocalStoreImpl::IterateScopeSync);
1034 :
1035 : // We keep the scopes as a map from Scope* to weak_ptr<Scope> so that if,
1036 : // during the iteration, the last reference to a ScopeSharedPtr is dropped,
1037 : // we can test for that here by attempting to lock the weak pointer, and
1038 : // skip those that are nullptr.
1039 196 : const std::weak_ptr<ScopeImpl>& scope = iter.second;
1040 196 : const ScopeImplSharedPtr& locked = scope.lock();
1041 196 : if (locked != nullptr && !fn(locked)) {
1042 0 : return false;
1043 0 : }
1044 196 : }
1045 98 : return true;
1046 98 : }
1047 :
1048 98 : void ThreadLocalStoreImpl::forEachSinkedCounter(SizeFn f_size, StatFn<Counter> f_stat) const {
1049 98 : alloc_.forEachSinkedCounter(f_size, f_stat);
1050 98 : }
1051 :
1052 98 : void ThreadLocalStoreImpl::forEachSinkedGauge(SizeFn f_size, StatFn<Gauge> f_stat) const {
1053 98 : alloc_.forEachSinkedGauge(f_size, f_stat);
1054 98 : }
1055 :
1056 : void ThreadLocalStoreImpl::forEachSinkedTextReadout(SizeFn f_size,
1057 98 : StatFn<TextReadout> f_stat) const {
1058 98 : alloc_.forEachSinkedTextReadout(f_size, f_stat);
1059 98 : }
1060 :
1061 : void ThreadLocalStoreImpl::forEachSinkedHistogram(SizeFn f_size,
1062 98 : StatFn<ParentHistogram> f_stat) const {
1063 98 : if (sink_predicates_.has_value() &&
1064 98 : Runtime::runtimeFeatureEnabled("envoy.reloadable_features.enable_include_histograms")) {
1065 0 : Thread::LockGuard lock(hist_mutex_);
1066 :
1067 0 : if (f_size != nullptr) {
1068 0 : f_size(sinked_histograms_.size());
1069 0 : }
1070 0 : for (auto histogram : sinked_histograms_) {
1071 0 : f_stat(*histogram);
1072 0 : }
1073 98 : } else {
1074 98 : forEachHistogram(f_size, f_stat);
1075 98 : }
1076 98 : }
1077 :
1078 0 : void ThreadLocalStoreImpl::setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) {
1079 0 : ASSERT(sink_predicates != nullptr);
1080 0 : if (sink_predicates != nullptr) {
1081 0 : sink_predicates_.emplace(*sink_predicates);
1082 0 : alloc_.setSinkPredicates(std::move(sink_predicates));
1083 : // Add histograms to the set of sinked histograms.
1084 0 : Thread::LockGuard lock(hist_mutex_);
1085 0 : sinked_histograms_.clear();
1086 0 : for (auto& histogram : histogram_set_) {
1087 0 : if (sink_predicates_->includeHistogram(*histogram)) {
1088 0 : sinked_histograms_.insert(histogram);
1089 0 : }
1090 0 : }
1091 0 : }
1092 0 : }
1093 :
1094 : void ThreadLocalStoreImpl::extractAndAppendTags(StatName name, StatNamePool& pool,
1095 0 : StatNameTagVector& stat_tags) {
1096 0 : extractAndAppendTags(symbolTable().toString(name), pool, stat_tags);
1097 0 : }
1098 :
1099 : void ThreadLocalStoreImpl::extractAndAppendTags(absl::string_view name, StatNamePool& pool,
1100 0 : StatNameTagVector& stat_tags) {
1101 0 : TagVector tags;
1102 0 : tagProducer().produceTags(name, tags);
1103 0 : for (const auto& tag : tags) {
1104 0 : stat_tags.emplace_back(pool.add(tag.name_), pool.add(tag.value_));
1105 0 : }
1106 0 : }
1107 :
1108 : } // namespace Stats
1109 : } // namespace Envoy
|