LCOV - code coverage report
Current view: top level - source/common/upstream - cluster_discovery_manager.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 26 124 21.0 %
Date: 2024-01-05 06:35:25 Functions: 6 14 42.9 %

          Line data    Source code
       1             : #include "source/common/upstream/cluster_discovery_manager.h"
       2             : 
       3             : #include <functional>
       4             : 
       5             : #include "source/common/common/enum_to_int.h"
       6             : 
       7             : namespace Envoy {
       8             : namespace Upstream {
       9             : 
      10             : namespace {
      11             : 
      12             : using ClusterAddedCb = std::function<void(absl::string_view)>;
      13             : 
      14             : class ClusterCallbacks : public ClusterUpdateCallbacks {
      15             : public:
      16         223 :   ClusterCallbacks(ClusterAddedCb cb) : cb_(std::move(cb)) {}
      17             : 
      18         306 :   void onClusterAddOrUpdate(absl::string_view cluster_name, ThreadLocalClusterCommand&) override {
      19         306 :     cb_(cluster_name);
      20         306 :   };
      21             : 
      22           0 :   void onClusterRemoval(const std::string&) override {}
      23             : 
      24             : private:
      25             :   ClusterAddedCb cb_;
      26             : };
      27             : 
      28             : } // namespace
      29             : 
      30             : ClusterDiscoveryManager::ClusterDiscoveryManager(
      31             :     std::string thread_name, ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler)
      32         223 :     : thread_name_(std::move(thread_name)) {
      33         337 :   callbacks_ = std::make_unique<ClusterCallbacks>([this](absl::string_view cluster_name) {
      34         306 :     ENVOY_LOG(trace,
      35         306 :               "cm cdm: starting processing cluster name {} (status {}) from cluster lifecycle "
      36         306 :               "callback in {}",
      37         306 :               cluster_name, enumToInt(ClusterDiscoveryStatus::Available), thread_name_);
      38         306 :     processClusterName(cluster_name, ClusterDiscoveryStatus::Available);
      39         306 :   });
      40         223 :   callbacks_handle_ = lifecycle_callbacks_handler.addClusterUpdateCallbacks(*callbacks_);
      41         223 : }
      42             : 
      43             : void ClusterDiscoveryManager::processClusterName(absl::string_view name,
      44         306 :                                                  ClusterDiscoveryStatus cluster_status) {
      45         306 :   auto callback_items = extractCallbackList(name);
      46         306 :   if (callback_items.empty()) {
      47         306 :     ENVOY_LOG(trace, "cm cdm: no callbacks for the cluster name {} in {}", name, thread_name_);
      48         306 :     return;
      49         306 :   }
      50           0 :   ENVOY_LOG(trace, "cm cdm: invoking {} callbacks for the cluster name {} in {}",
      51           0 :             callback_items.size(), name, thread_name_);
      52           0 :   for (auto& item : callback_items) {
      53           0 :     auto callback = std::move(item->callback_);
      54             :     // This invalidates the handle and the invoker.
      55           0 :     item.reset();
      56             :     // The callback could be null when handle was destroyed during the
      57             :     // previous callback.
      58           0 :     if (callback != nullptr) {
      59           0 :       (*callback)(cluster_status);
      60           0 :     }
      61           0 :   }
      62           0 : }
      63             : 
      64             : ClusterDiscoveryManager::AddedCallbackData
      65           0 : ClusterDiscoveryManager::addCallback(std::string name, ClusterDiscoveryCallbackPtr callback) {
      66           0 :   ENVOY_LOG(trace, "cm cdm: adding callback for the cluster name {} in {}", name, thread_name_);
      67           0 :   auto& callbacks_list = pending_clusters_[name];
      68           0 :   auto item_weak_ptr = addCallbackInternal(callbacks_list, std::move(callback));
      69           0 :   auto handle = std::make_unique<ClusterDiscoveryCallbackHandleImpl>(*this, name, item_weak_ptr);
      70           0 :   CallbackInvoker invoker(*this, std::move(name), std::move(item_weak_ptr));
      71           0 :   auto discovery_in_progress = (callbacks_list.size() > 1);
      72           0 :   return {std::move(handle), discovery_in_progress, std::move(invoker)};
      73           0 : }
      74             : 
      75           0 : void ClusterDiscoveryManager::swap(ClusterDiscoveryManager& other) {
      76           0 :   thread_name_.swap(other.thread_name_);
      77           0 :   pending_clusters_.swap(other.pending_clusters_);
      78           0 :   callbacks_.swap(other.callbacks_);
      79           0 :   callbacks_handle_.swap(other.callbacks_handle_);
      80           0 : }
      81             : 
      82             : void ClusterDiscoveryManager::invokeCallbackFromItem(absl::string_view name,
      83             :                                                      CallbackListItemWeakPtr item_weak_ptr,
      84           0 :                                                      ClusterDiscoveryStatus cluster_status) {
      85           0 :   auto item_ptr = item_weak_ptr.lock();
      86           0 :   if (item_ptr == nullptr) {
      87           0 :     ENVOY_LOG(trace, "cm cdm: not invoking an already stale callback for cluster {} in {}", name,
      88           0 :               thread_name_);
      89           0 :     return;
      90           0 :   }
      91           0 :   ENVOY_LOG(trace, "cm cdm: invoking a callback for cluster {} in {}", name, thread_name_);
      92           0 :   auto callback = std::move(item_ptr->callback_);
      93           0 :   if (item_ptr->self_iterator_.has_value()) {
      94           0 :     eraseItem(name, std::move(item_ptr));
      95           0 :   } else {
      96           0 :     ENVOY_LOG(trace,
      97           0 :               "cm cdm: the callback for cluster {} in {} is prepared for invoking during "
      98           0 :               "processing, yet some other callback tries to invoke this callback earlier",
      99           0 :               name, thread_name_);
     100           0 :   }
     101           0 :   if (callback != nullptr) {
     102           0 :     (*callback)(cluster_status);
     103           0 :   } else {
     104           0 :     ENVOY_LOG(trace, "cm cdm: the callback for cluster {} in {} is prepared for invoking during "
     105           0 :                      "processing, yet some other callback destroyed its handle in the meantime");
     106           0 :   }
     107           0 : }
     108             : 
     109             : ClusterDiscoveryManager::CallbackList
     110         306 : ClusterDiscoveryManager::extractCallbackList(absl::string_view name) {
     111         306 :   auto map_node_handle = pending_clusters_.extract(name);
     112         306 :   if (map_node_handle.empty()) {
     113         306 :     return {};
     114         306 :   }
     115           0 :   CallbackList extracted;
     116           0 :   map_node_handle.mapped().swap(extracted);
     117           0 :   for (auto& item : extracted) {
     118           0 :     item->self_iterator_.reset();
     119           0 :   }
     120           0 :   return extracted;
     121         306 : }
     122             : 
     123             : ClusterDiscoveryManager::CallbackListItemWeakPtr
     124             : ClusterDiscoveryManager::addCallbackInternal(CallbackList& list,
     125           0 :                                              ClusterDiscoveryCallbackPtr callback) {
     126           0 :   auto item = std::make_shared<CallbackListItem>(std::move(callback));
     127           0 :   auto it = list.emplace(list.end(), item);
     128           0 :   item->self_iterator_ = std::move(it);
     129           0 :   return item;
     130           0 : }
     131             : 
     132           0 : void ClusterDiscoveryManager::erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr) {
     133           0 :   auto item_ptr = item_weak_ptr.lock();
     134           0 :   if (item_ptr == nullptr) {
     135           0 :     ENVOY_LOG(trace, "cm cdm: not dropping a stale callback for the cluster name {} in {}", name,
     136           0 :               thread_name_);
     137           0 :     return;
     138           0 :   }
     139           0 :   ENVOY_LOG(trace, "cm cdm: dropping callback for the cluster name {} in {}", name, thread_name_);
     140           0 :   if (!item_ptr->self_iterator_.has_value()) {
     141           0 :     ENVOY_LOG(trace,
     142           0 :               "cm cdm: callback for the cluster name {} in {} is not on the callbacks list "
     143           0 :               "anymore, which means it is about to be invoked; preventing it",
     144           0 :               name, thread_name_);
     145           0 :     item_ptr->callback_.reset();
     146           0 :     return;
     147           0 :   }
     148           0 :   eraseItem(name, std::move(item_ptr));
     149           0 : }
     150             : 
     151             : void ClusterDiscoveryManager::eraseItem(absl::string_view name,
     152           0 :                                         CallbackListItemSharedPtr item_ptr) {
     153           0 :   ASSERT(item_ptr != nullptr);
     154           0 :   ASSERT(item_ptr->self_iterator_.has_value());
     155           0 :   const bool drop_list = eraseFromList(name, item_ptr->self_iterator_.value());
     156           0 :   item_ptr->self_iterator_.reset();
     157           0 :   if (drop_list) {
     158           0 :     ENVOY_LOG(trace, "cm cdm: dropped last callback for the cluster name {} in {}", name,
     159           0 :               thread_name_);
     160           0 :     pending_clusters_.erase(name);
     161           0 :   }
     162           0 : }
     163             : 
     164           0 : bool ClusterDiscoveryManager::eraseFromList(absl::string_view name, CallbackListIterator it) {
     165           0 :   auto map_it = pending_clusters_.find(name);
     166           0 :   ASSERT(map_it != pending_clusters_.end());
     167           0 :   auto& list = map_it->second;
     168           0 :   list.erase(it);
     169           0 :   return list.empty();
     170           0 : }
     171             : 
     172             : } // namespace Upstream
     173             : } // namespace Envoy

Generated by: LCOV version 1.15