#pragma once

#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <map>
#include <memory>
#include <numeric>
#include <string>

#include "envoy/event/dispatcher.h"
#include "envoy/singleton/instance.h"
#include "envoy/upstream/cluster_manager.h"

#include "source/common/common/lock_guard.h"
#include "source/common/common/thread.h"
#include "source/extensions/common/redis/cluster_refresh_manager.h"
14

            
15
namespace Envoy {
16
namespace Extensions {
17
namespace Common {
18
namespace Redis {
19

            
20
class ClusterRefreshManagerImpl : public ClusterRefreshManager,
21
                                  public Envoy::Singleton::Instance,
22
                                  public std::enable_shared_from_this<ClusterRefreshManagerImpl> {
23
public:
24
  friend class ClusterRefreshManagerTest;
25

            
26
  /**
27
   * The information that the manager keeps for each cluster upon registration.
28
   */
29
  struct ClusterInfo {
30
    ClusterInfo(std::string cluster_name, std::chrono::milliseconds min_time_between_triggering,
31
                const uint32_t redirects_threshold, const uint32_t failure_threshold,
32
                const uint32_t host_degraded_threshold, RefreshCB cb)
33
36
        : cluster_name_(std::move(cluster_name)),
34
36
          min_time_between_triggering_(min_time_between_triggering),
35
36
          redirects_threshold_(redirects_threshold), failure_threshold_(failure_threshold),
36
36
          host_degraded_threshold_(host_degraded_threshold), cb_(std::move(cb)) {}
37
    std::string cluster_name_;
38
    std::atomic<uint64_t> last_callback_time_ms_{};
39
    std::atomic<uint32_t> redirects_count_{};
40
    std::atomic<uint32_t> failures_count_{};
41
    std::atomic<uint32_t> host_degraded_count_{};
42
    std::chrono::milliseconds min_time_between_triggering_;
43
    const uint32_t redirects_threshold_;
44
    const uint32_t failure_threshold_;
45
    const uint32_t host_degraded_threshold_;
46
    RefreshCB cb_;
47
  };
48

            
49
  using ClusterInfoSharedPtr = std::shared_ptr<ClusterInfo>;
50

            
51
  class HandleImpl : public Handle {
52
  public:
53
    HandleImpl(ClusterRefreshManagerImpl* mgr, ClusterInfoSharedPtr& cluster_info)
54
36
        : manager_(mgr->shared_from_this()), cluster_info_(cluster_info) {}
55

            
56
36
    ~HandleImpl() override { manager_->unregisterCluster(cluster_info_); }
57

            
58
  private:
59
    const std::shared_ptr<ClusterRefreshManagerImpl> manager_;
60
    const std::shared_ptr<ClusterInfo> cluster_info_;
61
  };
62

            
63
  ClusterRefreshManagerImpl(Event::Dispatcher& main_thread_dispatcher, Upstream::ClusterManager& cm,
64
                            TimeSource& time_source)
65
93
      : main_thread_dispatcher_(main_thread_dispatcher), cm_(cm), time_source_(time_source) {}
66

            
67
  bool onRedirection(const std::string& cluster_name) override;
68
  bool onFailure(const std::string& cluster_name) override;
69
  bool onHostDegraded(const std::string& cluster_name) override;
70

            
71
  HandlePtr registerCluster(const std::string& cluster_name,
72
                            std::chrono::milliseconds min_time_between_triggering,
73
                            const uint32_t redirects_threshold, const uint32_t failure_threshold,
74
                            const uint32_t host_degraded_threshold, const RefreshCB& cb) override;
75

            
76
private:
77
  void unregisterCluster(const ClusterInfoSharedPtr& cluster_info);
78
  /**
79
   * The type of events that can trigger discovery
80
   */
81
  enum EventType {
82
    // MOVE or ASK redirection
83
    Redirection,
84
    // Failure
85
    Failure,
86
    // Sending request to degraded/unhealthy host
87
    DegradedHost
88
  };
89

            
90
  bool onEvent(const std::string& cluster_name, EventType event_type);
91

            
92
  Event::Dispatcher& main_thread_dispatcher_;
93
  Upstream::ClusterManager& cm_;
94
  TimeSource& time_source_;
95
  std::map<std::string, ClusterInfoSharedPtr> info_map_ ABSL_GUARDED_BY(map_mutex_);
96
  Thread::MutexBasicLockable map_mutex_;
97
};
98

            
99
/**
 * Returns a shared pointer to a ClusterRefreshManager. The definition lives outside this header;
 * presumably it looks up (or lazily creates) the singleton through the given Singleton::Manager —
 * confirm against the implementation.
 *
 * @param manager singleton manager.
 * @param main_thread_dispatcher dispatcher handed to the manager.
 * @param cm cluster manager handed to the manager.
 * @param time_source time source handed to the manager.
 * @return ClusterRefreshManagerSharedPtr the cluster refresh manager.
 */
ClusterRefreshManagerSharedPtr getClusterRefreshManager(Singleton::Manager& manager,
                                                        Event::Dispatcher& main_thread_dispatcher,
                                                        Upstream::ClusterManager& cm,
                                                        TimeSource& time_source);
103
} // namespace Redis
104
} // namespace Common
105
} // namespace Extensions
106
} // namespace Envoy