1
#include "source/extensions/common/redis/cluster_refresh_manager_impl.h"
2

            
3
#include "envoy/singleton/manager.h"
4

            
5
namespace Envoy {
6
namespace Extensions {
7
namespace Common {
8
namespace Redis {
9

            
10
// Registers the process-wide singleton name used below by
// SINGLETON_MANAGER_REGISTERED_NAME(redis_refresh_manager) so that a single
// refresh manager instance is shared across all Redis cluster filters.
SINGLETON_MANAGER_REGISTRATION(redis_refresh_manager);
11

            
12
// Returns the process-wide ClusterRefreshManager singleton, creating it on
// first use. The factory is invoked by the singleton manager only when no
// instance exists yet; subsequent calls return the same shared instance.
ClusterRefreshManagerSharedPtr getClusterRefreshManager(Singleton::Manager& manager,
                                                        Event::Dispatcher& main_thread_dispatcher,
                                                        Upstream::ClusterManager& cm,
                                                        TimeSource& time_source) {
  auto factory = [&main_thread_dispatcher, &cm, &time_source]() {
    return std::make_shared<ClusterRefreshManagerImpl>(main_thread_dispatcher, cm, time_source);
  };
  return manager.getTyped<ClusterRefreshManager>(
      SINGLETON_MANAGER_REGISTERED_NAME(redis_refresh_manager), factory);
}
21

            
22
120009
bool ClusterRefreshManagerImpl::onFailure(const std::string& cluster_name) {
23
120009
  return onEvent(cluster_name, EventType::Failure);
24
120009
}
25

            
26
119987
bool ClusterRefreshManagerImpl::onHostDegraded(const std::string& cluster_name) {
27
119987
  return onEvent(cluster_name, EventType::DegradedHost);
28
119987
}
29

            
30
120014
bool ClusterRefreshManagerImpl::onRedirection(const std::string& cluster_name) {
31
120014
  return onEvent(cluster_name, EventType::Redirection);
32
120014
}
33

            
34
360010
// Counts an event of the given type against the cluster's per-type threshold
// and, if the threshold is reached outside the minimum triggering interval,
// posts the cluster's refresh callback to the main thread. Returns true only
// in the thread that wins the right to post the callback. Safe to call from
// any worker thread.
bool ClusterRefreshManagerImpl::onEvent(const std::string& cluster_name, EventType event_type) {
  ClusterInfoSharedPtr info;
  {
    // Hold the map lock to avoid a race condition with calls to unregisterCluster
    // on the main thread. Only the shared_ptr is copied out; the lock is not
    // held while processing the event below.
    Thread::LockGuard lock(map_mutex_);
    auto it = info_map_.find(cluster_name);
    if (it != info_map_.end()) {
      info = it->second;
    }
  }
  // No locks needed for thread safety while accessing ClusterInfo members as
  // all potentially modified members are atomic (the event counters and
  // last_callback_time_ms_).
  if (info.get()) {
    // Monotonic "now" in milliseconds, used for rate limiting callback posts.
    const uint64_t now = std::chrono::duration_cast<std::chrono::milliseconds>(
                             time_source_.monotonicTime().time_since_epoch())
                             .count();
    uint64_t last_callback_time_ms = info->last_callback_time_ms_.load();
    // Process the event only if no callback has ever fired (time 0) or the
    // minimum time between triggering has elapsed since the last one.
    if (!last_callback_time_ms ||
        (now >= (last_callback_time_ms + info->min_time_between_triggering_.count()))) {
      // Select the counter/threshold pair matching the event type.
      std::atomic<uint32_t>* count;
      uint32_t threshold;
      switch (event_type) {
      case EventType::Redirection: {
        count = &(info->redirects_count_);
        threshold = info->redirects_threshold_;
        break;
      }
      case EventType::DegradedHost: {
        count = &(info->host_degraded_count_);
        threshold = info->host_degraded_threshold_;
        break;
      }
      case EventType::Failure: {
        count = &(info->failures_count_);
        threshold = info->failure_threshold_;
        break;
      }
      }
      // A zero threshold disables triggering for this event type entirely.
      if (threshold <= 0) {
        return false;
      }

      // There are three cross-thread updates to atomic values in this section:
      // a) ++(*count) >= threshold
      // b) info->last_callback_time_ms_.compare_exchange_strong(last_callback_time_ms, now)
      // c) *count = 0
      // Suppose there are two threads T1 and T2. For every legal permutation of
      // the execution order of a, b, c we need to ensure that post_callback is
      // true for only one thread, and that if both a) and b) are true in one
      // thread the count is 0 after this section. Here are a few sequences that
      // could potentially produce race conditions to consider:

      // Sequence 1:
      // starting condition: threshold:2, count:1, T1.last_call_back = T2.last_call_back =
      // info.last_call_back
      // * T1.a (count: 2)
      // * T1.b succeeds (info.last_call_back: T1.now, T1.post_callback: true)
      // * T1.c (count:0)
      // * T2.a (count: 1, T2.post_callback: false)
      // * T2.b is skipped since T2.a is false
      // * T2.c will still be triggered since info.last_call_back was changed by T1 (count: 0)
      //
      // Sequence 2:
      // starting condition: threshold:2, count:1, T1.last_call_back = T2.last_call_back =
      // info.last_call_back
      // * T1.a (count: 2)
      // * T2.a (count: 3)
      // * T1.b succeeds (info.last_call_back: T1.now, post_callback: true)
      // * T2.b fails since info.last_call_back is now T1.now
      // * T1.c (count:0)
      // * T2.c (count:0) note we can't use count.decrement here since count is already 0
      //
      // Sequence 3:
      // starting condition: threshold:2, count:1, T1.last_call_back == T2.last_call_back ==
      // info.last_call_back
      // * T1.a (count: 1, T1.post_callback: false)
      // * T1.b skipped since T1.a is false
      // * T2.a (count: 2, T2.post_callback: true)
      // * T2.b succeeds (info.last_call_back = T2.now)
      // * T2.c (count: 0)
      // * T1.c will be triggered since info.last_call_back was changed by T2 (count: 0)

      bool post_callback = false;
      // Ignore events that arrive during the minimum time between triggering:
      // the compare-exchange below only succeeds for the single thread that
      // observed the still-current last callback time.
      if ((++(*count) >= threshold) &&
          (info->last_callback_time_ms_.compare_exchange_strong(last_callback_time_ms, now))) {
        // last_callback_time_ms_ successfully updated without any changes since it was
        // initially read. This thread is allowed to post a call to the registered callback
        // on the main thread. Otherwise, the thread would be ignored to prevent over-triggering
        // cluster callbacks.
        post_callback = true;
      }

      // If a callback should be triggered (in this or some other thread), signaled by the
      // changed last callback time, we reset the count to 0.
      if (post_callback || info->last_callback_time_ms_.load() != last_callback_time_ms) {
        *count = 0;
      }

      if (post_callback) {
        // cluster_name and info are captured by copy/shared_ptr so the posted
        // lambda stays valid even if the cluster is unregistered meanwhile.
        main_thread_dispatcher_.post([this, cluster_name, info]() {
          // Ensure that cluster is still active before calling callback.
          auto maps = cm_.clusters();
          auto it = maps.active_clusters_.find(cluster_name);
          if (it != maps.active_clusters_.end()) {
            info->cb_();
          }
        });
        return true;
      }
    }
  }
  return false;
}
148

            
149
// Registers (or replaces) the refresh-tracking state for a cluster and returns
// a handle whose destruction unregisters it. The map lock synchronizes the
// insertion with concurrent onEvent() lookups on worker threads.
ClusterRefreshManagerImpl::HandlePtr ClusterRefreshManagerImpl::registerCluster(
    const std::string& cluster_name, std::chrono::milliseconds min_time_between_triggering,
    const uint32_t redirects_threshold, const uint32_t failure_threshold,
    const uint32_t host_degraded_threshold, const RefreshCB& cb) {
  // Construct the shared state outside the critical section; only the map
  // insertion needs the lock.
  auto cluster_info =
      std::make_shared<ClusterInfo>(cluster_name, min_time_between_triggering, redirects_threshold,
                                    failure_threshold, host_degraded_threshold, cb);
  {
    Thread::LockGuard lock(map_mutex_);
    info_map_[cluster_name] = cluster_info;
  }
  return std::make_unique<HandleImpl>(this, cluster_info);
}
161

            
162
36
void ClusterRefreshManagerImpl::unregisterCluster(const ClusterInfoSharedPtr& cluster_info) {
163
36
  Thread::LockGuard lock(map_mutex_);
164
36
  info_map_.erase(cluster_info->cluster_name_);
165
36
}
166

            
167
} // namespace Redis
168
} // namespace Common
169
} // namespace Extensions
170
} // namespace Envoy