1 : #include "source/common/thread_local/thread_local_impl.h" 2 : 3 : #include <algorithm> 4 : #include <atomic> 5 : #include <cstdint> 6 : #include <list> 7 : 8 : #include "envoy/event/dispatcher.h" 9 : 10 : #include "source/common/common/assert.h" 11 : #include "source/common/common/stl_helpers.h" 12 : 13 : namespace Envoy { 14 : namespace ThreadLocal { 15 : 16 : thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_; 17 : 18 455 : InstanceImpl::~InstanceImpl() { 19 455 : ASSERT_IS_MAIN_OR_TEST_THREAD(); 20 455 : ASSERT(shutdown_); 21 455 : thread_local_data_.data_.clear(); 22 455 : } 23 : 24 1281 : SlotPtr InstanceImpl::allocateSlot() { 25 1281 : ASSERT_IS_MAIN_OR_TEST_THREAD(); 26 1281 : ASSERT(!shutdown_); 27 : 28 1281 : if (free_slot_indexes_.empty()) { 29 1277 : SlotPtr slot = std::make_unique<SlotImpl>(*this, uint32_t(slots_.size())); 30 1277 : slots_.push_back(slot.get()); 31 1277 : return slot; 32 1277 : } 33 4 : const uint32_t idx = free_slot_indexes_.front(); 34 4 : free_slot_indexes_.pop_front(); 35 4 : ASSERT(idx < slots_.size()); 36 4 : SlotPtr slot = std::make_unique<SlotImpl>(*this, idx); 37 4 : slots_[idx] = slot.get(); 38 4 : return slot; 39 1281 : } 40 : 41 : InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index) 42 1281 : : parent_(parent), index_(index), still_alive_guard_(std::make_shared<bool>(true)) {} 43 : 44 1117 : std::function<void()> InstanceImpl::SlotImpl::wrapCallback(const std::function<void()>& cb) { 45 : // See the header file comments for still_alive_guard_ for the purpose of this capture and the 46 : // expired check below. 47 : // 48 : // Note also that this logic is duplicated below and dataCallback(), rather 49 : // than incurring another lambda redirection. 50 1117 : return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb] { 51 920 : if (!still_alive_guard.expired()) { 52 920 : cb(); 53 920 : } 54 920 : }; 55 1117 : } 56 : 57 882 : bool InstanceImpl::SlotImpl::currentThreadRegisteredWorker(uint32_t index) { 58 882 : return thread_local_data_.data_.size() > index; 59 882 : } 60 : 61 882 : bool InstanceImpl::SlotImpl::currentThreadRegistered() { 62 882 : return currentThreadRegisteredWorker(index_); 63 882 : } 64 : 65 63573 : ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) { 66 63573 : ASSERT(currentThreadRegisteredWorker(index)); 67 63573 : return thread_local_data_.data_[index]; 68 63573 : } 69 : 70 63167 : ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); } 71 : 72 209 : std::function<void()> InstanceImpl::SlotImpl::dataCallback(const UpdateCb& cb) { 73 : // See the header file comments for still_alive_guard_ for why we capture index_. 74 209 : return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb = std::move(cb), 75 406 : index = index_]() mutable { 76 : // This duplicates logic in wrapCallback() (above). Using wrapCallback also 77 : // works, but incurs another indirection of lambda at runtime. As the 78 : // duplicated logic is only an if-statement and a bool function, it doesn't 79 : // seem worth factoring that out to a helper function. 
bool InstanceImpl::SlotImpl::currentThreadRegisteredWorker(uint32_t index) {
  return thread_local_data_.data_.size() > index;
}

bool InstanceImpl::SlotImpl::currentThreadRegistered() {
  return currentThreadRegisteredWorker(index_);
}

ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) {
  ASSERT(currentThreadRegisteredWorker(index));
  return thread_local_data_.data_[index];
}

ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); }

std::function<void()> InstanceImpl::SlotImpl::dataCallback(const UpdateCb& cb) {
  // See the header file comments for still_alive_guard_ for why we capture index_.
  return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb = std::move(cb),
          index = index_]() mutable {
    // This duplicates logic in wrapCallback() (above). Using wrapCallback() also works, but
    // incurs another lambda indirection at runtime. As the duplicated logic is only an
    // if-statement and a bool function, it doesn't seem worth factoring it out into a helper.
    if (!still_alive_guard.expired()) {
      cb(getWorker(index));
    }
  };
}

void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb,
                                             const std::function<void()>& complete_cb) {
  parent_.runOnAllThreads(dataCallback(cb), complete_cb);
}

void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) {
  parent_.runOnAllThreads(dataCallback(cb));
}

void InstanceImpl::SlotImpl::set(InitializeCb cb) {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(!parent_.shutdown_);

  for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
    // See the header file comments for still_alive_guard_ for why we capture index_.
    dispatcher.post(wrapCallback(
        [index = index_, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); }));
  }

  // Handle main thread.
  setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
}
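// Typical slot lifecycle, sketched from the interfaces exercised in this file (the `MyState`
// struct and the `tls` instance are hypothetical, and UpdateCb is assumed to be callable as
// void(ThreadLocalObjectSharedPtr), matching how dataCallback() invokes it):
//
//   struct MyState : public ThreadLocalObject {
//     uint64_t counter_{0};
//   };
//
//   // On the main thread, after registerThread() has been called for every dispatcher:
//   SlotPtr slot = tls.allocateSlot();
//   slot->set([](Event::Dispatcher&) -> ThreadLocalObjectSharedPtr {
//     // Posted to every registered worker and also run on the main thread.
//     return std::make_shared<MyState>();
//   });
//
//   // Later, from any registered thread:
//   auto state = std::dynamic_pointer_cast<MyState>(slot->get());
//
//   // Cross-thread updates are funneled through the owning InstanceImpl via post():
//   slot->runOnAllThreads([](ThreadLocalObjectSharedPtr obj) {
//     std::dynamic_pointer_cast<MyState>(obj)->counter_++;
//   });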
void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(!shutdown_);

  if (main_thread) {
    main_thread_dispatcher_ = &dispatcher;
    thread_local_data_.dispatcher_ = &dispatcher;
  } else {
    ASSERT(!containsReference(registered_threads_, dispatcher));
    registered_threads_.push_back(dispatcher);
    dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
  }
}

void InstanceImpl::removeSlot(uint32_t slot) {
  ASSERT_IS_MAIN_OR_TEST_THREAD();

  // When shutting down, we do not post slot removals to other threads. This is because the other
  // threads have already shut down and the dispatcher is no longer alive. There is also no reason
  // to do removal, because no allocations happen during shutdown and shutdownThread() will clean
  // things up on the other thread.
  if (shutdown_) {
    return;
  }

  slots_[slot] = nullptr;
  ASSERT(std::find(free_slot_indexes_.begin(), free_slot_indexes_.end(), slot) ==
             free_slot_indexes_.end(),
         fmt::format("slot index {} already in free slot set!", slot));
  free_slot_indexes_.push_back(slot);
  runOnAllThreads([slot]() -> void {
    // This runs on each thread and clears the slot, making it available for new allocations.
    // This is safe even if a new allocation comes in, because everything happens via post() and
    // will be sequenced after this removal. It is also safe if there are callbacks pending on
    // other threads because they will run first.
    if (slot < thread_local_data_.data_.size()) {
      thread_local_data_.data_[slot] = nullptr;
    }
  });
}

void InstanceImpl::runOnAllThreads(std::function<void()> cb) {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(!shutdown_);

  for (Event::Dispatcher& dispatcher : registered_threads_) {
    dispatcher.post(cb);
  }

  // Handle main thread.
  cb();
}

void InstanceImpl::runOnAllThreads(std::function<void()> cb,
                                   std::function<void()> all_threads_complete_cb) {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(!shutdown_);
  // Handle the main thread first so that when the last worker thread finishes, we can simply
  // call all_threads_complete_cb. Parallelism of main thread execution is being traded off for
  // programming simplicity here.
  cb();

  std::shared_ptr<std::function<void()>> cb_guard(
      new std::function<void()>(cb), [this, all_threads_complete_cb](std::function<void()>* cb) {
        main_thread_dispatcher_->post(all_threads_complete_cb);
        delete cb;
      });

  for (Event::Dispatcher& dispatcher : registered_threads_) {
    dispatcher.post([cb_guard]() -> void { (*cb_guard)(); });
  }
}
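// The completion signal above relies on shared_ptr reference counting: every worker lambda holds
// a copy of cb_guard, so the custom deleter (which posts all_threads_complete_cb back to the main
// thread) runs exactly once, when the last copy is destroyed, normally after the final worker has
// run its posted closure. A minimal standalone sketch of the same pattern (illustrative only;
// `post_fns`, `work`, and `on_done` are hypothetical):
//
//   #include <functional>
//   #include <memory>
//   #include <vector>
//
//   void runThenNotify(std::vector<std::function<void(std::function<void()>)>>& post_fns,
//                      std::function<void()> work, std::function<void()> on_done) {
//     // The deleter fires when the last captured copy of `guard` is destroyed.
//     auto guard = std::shared_ptr<void>(nullptr, [on_done](void*) { on_done(); });
//     for (auto& post : post_fns) {
//       post([guard, work] { work(); });
//     }
//     // `guard` goes out of scope here; the remaining references live in the posted closures.
//   }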
void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) {
  if (thread_local_data_.data_.size() <= index) {
    thread_local_data_.data_.resize(index + 1);
  }

  thread_local_data_.data_[index] = object;
}

void InstanceImpl::shutdownGlobalThreading() {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(!shutdown_);
  shutdown_ = true;
}

void InstanceImpl::shutdownThread() {
  ASSERT(shutdown_);

  // Destruction of slots is done in *reverse* order. This is so that filters and higher layer
  // things that are built on top of the cluster manager, stats, etc. will be destroyed before
  // more base layer things. Reverse ordering deals with the case where leaf objects depend in
  // some way on "persistent" objects (particularly the cluster manager) that are created very
  // early on with a known slot number and never destroyed until shutdown. For example, if we
  // chose to create persistent per-thread gRPC clients, we could run into shutdown issues if
  // those clients were destroyed after the cluster manager. This happens in practice today when
  // a redis connection pool is destroyed and removes its member update callback from the backing
  // cluster. Examples of things with TLS that are created early on and are never destroyed until
  // server shutdown are stats, runtime, and the cluster manager (see server.cc).
  //
  // It's possible this might need to become more complicated later, but it's OK for now. Note
  // that this is always safe to do because:
  // 1) All slot updates come in via post().
  // 2) No updates or removals will come in during shutdown().
  //
  // TODO(mattklein123): Deletion should really be in reverse *allocation* order. This could be
  //                     implemented relatively easily by keeping a parallel list of slot #s. It
  //                     would fix the case where something allocates two slots, but is
  //                     interleaved with a deletion, such that the second allocation is actually
  //                     a lower slot number than the first. This is an edge case that does not
  //                     exist anywhere in the code today, but we can keep it in mind if things
  //                     become more complicated in the future.
  for (auto it = thread_local_data_.data_.rbegin(); it != thread_local_data_.data_.rend(); ++it) {
    it->reset();
  }
  thread_local_data_.data_.clear();
}

Event::Dispatcher& InstanceImpl::dispatcher() {
  ASSERT(thread_local_data_.dispatcher_ != nullptr);
  return *thread_local_data_.dispatcher_;
}

} // namespace ThreadLocal
} // namespace Envoy
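// Teardown order implied by the asserts in this file (a hedged reading; `tls` is a hypothetical
// InstanceImpl owned by the server):
//
//   // 1. On the main thread, flip the global shutdown flag. After this, allocateSlot(), set(),
//   //    and runOnAllThreads() assert, and removeSlot() becomes a no-op.
//   tls.shutdownGlobalThreading();
//
//   // 2. On each thread that called registerThread(), release the per-thread objects in
//   //    reverse slot order.
//   tls.shutdownThread();
//
//   // 3. Finally destroy the InstanceImpl; ~InstanceImpl() asserts that shutdown_ was set.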