Line data Source code
1 : #include "source/extensions/common/redis/cluster_refresh_manager_impl.h" 2 : 3 : #include "envoy/singleton/manager.h" 4 : 5 : namespace Envoy { 6 : namespace Extensions { 7 : namespace Common { 8 : namespace Redis { 9 : 10 : SINGLETON_MANAGER_REGISTRATION(redis_refresh_manager); 11 : 12 : ClusterRefreshManagerSharedPtr getClusterRefreshManager(Singleton::Manager& manager, 13 : Event::Dispatcher& main_thread_dispatcher, 14 : Upstream::ClusterManager& cm, 15 0 : TimeSource& time_source) { 16 0 : return manager.getTyped<ClusterRefreshManager>( 17 0 : SINGLETON_MANAGER_REGISTERED_NAME(redis_refresh_manager), [&] { 18 0 : return std::make_shared<ClusterRefreshManagerImpl>(main_thread_dispatcher, cm, time_source); 19 0 : }); 20 0 : } 21 : 22 0 : bool ClusterRefreshManagerImpl::onFailure(const std::string& cluster_name) { 23 0 : return onEvent(cluster_name, EventType::Failure); 24 0 : } 25 : 26 0 : bool ClusterRefreshManagerImpl::onHostDegraded(const std::string& cluster_name) { 27 0 : return onEvent(cluster_name, EventType::DegradedHost); 28 0 : } 29 : 30 0 : bool ClusterRefreshManagerImpl::onRedirection(const std::string& cluster_name) { 31 0 : return onEvent(cluster_name, EventType::Redirection); 32 0 : } 33 : 34 0 : bool ClusterRefreshManagerImpl::onEvent(const std::string& cluster_name, EventType event_type) { 35 0 : ClusterInfoSharedPtr info; 36 0 : { 37 : // Hold the map lock to avoid a race condition with calls to unregisterCluster 38 : // on the main thread. 39 0 : Thread::LockGuard lock(map_mutex_); 40 0 : auto it = info_map_.find(cluster_name); 41 0 : if (it != info_map_.end()) { 42 0 : info = it->second; 43 0 : } 44 0 : } 45 : // No locks needed for thread safety while accessing clusterInfoSharedPtr members as 46 : // all potentially modified members are atomic (redirects_count_, last_callback_time_ms_). 47 0 : if (info.get()) { 48 0 : const uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>( 49 0 : time_source_.monotonicTime().time_since_epoch()) 50 0 : .count(); 51 0 : uint64_t last_callback_time_ms = info->last_callback_time_ms_.load(); 52 0 : if (!last_callback_time_ms || 53 0 : (now >= (last_callback_time_ms + info->min_time_between_triggering_.count()))) { 54 0 : std::atomic<uint32_t>* count; 55 0 : uint32_t threshold; 56 0 : switch (event_type) { 57 0 : case EventType::Redirection: { 58 0 : count = &(info->redirects_count_); 59 0 : threshold = info->redirects_threshold_; 60 0 : break; 61 0 : } 62 0 : case EventType::DegradedHost: { 63 0 : count = &(info->host_degraded_count_); 64 0 : threshold = info->host_degraded_threshold_; 65 0 : break; 66 0 : } 67 0 : case EventType::Failure: { 68 0 : count = &(info->failures_count_); 69 0 : threshold = info->failure_threshold_; 70 0 : break; 71 0 : } 72 0 : } 73 0 : if (threshold <= 0) { 74 0 : return false; 75 0 : } 76 : 77 : // There're 3 updates to atomic values cross threads in this section of code 78 : // a) ++(*count) >= threshold 79 : // b) info->last_callback_time_ms_.compare_exchange_strong(last_callback_time_ms, now) 80 : // c) *count = 0 81 : // Let's say there're 2 threads T1 and T2, for all legal permutation of execution order a, b, 82 : // c we need to ensure that post_callback is true for only 1 thread and if both a) and b) are 83 : // true in 1 thread the count is 0 after this section. Here's a few different sequence that 84 : // can potentially result in race conditions to consider 85 : 86 : // Sequence 1: 87 : // starting condition: threshold:2, count:1, T1.last_call_back = T2.last_call_back = 88 : // info.last_call_back 89 : // * T1.a (count: 2) 90 : // * T1.b succeed (info.last_call_back: T1.now, T1.post_callback: true) 91 : // * T1.c (count:0) 92 : // * T2.a (count: 1, T2.post_callback: false) 93 : // * T2.b is skip since T2.a is false 94 : // * T2.c will still be triggered since info.last_call_back is now changed by T1 (count: 0) 95 : // 96 : // Sequence 2: 97 : // starting condition: threshold:2, count:1, T1.last_call_back = T2.last_call_back = 98 : // info.last_call_back 99 : // * T1.a (count: 2) 100 : // * T2.a (count: 3) 101 : // * T1.b succeed (info.last_call_back: T1.now, post_callback: true) 102 : // * T2.b failed due since info.last_call_back is now T1.now 103 : // * T1.c (count:0) 104 : // * T2.c (count:0) note we can't use count.decrement here since count is already 0 105 : // 106 : // Sequence 3: 107 : // starting condition: threshold:2, count:1, T1.last_call_back == T2.last_call_back == 108 : // info.last_call_back 109 : // * T1.a (count: 1, T1.post_callback: false) 110 : // * T1.b skip since T1.a is false 111 : // * T2.a (count: 2, T2.post_callback: true) 112 : // * T2.b succeed (info.last_call_back = T2.now) 113 : // * T2.c (count: 0) 114 : // * T1.c will be triggered since info.last_call_back is changed by T2 (count: 0) 115 : 116 0 : bool post_callback = false; 117 : // ignore redirects during min time between triggering 118 0 : if ((++(*count) >= threshold) && 119 0 : (info->last_callback_time_ms_.compare_exchange_strong(last_callback_time_ms, now))) { 120 : // last_callback_time_ms_ successfully updated without any changes since it was 121 : // initially read. This thread is allowed to post a call to the registered callback 122 : // on the main thread. Otherwise, the thread would be ignored to prevent over-triggering 123 : // cluster callbacks. 124 0 : post_callback = true; 125 0 : } 126 : 127 : // If a callback should be triggered(in this or some other thread) signaled by the changed 128 : // last callback time, we reset the count to 0 129 0 : if (post_callback || info->last_callback_time_ms_.load() != last_callback_time_ms) { 130 0 : *count = 0; 131 0 : } 132 : 133 0 : if (post_callback) { 134 0 : main_thread_dispatcher_.post([this, cluster_name, info]() { 135 : // Ensure that cluster is still active before calling callback. 136 0 : auto maps = cm_.clusters(); 137 0 : auto it = maps.active_clusters_.find(cluster_name); 138 0 : if (it != maps.active_clusters_.end()) { 139 0 : info->cb_(); 140 0 : } 141 0 : }); 142 0 : return true; 143 0 : } 144 0 : } 145 0 : } 146 0 : return false; 147 0 : } 148 : 149 : ClusterRefreshManagerImpl::HandlePtr ClusterRefreshManagerImpl::registerCluster( 150 : const std::string& cluster_name, std::chrono::milliseconds min_time_between_triggering, 151 : const uint32_t redirects_threshold, const uint32_t failure_threshold, 152 0 : const uint32_t host_degraded_threshold, const RefreshCB& cb) { 153 0 : Thread::LockGuard lock(map_mutex_); 154 0 : ClusterInfoSharedPtr info = 155 0 : std::make_shared<ClusterInfo>(cluster_name, min_time_between_triggering, redirects_threshold, 156 0 : failure_threshold, host_degraded_threshold, cb); 157 0 : info_map_[cluster_name] = info; 158 : 159 0 : return std::make_unique<ClusterRefreshManagerImpl::HandleImpl>(this, info); 160 0 : } 161 : 162 0 : void ClusterRefreshManagerImpl::unregisterCluster(const ClusterInfoSharedPtr& cluster_info) { 163 0 : Thread::LockGuard lock(map_mutex_); 164 0 : info_map_.erase(cluster_info->cluster_name_); 165 0 : } 166 : 167 : } // namespace Redis 168 : } // namespace Common 169 : } // namespace Extensions 170 : } // namespace Envoy