Line data Source code
1 : #pragma once 2 : 3 : #include <array> 4 : #include <atomic> 5 : #include <numeric> 6 : 7 : #include "envoy/event/dispatcher.h" 8 : #include "envoy/singleton/instance.h" 9 : #include "envoy/upstream/cluster_manager.h" 10 : 11 : #include "source/common/common/lock_guard.h" 12 : #include "source/common/common/thread.h" 13 : #include "source/extensions/common/redis/cluster_refresh_manager.h" 14 : 15 : namespace Envoy { 16 : namespace Extensions { 17 : namespace Common { 18 : namespace Redis { 19 : 20 : class ClusterRefreshManagerImpl : public ClusterRefreshManager, 21 : public Envoy::Singleton::Instance, 22 : public std::enable_shared_from_this<ClusterRefreshManagerImpl> { 23 : public: 24 : friend class ClusterRefreshManagerTest; 25 : 26 : /** 27 : * The information that the manager keeps for each cluster upon registration. 28 : */ 29 : struct ClusterInfo { 30 : ClusterInfo(std::string cluster_name, std::chrono::milliseconds min_time_between_triggering, 31 : const uint32_t redirects_threshold, const uint32_t failure_threshold, 32 : const uint32_t host_degraded_threshold, RefreshCB cb) 33 : : cluster_name_(std::move(cluster_name)), 34 : min_time_between_triggering_(min_time_between_triggering), 35 : redirects_threshold_(redirects_threshold), failure_threshold_(failure_threshold), 36 0 : host_degraded_threshold_(host_degraded_threshold), cb_(std::move(cb)) {} 37 : std::string cluster_name_; 38 : std::atomic<uint64_t> last_callback_time_ms_{}; 39 : std::atomic<uint32_t> redirects_count_{}; 40 : std::atomic<uint32_t> failures_count_{}; 41 : std::atomic<uint32_t> host_degraded_count_{}; 42 : std::chrono::milliseconds min_time_between_triggering_; 43 : const uint32_t redirects_threshold_; 44 : const uint32_t failure_threshold_; 45 : const uint32_t host_degraded_threshold_; 46 : RefreshCB cb_; 47 : }; 48 : 49 : using ClusterInfoSharedPtr = std::shared_ptr<ClusterInfo>; 50 : 51 : class HandleImpl : public Handle { 52 : public: 53 : HandleImpl(ClusterRefreshManagerImpl* mgr, ClusterInfoSharedPtr& cluster_info) 54 0 : : manager_(mgr->shared_from_this()), cluster_info_(cluster_info) {} 55 : 56 0 : ~HandleImpl() override { manager_->unregisterCluster(cluster_info_); } 57 : 58 : private: 59 : const std::shared_ptr<ClusterRefreshManagerImpl> manager_; 60 : const std::shared_ptr<ClusterInfo> cluster_info_; 61 : }; 62 : 63 : ClusterRefreshManagerImpl(Event::Dispatcher& main_thread_dispatcher, Upstream::ClusterManager& cm, 64 : TimeSource& time_source) 65 0 : : main_thread_dispatcher_(main_thread_dispatcher), cm_(cm), time_source_(time_source) {} 66 : 67 : bool onRedirection(const std::string& cluster_name) override; 68 : bool onFailure(const std::string& cluster_name) override; 69 : bool onHostDegraded(const std::string& cluster_name) override; 70 : 71 : HandlePtr registerCluster(const std::string& cluster_name, 72 : std::chrono::milliseconds min_time_between_triggering, 73 : const uint32_t redirects_threshold, const uint32_t failure_threshold, 74 : const uint32_t host_degraded_threshold, const RefreshCB& cb) override; 75 : 76 : private: 77 : void unregisterCluster(const ClusterInfoSharedPtr& cluster_info); 78 : /** 79 : * The type of events that can trigger discovery 80 : */ 81 : enum EventType { 82 : // MOVE or ASK redirection 83 : Redirection, 84 : // Failure 85 : Failure, 86 : // Sending request to degraded/unhealthy host 87 : DegradedHost 88 : }; 89 : 90 : bool onEvent(const std::string& cluster_name, EventType event_type); 91 : 92 : Event::Dispatcher& main_thread_dispatcher_; 93 : Upstream::ClusterManager& cm_; 94 : TimeSource& time_source_; 95 : std::map<std::string, ClusterInfoSharedPtr> info_map_ ABSL_GUARDED_BY(map_mutex_); 96 : Thread::MutexBasicLockable map_mutex_; 97 : }; 98 : 99 : ClusterRefreshManagerSharedPtr getClusterRefreshManager(Singleton::Manager& manager, 100 : Event::Dispatcher& main_thread_dispatcher, 101 : Upstream::ClusterManager& cm, 102 : TimeSource& time_source); 103 : } // namespace Redis 104 : } // namespace Common 105 : } // namespace Extensions 106 : } // namespace Envoy