LCOV - code coverage report
Current view: top level - source/extensions/common/redis - cluster_refresh_manager_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 87 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 9 0.0 %

          Line data    Source code
       1             : #include "source/extensions/common/redis/cluster_refresh_manager_impl.h"
       2             : 
       3             : #include "envoy/singleton/manager.h"
       4             : 
       5             : namespace Envoy {
       6             : namespace Extensions {
       7             : namespace Common {
       8             : namespace Redis {
       9             : 
// Registers the `redis_refresh_manager` singleton name; getClusterRefreshManager() below looks
// it up through SINGLETON_MANAGER_REGISTERED_NAME(redis_refresh_manager).
SINGLETON_MANAGER_REGISTRATION(redis_refresh_manager);
      11             : 
      12             : ClusterRefreshManagerSharedPtr getClusterRefreshManager(Singleton::Manager& manager,
      13             :                                                         Event::Dispatcher& main_thread_dispatcher,
      14             :                                                         Upstream::ClusterManager& cm,
      15           0 :                                                         TimeSource& time_source) {
      16           0 :   return manager.getTyped<ClusterRefreshManager>(
      17           0 :       SINGLETON_MANAGER_REGISTERED_NAME(redis_refresh_manager), [&] {
      18           0 :         return std::make_shared<ClusterRefreshManagerImpl>(main_thread_dispatcher, cm, time_source);
      19           0 :       });
      20           0 : }
      21             : 
      22           0 : bool ClusterRefreshManagerImpl::onFailure(const std::string& cluster_name) {
      23           0 :   return onEvent(cluster_name, EventType::Failure);
      24           0 : }
      25             : 
      26           0 : bool ClusterRefreshManagerImpl::onHostDegraded(const std::string& cluster_name) {
      27           0 :   return onEvent(cluster_name, EventType::DegradedHost);
      28           0 : }
      29             : 
      30           0 : bool ClusterRefreshManagerImpl::onRedirection(const std::string& cluster_name) {
      31           0 :   return onEvent(cluster_name, EventType::Redirection);
      32           0 : }
      33             : 
      34           0 : bool ClusterRefreshManagerImpl::onEvent(const std::string& cluster_name, EventType event_type) {
      35           0 :   ClusterInfoSharedPtr info;
      36           0 :   {
      37             :     // Hold the map lock to avoid a race condition with calls to unregisterCluster
      38             :     // on the main thread.
      39           0 :     Thread::LockGuard lock(map_mutex_);
      40           0 :     auto it = info_map_.find(cluster_name);
      41           0 :     if (it != info_map_.end()) {
      42           0 :       info = it->second;
      43           0 :     }
      44           0 :   }
      45             :   // No locks needed for thread safety while accessing clusterInfoSharedPtr members as
      46             :   // all potentially modified members are atomic (redirects_count_, last_callback_time_ms_).
      47           0 :   if (info.get()) {
      48           0 :     const uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
      49           0 :                              time_source_.monotonicTime().time_since_epoch())
      50           0 :                              .count();
      51           0 :     uint64_t last_callback_time_ms = info->last_callback_time_ms_.load();
      52           0 :     if (!last_callback_time_ms ||
      53           0 :         (now >= (last_callback_time_ms + info->min_time_between_triggering_.count()))) {
      54           0 :       std::atomic<uint32_t>* count;
      55           0 :       uint32_t threshold;
      56           0 :       switch (event_type) {
      57           0 :       case EventType::Redirection: {
      58           0 :         count = &(info->redirects_count_);
      59           0 :         threshold = info->redirects_threshold_;
      60           0 :         break;
      61           0 :       }
      62           0 :       case EventType::DegradedHost: {
      63           0 :         count = &(info->host_degraded_count_);
      64           0 :         threshold = info->host_degraded_threshold_;
      65           0 :         break;
      66           0 :       }
      67           0 :       case EventType::Failure: {
      68           0 :         count = &(info->failures_count_);
      69           0 :         threshold = info->failure_threshold_;
      70           0 :         break;
      71           0 :       }
      72           0 :       }
      73           0 :       if (threshold <= 0) {
      74           0 :         return false;
      75           0 :       }
      76             : 
      77             :       // There're 3 updates to atomic values cross threads in this section of code
      78             :       // a) ++(*count) >= threshold
      79             :       // b) info->last_callback_time_ms_.compare_exchange_strong(last_callback_time_ms, now)
      80             :       // c) *count = 0
      81             :       // Let's say there're 2 threads T1 and T2, for all legal permutation of execution order a, b,
      82             :       // c we need to ensure that post_callback is true for only 1 thread and if both a) and b) are
      83             :       // true in 1 thread the count is 0 after this section. Here's a few different sequence that
      84             :       // can potentially result in race conditions to consider
      85             : 
      86             :       // Sequence 1:
      87             :       // starting condition: threshold:2, count:1, T1.last_call_back = T2.last_call_back =
      88             :       // info.last_call_back
      89             :       // * T1.a (count: 2)
      90             :       // * T1.b succeed (info.last_call_back: T1.now, T1.post_callback: true)
      91             :       // * T1.c (count:0)
      92             :       // * T2.a (count: 1, T2.post_callback: false)
      93             :       // * T2.b is skip since T2.a is false
      94             :       // * T2.c will still be triggered since info.last_call_back is now changed by T1 (count: 0)
      95             :       //
      96             :       // Sequence 2:
      97             :       // starting condition: threshold:2, count:1, T1.last_call_back = T2.last_call_back =
      98             :       // info.last_call_back
      99             :       // * T1.a (count: 2)
     100             :       // * T2.a (count: 3)
     101             :       // * T1.b succeed (info.last_call_back: T1.now, post_callback: true)
     102             :       // * T2.b failed due since info.last_call_back is now T1.now
     103             :       // * T1.c (count:0)
     104             :       // * T2.c (count:0) note we can't use count.decrement here since count is already 0
     105             :       //
     106             :       // Sequence 3:
     107             :       // starting condition: threshold:2, count:1, T1.last_call_back == T2.last_call_back ==
     108             :       // info.last_call_back
     109             :       // * T1.a (count: 1, T1.post_callback: false)
     110             :       // * T1.b skip since T1.a is false
     111             :       // * T2.a (count: 2, T2.post_callback: true)
     112             :       // * T2.b succeed (info.last_call_back = T2.now)
     113             :       // * T2.c (count: 0)
     114             :       // * T1.c will be triggered since info.last_call_back is changed by T2 (count: 0)
     115             : 
     116           0 :       bool post_callback = false;
     117             :       // ignore redirects during min time between triggering
     118           0 :       if ((++(*count) >= threshold) &&
     119           0 :           (info->last_callback_time_ms_.compare_exchange_strong(last_callback_time_ms, now))) {
     120             :         // last_callback_time_ms_ successfully updated without any changes since it was
     121             :         // initially read. This thread is allowed to post a call to the registered callback
     122             :         // on the main thread. Otherwise, the thread would be ignored to prevent over-triggering
     123             :         // cluster callbacks.
     124           0 :         post_callback = true;
     125           0 :       }
     126             : 
     127             :       // If a callback should be triggered(in this or some other thread) signaled by the changed
     128             :       // last callback time, we reset the count to 0
     129           0 :       if (post_callback || info->last_callback_time_ms_.load() != last_callback_time_ms) {
     130           0 :         *count = 0;
     131           0 :       }
     132             : 
     133           0 :       if (post_callback) {
     134           0 :         main_thread_dispatcher_.post([this, cluster_name, info]() {
     135             :           // Ensure that cluster is still active before calling callback.
     136           0 :           auto maps = cm_.clusters();
     137           0 :           auto it = maps.active_clusters_.find(cluster_name);
     138           0 :           if (it != maps.active_clusters_.end()) {
     139           0 :             info->cb_();
     140           0 :           }
     141           0 :         });
     142           0 :         return true;
     143           0 :       }
     144           0 :     }
     145           0 :   }
     146           0 :   return false;
     147           0 : }
     148             : 
     149             : ClusterRefreshManagerImpl::HandlePtr ClusterRefreshManagerImpl::registerCluster(
     150             :     const std::string& cluster_name, std::chrono::milliseconds min_time_between_triggering,
     151             :     const uint32_t redirects_threshold, const uint32_t failure_threshold,
     152           0 :     const uint32_t host_degraded_threshold, const RefreshCB& cb) {
     153           0 :   Thread::LockGuard lock(map_mutex_);
     154           0 :   ClusterInfoSharedPtr info =
     155           0 :       std::make_shared<ClusterInfo>(cluster_name, min_time_between_triggering, redirects_threshold,
     156           0 :                                     failure_threshold, host_degraded_threshold, cb);
     157           0 :   info_map_[cluster_name] = info;
     158             : 
     159           0 :   return std::make_unique<ClusterRefreshManagerImpl::HandleImpl>(this, info);
     160           0 : }
     161             : 
     162           0 : void ClusterRefreshManagerImpl::unregisterCluster(const ClusterInfoSharedPtr& cluster_info) {
     163           0 :   Thread::LockGuard lock(map_mutex_);
     164           0 :   info_map_.erase(cluster_info->cluster_name_);
     165           0 : }
     166             : 
     167             : } // namespace Redis
     168             : } // namespace Common
     169             : } // namespace Extensions
     170             : } // namespace Envoy

Generated by: LCOV version 1.15