1
#include "source/common/thread_local/thread_local_impl.h"
2

            
3
#include <algorithm>
4
#include <atomic>
5
#include <cstdint>
6
#include <list>
7

            
8
#include "envoy/event/dispatcher.h"
9

            
10
#include "source/common/common/assert.h"
11
#include "source/common/common/stl_helpers.h"
12
#include "source/common/runtime/runtime_features.h"
13

            
14
namespace Envoy {
15
namespace ThreadLocal {
16

            
17
thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_;
18

            
19
10822
// Default-constructed; threads and slots are registered/allocated later via
// registerThread() and allocateSlot().
InstanceImpl::InstanceImpl() = default;
20

            
21
10822
// Destruction must happen on the main thread and only after shutdownGlobalThreading() has set
// shutdown_, guaranteeing no worker can still be dispatching into the slots.
InstanceImpl::~InstanceImpl() {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(shutdown_);
  // Release the main thread's copies of every slot's thread local object.
  thread_local_data_.data_.clear();
}
26

            
27
111088
// Allocates a slot, preferring to recycle a previously freed index so that the per-thread data
// vectors do not grow without bound; only grows slots_ when no freed index is available.
// Main-thread only; must not be called after shutdown.
SlotPtr InstanceImpl::allocateSlot() {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(!shutdown_);

  if (!free_slot_indexes_.empty()) {
    // Reuse the oldest freed index (FIFO order).
    const uint32_t recycled_index = free_slot_indexes_.front();
    free_slot_indexes_.pop_front();
    ASSERT(recycled_index < slots_.size());
    auto slot = std::make_unique<SlotImpl>(*this, recycled_index);
    slots_[recycled_index] = slot.get();
    return slot;
  }

  // No free index: append a brand new slot at the end.
  auto slot = std::make_unique<SlotImpl>(*this, static_cast<uint32_t>(slots_.size()));
  slots_.push_back(slot.get());
  return slot;
}
43

            
44
// still_alive_guard_ is the liveness token handed (as a weak_ptr) to callbacks posted to other
// threads; see wrapCallback()/dataCallback() and the header comments.
InstanceImpl::SlotImpl::SlotImpl(InstanceImpl& parent, uint32_t index)
    : parent_(parent), index_(index), still_alive_guard_(std::make_shared<bool>(true)) {}
46

            
47
111088
// Releases the slot's index back to the parent. Destroying still_alive_guard_ (implicitly, as a
// member) also neutralizes any callbacks still in flight on worker threads.
InstanceImpl::SlotImpl::~SlotImpl() {
  // Do nothing if the parent is already shutdown. Return early here to avoid accessing the main
  // thread dispatcher because it may have been destroyed.
  if (isShutdownImpl()) {
    return;
  }

  auto* main_thread_dispatcher = parent_.main_thread_dispatcher_;
  // Main thread dispatcher may be nullptr if the slot is being created and destroyed during
  // server initialization.
  if (main_thread_dispatcher == nullptr || main_thread_dispatcher->isThreadSafe()) {
    // If the slot is being destroyed on the main thread, we can remove it immediately.
    parent_.removeSlot(index_);
  } else {
    // If the slot is being destroyed on a worker thread, we need to post the removal to the
    // main thread. There are two possible cases here:
    // 1. The removal is executed on the main thread as expected if the main dispatcher is still
    //    active. This is the common case and the clean up will be done as expected because the
    //    the worker dispatchers must be active before the main dispatcher is exited.
    // 2. The removal is not executed if the main dispatcher has already exited. This is fine
    //    because the removal has no side effect and will be ignored. The shutdown process will
    //    clean up all the slots anyway.
    main_thread_dispatcher->post([i = index_, &tls = parent_] { tls.removeSlot(i); });
  }
}
72

            
73
111684
std::function<void()> InstanceImpl::SlotImpl::wrapCallback(const std::function<void()>& cb) {
74
  // See the header file comments for still_alive_guard_ for the purpose of this capture and the
75
  // expired check below.
76
  //
77
  // Note also that this logic is duplicated below and dataCallback(), rather
78
  // than incurring another lambda redirection.
79
111684
  return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb] {
80
110084
    if (!still_alive_guard.expired()) {
81
110074
      cb();
82
110074
    }
83
110084
  };
84
111684
}
85

            
86
23010
bool InstanceImpl::SlotImpl::currentThreadRegisteredWorker(uint32_t index) {
87
23010
  return thread_local_data_.data_.size() > index;
88
23010
}
89

            
90
23010
// Convenience wrapper checking registration for this slot's own index on the calling thread.
bool InstanceImpl::SlotImpl::currentThreadRegistered() {
  return currentThreadRegisteredWorker(index_);
}
93

            
94
8530422
// Returns the calling thread's object for slot `index`. The slot must already have been
// initialized on this thread (via set()); the returned shared_ptr may still be null if the
// initialize callback produced one.
ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::getWorker(uint32_t index) {
  ASSERT(currentThreadRegisteredWorker(index));
  auto& per_thread_objects = thread_local_data_.data_;
  return per_thread_objects[index];
}
98

            
99
8424281
// Returns the calling thread's object for this slot.
ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() { return getWorker(index_); }
100

            
101
45742
std::function<void()> InstanceImpl::SlotImpl::dataCallback(const UpdateCb& cb) {
102
  // See the header file comments for still_alive_guard_ for why we capture index_.
103
45742
  return [still_alive_guard = std::weak_ptr<bool>(still_alive_guard_), cb = std::move(cb),
104
106145
          index = index_]() mutable {
105
    // This duplicates logic in wrapCallback() (above). Using wrapCallback also
106
    // works, but incurs another indirection of lambda at runtime. As the
107
    // duplicated logic is only an if-statement and a bool function, it doesn't
108
    // seem worth factoring that out to a helper function.
109
106145
    if (!still_alive_guard.expired()) {
110
106142
      cb(getWorker(index));
111
106142
    }
112
106145
  };
113
45742
}
114

            
115
// Runs `cb` against this slot's object on every thread (workers + main), then invokes
// `complete_cb` on the main thread once all threads have finished.
void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb,
                                             const std::function<void()>& complete_cb) {
  parent_.runOnAllThreads(dataCallback(cb), complete_cb);
}
119

            
120
29399
// Runs `cb` against this slot's object on every thread (workers + main); fire-and-forget.
void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) {
  parent_.runOnAllThreads(dataCallback(cb));
}
123

            
124
110102
// Initializes this slot's object on every registered thread by invoking `cb` with each thread's
// dispatcher and storing the result in that thread's data vector. Main-thread only.
void InstanceImpl::SlotImpl::set(InitializeCb cb) {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(!parent_.shutdown_);

  for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
    // See the header file comments for still_alive_guard_ for why we capture index_.
    dispatcher.post(wrapCallback(
        [index = index_, cb, &dispatcher]() -> void { setThreadLocal(index, cb(dispatcher)); }));
  }

  // Handle main thread. This runs synchronously, so the slot is usable on the main thread as
  // soon as set() returns.
  setThreadLocal(index_, cb(*parent_.main_thread_dispatcher_));
}
137

            
138
21767
// Registers a dispatcher with the TLS system. Called from the main thread for both the main
// dispatcher (main_thread = true) and each worker dispatcher (main_thread = false).
void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(!shutdown_);

  if (main_thread) {
    main_thread_dispatcher_ = &dispatcher;
    // We are already on the main thread, so the thread_local can be written directly.
    thread_local_data_.dispatcher_ = &dispatcher;
  } else {
    ASSERT(!containsReference(registered_threads_, dispatcher));
    registered_threads_.push_back(dispatcher);
    // The worker's thread_local must be written on the worker's own thread, hence the post().
    dispatcher.post([&dispatcher] { thread_local_data_.dispatcher_ = &dispatcher; });
  }
}
151

            
152
179
// Frees a slot index: clears the main-thread bookkeeping, returns the index to the free list,
// and clears the per-thread object on every thread. Main-thread only.
void InstanceImpl::removeSlot(uint32_t slot) {
  ASSERT_IS_MAIN_OR_TEST_THREAD();

  // When shutting down, we do not post slot removals to other threads. This is because the other
  // threads have already shut down and the dispatcher is no longer alive. There is also no reason
  // to do removal, because no allocations happen during shutdown and shutdownThread() will clean
  // things up on the other thread.
  if (shutdown_) {
    return;
  }

  slots_[slot] = nullptr;
  ASSERT(std::find(free_slot_indexes_.begin(), free_slot_indexes_.end(), slot) ==
             free_slot_indexes_.end(),
         fmt::format("slot index {} already in free slot set!", slot));
  free_slot_indexes_.push_back(slot);
  runOnAllThreads([slot]() -> void {
    // This runs on each thread and clears the slot, making it available for a new allocations.
    // This is safe even if a new allocation comes in, because everything happens with post() and
    // will be sequenced after this removal. It is also safe if there are callbacks pending on
    // other threads because they will run first.
    if (slot < thread_local_data_.data_.size()) {
      thread_local_data_.data_[slot] = nullptr;
    }
  });
}
178

            
179
29578
void InstanceImpl::runOnAllThreads(std::function<void()> cb) {
180
29578
  ASSERT_IS_MAIN_OR_TEST_THREAD();
181
29578
  ASSERT(!shutdown_);
182

            
183
29870
  for (Event::Dispatcher& dispatcher : registered_threads_) {
184
29854
    dispatcher.post(cb);
185
29854
  }
186

            
187
  // Handle main thread.
188
29578
  cb();
189
29578
}
190

            
191
// Like the single-argument overload, but posts `all_threads_complete_cb` to the main thread once
// every worker has run `cb`. Completion is tracked by a shared_ptr whose custom deleter fires
// when the last worker drops its reference.
void InstanceImpl::runOnAllThreads(std::function<void()> cb,
                                   std::function<void()> all_threads_complete_cb) {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(!shutdown_);
  // Handle main thread first so that when the last worker thread wins, we could just call the
  // all_threads_complete_cb method. Parallelism of main thread execution is being traded off
  // for programming simplicity here.
  cb();

  // The deleter runs when the final worker releases cb_guard, i.e. after all workers have
  // executed (or dropped) the callback; it then notifies the main thread.
  std::shared_ptr<std::function<void()>> cb_guard(
      new std::function<void()>(cb), [this, all_threads_complete_cb](std::function<void()>* cb) {
        main_thread_dispatcher_->post(all_threads_complete_cb);
        delete cb;
      });

  for (Event::Dispatcher& dispatcher : registered_threads_) {
    dispatcher.post([cb_guard]() -> void { (*cb_guard)(); });
  }
}
210

            
211
220179
void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr object) {
212
220179
  if (thread_local_data_.data_.size() <= index) {
213
136089
    thread_local_data_.data_.resize(index + 1);
214
136089
  }
215

            
216
220179
  thread_local_data_.data_[index] = object;
217
220179
}
218

            
219
10821
// Marks the TLS system as shut down. After this, no slots may be allocated or set, slot removal
// becomes a no-op, and destruction (of slots and the instance) is permitted. Main-thread only;
// may be called at most once.
void InstanceImpl::shutdownGlobalThreading() {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  ASSERT(!shutdown_);
  shutdown_ = true;
}
224

            
225
21457
// Destroys the calling thread's slot objects. Must be called on each thread during shutdown,
// after shutdownGlobalThreading().
void InstanceImpl::shutdownThread() {
  ASSERT(shutdown_);

  // Destruction of slots is done in *reverse* order. This is so that filters and higher layer
  // things that are built on top of the cluster manager, stats, etc. will be destroyed before
  // more base layer things. The reason reverse ordering is done is to deal with the case that leaf
  // objects depend in some way on "persistent" objects (particularly the cluster manager) that are
  // created very early on with a known slot number and never destroyed until shutdown. For example,
  // if we chose to create persistent per-thread gRPC clients we would potentially run into shutdown
  // issues if that thing got destroyed after the cluster manager. This happens in practice
  // currently when a redis connection pool is destroyed and removes its member update callback from
  // the backing cluster. Examples of things with TLS that are created early on and are never
  // destroyed until server shutdown are stats, runtime, and the cluster manager (see server.cc).
  //
  // It's possible this might need to become more complicated later but it's OK for now. Note that
  // this is always safe to do because:
  // 1) All slot updates come in via post().
  // 2) No updates or removals will come in during shutdown().
  //
  // TODO(mattklein123): Deletion should really be in reverse *allocation* order. This could be
  //                     implemented relatively easily by keeping a parallel list of slot #s. This
  //                     would fix the case where something allocates two slots, but is interleaved
  //                     with a deletion, such that the second allocation is actually a lower slot
  //                     number than the first. This is an edge case that does not exist anywhere
  //                     in the code today, but we can keep this in mind if things become more
  //                     complicated in the future.
  for (auto it = thread_local_data_.data_.rbegin(); it != thread_local_data_.data_.rend(); ++it) {
    // reset() (rather than relying on clear() below) enforces the reverse destruction order.
    it->reset();
  }
  thread_local_data_.data_.clear();
}
256

            
257
1051
// Returns the calling thread's dispatcher. Valid only after the thread has been registered via
// registerThread() (and, for workers, after the registration post() has run on that thread).
Event::Dispatcher& InstanceImpl::dispatcher() {
  ASSERT(thread_local_data_.dispatcher_ != nullptr);
  return *thread_local_data_.dispatcher_;
}
261

            
262
} // namespace ThreadLocal
263
} // namespace Envoy