LCOV - code coverage report
Current view: top level - source/common/thread_local - thread_local_impl.cc (source / functions)
Test: coverage.dat            Lines:     133 / 133   (100.0 %)
Date: 2024-01-05 06:35:25     Functions:  27 /  27   (100.0 %)

          Line data    Source code
       1             : #include "source/common/thread_local/thread_local_impl.h"
       2             : 
       3             : #include <algorithm>
       4             : #include <atomic>
       5             : #include <cstdint>
       6             : #include <list>
       7             : 
       8             : #include "envoy/event/dispatcher.h"
       9             : 
      10             : #include "source/common/common/assert.h"
      11             : #include "source/common/common/stl_helpers.h"
      12             : 
      13             : namespace Envoy {
      14             : namespace ThreadLocal {
      15             : 
      16             : thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_;
      17             : 
      18         455 : InstanceImpl::~InstanceImpl() {
      19         455 :   ASSERT_IS_MAIN_OR_TEST_THREAD();
      20         455 :   ASSERT(shutdown_);
      21         455 :   thread_local_data_.data_.clear();
      22         455 : }
      23             : 
      24        1281 : SlotPtr InstanceImpl::allocateSlot() {
      25        1281 :   ASSERT_IS_MAIN_OR_TEST_THREAD();
      26        1281 :   ASSERT(!shutdown_);
      27             : 
      28        1281 :   if (free_slot_indexes_.empty()) {
      29        1277 :     SlotPtr slot = std::make_unique<SlotImpl>(*this, uint32_t(slots_.size()));
      30        1277 :     slots_.push_back(slot.get());
      31        1277 :     return slot;
      32        1277 :   }
      33           4 :   const uint32_t idx = free_slot_indexes_.front();
      34           4 :   free_slot_indexes_.pop_front();
      35           4 :   ASSERT(idx < slots_.size());
      36           4 :   SlotPtr slot = std::make_unique<SlotImpl>(*this, idx);
      37           4 :   slots_[idx] = slot.get();
      38           4 :   return slot;
      39        1281 : }
      40             : 
      41             : InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index)
      42        1281 :     : parent_(parent), index_(index), still_alive_guard_(std::make_shared<bool>(true)) {}
      43             : 
      44        1117 : std::function<void()> InstanceImpl::SlotImpl::wrapCallback(const std::function<void()>& cb) {
      45             :   // See the header file comments for still_alive_guard_ for the purpose of this capture and the
      46             :   // expired check below.
      47             :   //
      48             :   // Note also that this logic is duplicated below in dataCallback(), rather
      49             :   // than incurring another lambda indirection.
      50        1117 :   return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb] {
      51         920 :     if (!still_alive_guard.expired()) {
      52         920 :       cb();
      53         920 :     }
      54         920 :   };
      55        1117 : }
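// A minimal, self-contained sketch of the weak_ptr guard pattern described in the comment above.
// The names below are illustrative only (not part of this file); <memory> and <functional> are
// assumed to be available through the existing includes.
namespace {
std::function<void()> exampleGuardedCallback(const std::shared_ptr<bool>& still_alive_guard,
                                             std::function<void()> cb) {
  // Capture a weak_ptr so the callback can detect whether its owner has already been destroyed.
  return [guard = std::weak_ptr<bool>(still_alive_guard), cb = std::move(cb)] {
    if (!guard.expired()) {
      cb(); // The owner is still alive, so it is safe to run the wrapped callback.
    }
  };
}
} // namespace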
      56             : 
      57         882 : bool InstanceImpl::SlotImpl::currentThreadRegisteredWorker(uint32_t index) {
      58         882 :   return thread_local_data_.data_.size() > index;
      59         882 : }
      60             : 
      61         882 : bool InstanceImpl::SlotImpl::currentThreadRegistered() {
      62         882 :   return currentThreadRegisteredWorker(index_);
      63         882 : }
      64             : 
      65       63573 : ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) {
      66       63573 :   ASSERT(currentThreadRegisteredWorker(index));
      67       63573 :   return thread_local_data_.data_[index];
      68       63573 : }
      69             : 
      70       63167 : ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); }
      71             : 
      72         209 : std::function<void()> InstanceImpl::SlotImpl::dataCallback(const UpdateCb& cb) {
      73             :   // See the header file comments for still_alive_guard_ for why we capture index_.
      74         209 :   return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb = std::move(cb),
      75         406 :           index = index_]() mutable {
      76             :     // This duplicates logic in wrapCallback() (above). Using wrapCallback also
      77             :     // works, but incurs another indirection of lambda at runtime. As the
      78             :     // duplicated logic is only an if-statement and a bool function, it doesn't
      79             :     // seem worth factoring that out to a helper function.
      80         406 :     if (!still_alive_guard.expired()) {
      81         406 :       cb(getWorker(index));
      82         406 :     }
      83         406 :   };
      84         209 : }
      85             : 
      86             : void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb,
      87          10 :                                              const std::function<void()>& complete_cb) {
      88          10 :   parent_.runOnAllThreads(dataCallback(cb), complete_cb);
      89          10 : }
      90             : 
      91         199 : void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) {
      92         199 :   parent_.runOnAllThreads(dataCallback(cb));
      93         199 : }
      94             : 
      95        1117 : void InstanceImpl::SlotImpl::set(InitializeCb cb) {
      96        1117 :   ASSERT_IS_MAIN_OR_TEST_THREAD();
      97        1117 :   ASSERT(!parent_.shutdown_);
      98             : 
      99        1117 :   for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
     100             :     // See the header file comments for still_alive_guard_ for why we capture index_.
     101        1117 :     dispatcher.post(wrapCallback(
     102        1117 :         [index = index_, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); }));
     103        1117 :   }
     104             : 
     105             :   // Handle main thread.
     106        1117 :   setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
     107        1117 : }
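// Hedged usage sketch of the allocate/set/get flow implemented above. ExamplePerThreadState and
// exampleUsage are hypothetical names; only allocateSlot(), set(), and get() come from this file.
namespace {
struct ExamplePerThreadState : public ThreadLocalObject {
  uint64_t counter_{0};
};

void exampleUsage(InstanceImpl& tls) {
  // On the main thread: allocate a slot and install one object per registered thread.
  SlotPtr slot = tls.allocateSlot();
  slot->set([](Event::Dispatcher&) -> ThreadLocalObjectSharedPtr {
    return std::make_shared<ExamplePerThreadState>();
  });
  // On any registered thread (including the main thread), read that thread's own copy.
  auto& state = static_cast<ExamplePerThreadState&>(*slot->get());
  ++state.counter_;
}
} // namespace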
     108             : 
     109         262 : void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
     110         262 :   ASSERT_IS_MAIN_OR_TEST_THREAD();
     111         262 :   ASSERT(!shutdown_);
     112             : 
     113         262 :   if (main_thread) {
     114         131 :     main_thread_dispatcher_ = &dispatcher;
     115         131 :     thread_local_data_.dispatcher_ = &dispatcher;
     116         131 :   } else {
     117         131 :     ASSERT(!containsReference(registered_threads_, dispatcher));
     118         131 :     registered_threads_.push_back(dispatcher);
     119         131 :     dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
     120         131 :   }
     121         262 : }
     122             : 
     123        1281 : void InstanceImpl::removeSlot(uint32_t slot) {
     124        1281 :   ASSERT_IS_MAIN_OR_TEST_THREAD();
     125             : 
     126             :   // When shutting down, we do not post slot removals to other threads. This is because the other
     127             :   // threads have already shut down and the dispatcher is no longer alive. There is also no reason
     128             :   // to do removal, because no allocations happen during shutdown and shutdownThread() will clean
     129             :   // things up on the other thread.
     130        1281 :   if (shutdown_) {
     131        1265 :     return;
     132        1265 :   }
     133             : 
     134          16 :   slots_[slot] = nullptr;
     135          16 :   ASSERT(std::find(free_slot_indexes_.begin(), free_slot_indexes_.end(), slot) ==
     136          16 :              free_slot_indexes_.end(),
     137          16 :          fmt::format("slot index {} already in free slot set!", slot));
     138          16 :   free_slot_indexes_.push_back(slot);
     139          20 :   runOnAllThreads([slot]() -> void {
     140             :     // This runs on each thread and clears the slot, making it available for new allocations.
     141             :     // This is safe even if a new allocation comes in, because everything happens with post() and
     142             :     // will be sequenced after this removal. It is also safe if there are callbacks pending on
     143             :     // other threads because they will run first.
     144          20 :     if (slot < thread_local_data_.data_.size()) {
     145          14 :       thread_local_data_.data_[slot] = nullptr;
     146          14 :     }
     147          20 :   });
     148          16 : }
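// Illustrative allocation/removal trace (hypothetical sequence, not taken from a real run),
// showing how removeSlot() feeds free_slot_indexes_ and allocateSlot() reuses those indexes:
//
//   allocateSlot()  -> index 0   slots_ = [S0]        free_slot_indexes_ = []
//   allocateSlot()  -> index 1   slots_ = [S0, S1]    free_slot_indexes_ = []
//   removeSlot(0)                slots_ = [null, S1]  free_slot_indexes_ = [0]
//   allocateSlot()  -> index 0   slots_ = [S2, S1]    free_slot_indexes_ = []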
     149             : 
     150         215 : void InstanceImpl::runOnAllThreads(std::function<void()> cb) {
     151         215 :   ASSERT_IS_MAIN_OR_TEST_THREAD();
     152         215 :   ASSERT(!shutdown_);
     153             : 
     154         215 :   for (Event::Dispatcher& dispatcher : registered_threads_) {
     155         209 :     dispatcher.post(cb);
     156         209 :   }
     157             : 
     158             :   // Handle main thread.
     159         215 :   cb();
     160         215 : }
     161             : 
     162             : void InstanceImpl::runOnAllThreads(std::function<void()> cb,
     163          10 :                                    std::function<void()> all_threads_complete_cb) {
     164          10 :   ASSERT_IS_MAIN_OR_TEST_THREAD();
     165          10 :   ASSERT(!shutdown_);
     166             :   // Handle the main thread first so that when the last worker thread finishes, we can just
     167             :   // call the all_threads_complete_cb method. Parallelism of main thread execution is being
     168             :   // traded off for programming simplicity here.
     169          10 :   cb();
     170             : 
     171          10 :   std::shared_ptr<std::function<void()>> cb_guard(
     172          10 :       new std::function<void()>(cb), [this, all_threads_complete_cb](std::function<void()>* cb) {
     173          10 :         main_thread_dispatcher_->post(all_threads_complete_cb);
     174          10 :         delete cb;
     175          10 :       });
     176             : 
     177          10 :   for (Event::Dispatcher& dispatcher : registered_threads_) {
     178          10 :     dispatcher.post([cb_guard]() -> void { (*cb_guard)(); });
     179          10 :   }
     180          10 : }
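// Minimal sketch of the completion trick used above, written without Envoy types (the names are
// illustrative): each worker holds a copy of the guard, and the custom deleter fires exactly
// once, when the last copy is released.
namespace {
void exampleCompletionGuard(std::function<void()> per_thread_work,
                            std::function<void()> all_done) {
  std::shared_ptr<std::function<void()>> guard(
      new std::function<void()>(std::move(per_thread_work)),
      [all_done](std::function<void()>* cb) {
        all_done(); // Runs after every holder has released its copy of the guard.
        delete cb;
      });
  // Each worker would capture `guard` by value and invoke (*guard)(); here we invoke it inline.
  (*guard)();
}
} // namespace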
     181             : 
     182        2037 : void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) {
     183        2037 :   if (thread_local_data_.data_.size() <= index) {
     184        1374 :     thread_local_data_.data_.resize(index + 1);
     185        1374 :   }
     186             : 
     187        2037 :   thread_local_data_.data_[index] = object;
     188        2037 : }
     189             : 
     190         455 : void InstanceImpl::shutdownGlobalThreading() {
     191         455 :   ASSERT_IS_MAIN_OR_TEST_THREAD();
     192         455 :   ASSERT(!shutdown_);
     193         455 :   shutdown_ = true;
     194         455 : }
     195             : 
     196         549 : void InstanceImpl::shutdownThread() {
     197         549 :   ASSERT(shutdown_);
     198             : 
     199             :   // Destruction of slots is done in *reverse* order. This is so that filters and higher layer
     200             :   // things that are built on top of the cluster manager, stats, etc. will be destroyed before
     201             :   // more base layer things. Reverse ordering handles the case where leaf objects depend in some
     202             :   // way on "persistent" objects (particularly the cluster manager) that are created very early on
     203             :   // with a known slot number and are never destroyed until shutdown. For example, if we chose to
     204             :   // create persistent per-thread gRPC clients, we could run into shutdown issues if such a client
     205             :   // were destroyed after the cluster manager. This happens in practice
     206             :   // currently when a redis connection pool is destroyed and removes its member update callback from
     207             :   // the backing cluster. Examples of things with TLS that are created early on and are never
     208             :   // destroyed until server shutdown are stats, runtime, and the cluster manager (see server.cc).
     209             :   //
     210             :   // It's possible this might need to become more complicated later but it's OK for now. Note that
     211             :   // this is always safe to do because:
     212             :   // 1) All slot updates come in via post().
     213             :   // 2) No updates or removals will come in during shutdown().
     214             :   //
     215             :   // TODO(mattklein123): Deletion should really be in reverse *allocation* order. This could be
     216             :   //                     implemented relatively easily by keeping a parallel list of slot #s. This
     217             :   //                     would fix the case where something allocates two slots, but is interleaved
     218             :   //                     with a deletion, such that the second allocation is actually a lower slot
     219             :   //                     number than the first. This is an edge case that does not exist anywhere
     220             :   //                     in the code today, but we can keep this in mind if things become more
     221             :   //                     complicated in the future.
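  // Worked example of the TODO's edge case (hypothetical sequence): a component allocates slot 5,
  // slot 2 is later freed by something else, and the component's second allocation then reuses
  // slot 2. Reverse *index* order tears down slot 5 (the earlier allocation) before slot 2 (the
  // later one), which is the opposite of reverse *allocation* order.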
     222        2598 :   for (auto it = thread_local_data_.data_.rbegin(); it != thread_local_data_.data_.rend(); ++it) {
     223        2049 :     it->reset();
     224        2049 :   }
     225         549 :   thread_local_data_.data_.clear();
     226         549 : }
     227             : 
     228           2 : Event::Dispatcher& InstanceImpl::dispatcher() {
     229           2 :   ASSERT(thread_local_data_.dispatcher_ != nullptr);
     230           2 :   return *thread_local_data_.dispatcher_;
     231           2 : }
     232             : 
     233             : } // namespace ThreadLocal
     234             : } // namespace Envoy

Generated by: LCOV version 1.15