1
#include "source/common/upstream/cluster_discovery_manager.h"
2

            
3
#include <functional>
4

            
5
#include "source/common/common/enum_to_int.h"
6

            
7
namespace Envoy {
8
namespace Upstream {
9

            
10
namespace {
11

            
12
using ClusterAddedCb = std::function<void(absl::string_view)>;
13

            
14
class ClusterCallbacks : public ClusterUpdateCallbacks {
15
public:
16
21682
  ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {}
17

            
18
34271
  void onClusterAddOrUpdate(absl::string_view cluster_name, ThreadLocalClusterCommand&) override {
19
34271
    cb_(cluster_name);
20
34271
  };
21

            
22
1158
  void onClusterRemoval(const std::string&) override {}
23

            
24
private:
25
  ClusterAddedCb cb_;
26
};
27

            
28
} // namespace
29

            
30
// Builds a manager labelled with thread_name (the label only shows up in log messages here) and
// registers, with lifecycle_callbacks_handler, a callback that runs processClusterName() with
// ClusterDiscoveryStatus::Available for every cluster addition or update.
ClusterDiscoveryManager::ClusterDiscoveryManager(
    std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler)
    : thread_name_(std::move(thread_name)) {
  // The closure captures `this`; it ends up owned by callbacks_.
  auto on_cluster_added = [this](absl::string_view added_cluster) {
    ENVOY_LOG(trace,
              "cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle "
              "callback in {}",
              added_cluster, enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
    processClusterName(added_cluster, ClusterDiscoveryStatus::Available);
  };
  callbacks_ = std::make_unique<ClusterCallbacks>(std::move(on_cluster_added));
  // The handle returned by the registration is kept as a member.
  callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_);
}
42

            
43
void ClusterDiscoveryManager::processClusterName(absl::string_view name,
44
34343
                                                 ClusterDiscoveryStatus cluster_status) {
45
34343
  auto callback_items = extractCallbackList(name);
46
34343
  if (callback_items.empty()) {
47
34095
    ENVOY_LOG(trace, "cm cdm: no callbacks for the cluster name {} in {}", name, thread_name_);
48
34095
    return;
49
34095
  }
50
248
  ENVOY_LOG(trace, "cm cdm: invoking {} callbacks for the cluster name {} in {}",
51
248
            callback_items.size(), name, thread_name_);
52
343
  for (auto& item : callback_items) {
53
343
    auto callback = std::move(item->callback_);
54
    // This invalidates the handle and the invoker.
55
343
    item.reset();
56
    // The callback could be null when handle was destroyed during the
57
    // previous callback.
58
343
    if (callback != nullptr) {
59
338
      (*callback)(cluster_status);
60
338
    }
61
343
  }
62
248
}
63

            
64
// Registers `callback` as pending for the cluster `name`.
//
// Returns the handle for the new registration, a flag telling whether other callbacks were
// already pending for the same name (i.e. the list holds more than one entry after the
// insertion), and an invoker bound to the new registration.
ClusterDiscoveryManager::AddedCallbackData
ClusterDiscoveryManager::addCallback(std::string name, ClusterDiscoveryCallbackPtr callback) {
  ENVOY_LOG(trace, "cm cdm: adding callback for the cluster name {} in {}", name, thread_name_);
  // operator[] creates an empty list when this is the first callback for the name.
  auto& pending_for_name = pending_clusters_[name];
  auto registered_item = addCallbackInternal(pending_for_name, std::move(callback));
  // The handle gets copies of the name and the weak pointer; the invoker, built last, takes
  // over the originals.
  auto handle_ptr =
      std::make_unique<ClusterDiscoveryCallbackHandleImpl>(*this, name, registered_item);
  CallbackInvoker bound_invoker(*this, std::move(name), std::move(registered_item));
  const bool discovery_in_progress = pending_for_name.size() > 1;
  return AddedCallbackData{std::move(handle_ptr), discovery_in_progress,
                           std::move(bound_invoker)};
}
74

            
75
1
// Exchanges the complete state of this manager with `other`: the lifecycle callbacks object and
// its registration handle, the map of pending callbacks, and the thread name.
//
// NOTE(review): the callbacks object created in the constructor holds a lambda that captured the
// `this` of the manager that created it; after the swap it is owned by the other manager while
// still calling into the original one. Confirm that callers account for this.
void ClusterDiscoveryManager::swap(ClusterDiscoveryManager& other) {
  // The four exchanges are independent of each other.
  callbacks_handle_.swap(other.callbacks_handle_);
  callbacks_.swap(other.callbacks_);
  pending_clusters_.swap(other.pending_clusters_);
  thread_name_.swap(other.thread_name_);
}
81

            
82
void ClusterDiscoveryManager::invokeCallbackFromItem(absl::string_view name,
83
                                                     CallbackListItemWeakPtr item_weak_ptr,
84
34
                                                     ClusterDiscoveryStatus cluster_status) {
85
34
  auto item_ptr = item_weak_ptr.lock();
86
34
  if (item_ptr == nullptr) {
87
7
    ENVOY_LOG(trace, "cm cdm: not invoking an already stale callback for cluster {} in {}", name,
88
7
              thread_name_);
89
7
    return;
90
7
  }
91
27
  ENVOY_LOG(trace, "cm cdm: invoking a callback for cluster {} in {}", name, thread_name_);
92
27
  auto callback = std::move(item_ptr->callback_);
93
27
  if (item_ptr->self_iterator_.has_value()) {
94
24
    eraseItem(name, std::move(item_ptr));
95
24
  } else {
96
3
    ENVOY_LOG(trace,
97
3
              "cm cdm: the callback for cluster {} in {} is prepared for invoking during "
98
3
              "processing, yet some other callback tries to invoke this callback earlier",
99
3
              name, thread_name_);
100
3
  }
101
27
  if (callback != nullptr) {
102
26
    (*callback)(cluster_status);
103
26
  } else {
104
1
    ENVOY_LOG(trace, "cm cdm: the callback for cluster {} in {} is prepared for invoking during "
105
1
                     "processing, yet some other callback destroyed its handle in the meantime");
106
1
  }
107
27
}
108

            
109
// Removes the entry for `name` from the pending map and returns its list of callback items.
// Returns an empty list when there is no entry for the name. Every returned item has its
// self_iterator_ reset, as it is no longer on a list stored in the pending map.
ClusterDiscoveryManager::CallbackList
ClusterDiscoveryManager::extractCallbackList(absl::string_view name) {
  CallbackList extracted;
  auto node = pending_clusters_.extract(name);
  if (!node.empty()) {
    // Steal the contents of the list stored in the extracted map node.
    extracted.swap(node.mapped());
    for (auto& entry : extracted) {
      entry->self_iterator_.reset();
    }
  }
  return extracted;
}
122

            
123
// Wraps `callback` in a new list item, appends the item to `list`, and records in the item the
// iterator pointing at its own position. Returns a weak pointer to the item; the list holds the
// owning reference.
ClusterDiscoveryManager::CallbackListItemWeakPtr
ClusterDiscoveryManager::addCallbackInternal(CallbackList& list,
                                             ClusterDiscoveryCallbackPtr callback) {
  auto entry = std::make_shared<CallbackListItem>(std::move(callback));
  entry->self_iterator_ = list.emplace(list.end(), entry);
  return entry;
}
131

            
132
457
// Drops the callback referred to by item_weak_ptr, which was registered for the cluster `name`.
//
// Three cases are handled:
//  - the item is already gone: nothing to do;
//  - the item is still on its pending list: it is removed from the list;
//  - the item exists but is off the list: only its callback is cleared, so that it will not be
//    invoked.
void ClusterDiscoveryManager::erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr) {
  auto item = item_weak_ptr.lock();
  if (item == nullptr) {
    ENVOY_LOG(trace, "cm cdm: not dropping a stale callback for the cluster name {} in {}", name,
              thread_name_);
    return;
  }

  ENVOY_LOG(trace, "cm cdm: dropping callback for the cluster name {} in {}", name, thread_name_);
  if (item->self_iterator_.has_value()) {
    eraseItem(name, std::move(item));
    return;
  }

  ENVOY_LOG(trace,
            "cm cdm: callback for the cluster name {} in {} is not on the callbacks list "
            "anymore, which means it is about to be invoked; preventing it",
            name, thread_name_);
  item->callback_.reset();
}
150

            
151
void ClusterDiscoveryManager::eraseItem(absl::string_view name,
152
113
                                        CallbackListItemSharedPtr item_ptr) {
153
113
  ASSERT(item_ptr != nullptr);
154
113
  ASSERT(item_ptr->self_iterator_.has_value());
155
113
  const bool drop_list = eraseFromList(name, item_ptr->self_iterator_.value());
156
113
  item_ptr->self_iterator_.reset();
157
113
  if (drop_list) {
158
62
    ENVOY_LOG(trace, "cm cdm: dropped last callback for the cluster name {} in {}", name,
159
62
              thread_name_);
160
62
    pending_clusters_.erase(name);
161
62
  }
162
113
}
163

            
164
113
// Erases the element at `it` from the pending list stored for `name`. An entry for `name` must
// exist in the pending map (checked with ASSERT). Returns true when the list is empty afterwards.
bool ClusterDiscoveryManager::eraseFromList(absl::string_view name, CallbackListIterator it) {
  const auto entry = pending_clusters_.find(name);
  ASSERT(entry != pending_clusters_.end());
  entry->second.erase(it);
  return entry->second.empty();
}
171

            
172
} // namespace Upstream
173
} // namespace Envoy