Line data Source code
1 : #include "source/common/upstream/cluster_discovery_manager.h"
2 :
3 : #include <functional>
4 :
5 : #include "source/common/common/enum_to_int.h"
6 :
7 : namespace Envoy {
8 : namespace Upstream {
9 :
10 : namespace {
11 :
12 : using ClusterAddedCb = std::function<void(absl::string_view)>;
13 :
14 : class ClusterCallbacks : public ClusterUpdateCallbacks {
15 : public:
16 223 : ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {}
17 :
18 306 : void onClusterAddOrUpdate(absl::string_view cluster_name, ThreadLocalClusterCommand&) override {
19 306 : cb_(cluster_name);
20 306 : };
21 :
22 0 : void onClusterRemoval(const std::string&) override {}
23 :
24 : private:
25 : ClusterAddedCb cb_;
26 : };
27 :
28 : } // namespace
29 :
30 : ClusterDiscoveryManager::ClusterDiscoveryManager(
31 : std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler)
32 223 : : thread_name_(std::move(thread_name)) {
33 337 : callbacks_ = std::make_unique<ClusterCallbacks>([this](absl::string_view cluster_name) {
34 306 : ENVOY_LOG(trace,
35 306 : "cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle "
36 306 : "callback in {}",
37 306 : cluster_name, enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
38 306 : processClusterName(cluster_name, ClusterDiscoveryStatus::Available);
39 306 : });
40 223 : callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_);
41 223 : }
42 :
43 : void ClusterDiscoveryManager::processClusterName(absl::string_view name,
44 306 : ClusterDiscoveryStatus cluster_status) {
45 306 : auto callback_items = extractCallbackList(name);
46 306 : if (callback_items.empty()) {
47 306 : ENVOY_LOG(trace, "cm cdm: no callbacks for the cluster name {} in {}", name, thread_name_);
48 306 : return;
49 306 : }
50 0 : ENVOY_LOG(trace, "cm cdm: invoking {} callbacks for the cluster name {} in {}",
51 0 : callback_items.size(), name, thread_name_);
52 0 : for (auto& item : callback_items) {
53 0 : auto callback = std::move(item->callback_);
54 : // This invalidates the handle and the invoker.
55 0 : item.reset();
56 : // The callback could be null when handle was destroyed during the
57 : // previous callback.
58 0 : if (callback != nullptr) {
59 0 : (*callback)(cluster_status);
60 0 : }
61 0 : }
62 0 : }
63 :
64 : ClusterDiscoveryManager::AddedCallbackData
65 0 : ClusterDiscoveryManager::addCallback(std::string name, ClusterDiscoveryCallbackPtr callback) {
66 0 : ENVOY_LOG(trace, "cm cdm: adding callback for the cluster name {} in {}", name, thread_name_);
67 0 : auto& callbacks_list = pending_clusters_[name];
68 0 : auto item_weak_ptr = addCallbackInternal(callbacks_list, std::move(callback));
69 0 : auto handle = std::make_unique<ClusterDiscoveryCallbackHandleImpl>(*this, name, item_weak_ptr);
70 0 : CallbackInvoker invoker(*this, std::move(name), std::move(item_weak_ptr));
71 0 : auto discovery_in_progress = (callbacks_list.size() > 1);
72 0 : return {std::move(handle), discovery_in_progress, std::move(invoker)};
73 0 : }
74 :
75 0 : void ClusterDiscoveryManager::swap(ClusterDiscoveryManager& other) {
76 0 : thread_name_.swap(other.thread_name_);
77 0 : pending_clusters_.swap(other.pending_clusters_);
78 0 : callbacks_.swap(other.callbacks_);
79 0 : callbacks_handle_.swap(other.callbacks_handle_);
80 0 : }
81 :
82 : void ClusterDiscoveryManager::invokeCallbackFromItem(absl::string_view name,
83 : CallbackListItemWeakPtr item_weak_ptr,
84 0 : ClusterDiscoveryStatus cluster_status) {
85 0 : auto item_ptr = item_weak_ptr.lock();
86 0 : if (item_ptr == nullptr) {
87 0 : ENVOY_LOG(trace, "cm cdm: not invoking an already stale callback for cluster {} in {}", name,
88 0 : thread_name_);
89 0 : return;
90 0 : }
91 0 : ENVOY_LOG(trace, "cm cdm: invoking a callback for cluster {} in {}", name, thread_name_);
92 0 : auto callback = std::move(item_ptr->callback_);
93 0 : if (item_ptr->self_iterator_.has_value()) {
94 0 : eraseItem(name, std::move(item_ptr));
95 0 : } else {
96 0 : ENVOY_LOG(trace,
97 0 : "cm cdm: the callback for cluster {} in {} is prepared for invoking during "
98 0 : "processing, yet some other callback tries to invoke this callback earlier",
99 0 : name, thread_name_);
100 0 : }
101 0 : if (callback != nullptr) {
102 0 : (*callback)(cluster_status);
103 0 : } else {
104 0 : ENVOY_LOG(trace, "cm cdm: the callback for cluster {} in {} is prepared for invoking during "
105 0 : "processing, yet some other callback destroyed its handle in the meantime");
106 0 : }
107 0 : }
108 :
109 : ClusterDiscoveryManager::CallbackList
110 306 : ClusterDiscoveryManager::extractCallbackList(absl::string_view name) {
111 306 : auto map_node_handle = pending_clusters_.extract(name);
112 306 : if (map_node_handle.empty()) {
113 306 : return {};
114 306 : }
115 0 : CallbackList extracted;
116 0 : map_node_handle.mapped().swap(extracted);
117 0 : for (auto& item : extracted) {
118 0 : item->self_iterator_.reset();
119 0 : }
120 0 : return extracted;
121 306 : }
122 :
123 : ClusterDiscoveryManager::CallbackListItemWeakPtr
124 : ClusterDiscoveryManager::addCallbackInternal(CallbackList& list,
125 0 : ClusterDiscoveryCallbackPtr callback) {
126 0 : auto item = std::make_shared<CallbackListItem>(std::move(callback));
127 0 : auto it = list.emplace(list.end(), item);
128 0 : item->self_iterator_ = std::move(it);
129 0 : return item;
130 0 : }
131 :
132 0 : void ClusterDiscoveryManager::erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr) {
133 0 : auto item_ptr = item_weak_ptr.lock();
134 0 : if (item_ptr == nullptr) {
135 0 : ENVOY_LOG(trace, "cm cdm: not dropping a stale callback for the cluster name {} in {}", name,
136 0 : thread_name_);
137 0 : return;
138 0 : }
139 0 : ENVOY_LOG(trace, "cm cdm: dropping callback for the cluster name {} in {}", name, thread_name_);
140 0 : if (!item_ptr->self_iterator_.has_value()) {
141 0 : ENVOY_LOG(trace,
142 0 : "cm cdm: callback for the cluster name {} in {} is not on the callbacks list "
143 0 : "anymore, which means it is about to be invoked; preventing it",
144 0 : name, thread_name_);
145 0 : item_ptr->callback_.reset();
146 0 : return;
147 0 : }
148 0 : eraseItem(name, std::move(item_ptr));
149 0 : }
150 :
151 : void ClusterDiscoveryManager::eraseItem(absl::string_view name,
152 0 : CallbackListItemSharedPtr item_ptr) {
153 0 : ASSERT(item_ptr != nullptr);
154 0 : ASSERT(item_ptr->self_iterator_.has_value());
155 0 : const bool drop_list = eraseFromList(name, item_ptr->self_iterator_.value());
156 0 : item_ptr->self_iterator_.reset();
157 0 : if (drop_list) {
158 0 : ENVOY_LOG(trace, "cm cdm: dropped last callback for the cluster name {} in {}", name,
159 0 : thread_name_);
160 0 : pending_clusters_.erase(name);
161 0 : }
162 0 : }
163 :
164 0 : bool ClusterDiscoveryManager::eraseFromList(absl::string_view name, CallbackListIterator it) {
165 0 : auto map_it = pending_clusters_.find(name);
166 0 : ASSERT(map_it != pending_clusters_.end());
167 0 : auto& list = map_it->second;
168 0 : list.erase(it);
169 0 : return list.empty();
170 0 : }
171 :
172 : } // namespace Upstream
173 : } // namespace Envoy
|