1
#pragma once
2

            
3
#include <list>
4
#include <memory>
5
#include <string>
6
#include <utility>
7

            
8
#include "envoy/upstream/cluster_manager.h"
9

            
10
#include "source/common/common/logger.h"
11

            
12
#include "absl/container/flat_hash_map.h"
13
#include "absl/strings/string_view.h"
14
#include "absl/types/optional.h"
15

            
16
namespace Envoy {
17
namespace Upstream {
18

            
19
/**
20
 * A base class for cluster lifecycle handler. Mostly to avoid a dependency on
21
 * ThreadLocalClusterManagerImpl in ClusterDiscoveryManager.
22
 */
23
class ClusterLifecycleCallbackHandler {
24
public:
25
21681
  virtual ~ClusterLifecycleCallbackHandler() = default;
26

            
27
  virtual ClusterUpdateCallbacksHandlePtr
28
  addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) PURE;
29
};
30

            
31
/**
32
 * A thread-local on-demand cluster discovery manager. It takes care of invoking the discovery
33
 * callbacks in the event of a finished discovery. It does it by installing a cluster lifecycle
34
 * callback that invokes the discovery callbacks when a matching cluster just got added.
35
 *
36
 * The manager is the sole owner of the added discovery callbacks. The only way to remove the
37
 * callback from the manager is by destroying the discovery handle.
38
 */
39
class ClusterDiscoveryManager : Logger::Loggable<Logger::Id::upstream> {
40
private:
41
  struct CallbackListItem;
42
  using CallbackListItemSharedPtr = std::shared_ptr<CallbackListItem>;
43
  using CallbackListItemWeakPtr = std::weak_ptr<CallbackListItem>;
44
  using CallbackList = std::list<CallbackListItemSharedPtr>;
45
  using CallbackListIterator = CallbackList::iterator;
46

            
47
public:
48
  /**
49
   * This class is used in a case when the cluster manager in the main thread notices that it
50
   * already has the requested cluster, so instead of starting the discovery process, it schedules
51
   * the invocation of the callback back to the thread that made the request. Invoking the request
52
   * removes it from the manager.
53
   */
54
  class CallbackInvoker {
55
  public:
56
34
    void invokeCallback(ClusterDiscoveryStatus cluster_status) const {
57
34
      parent_.invokeCallbackFromItem(name_, item_weak_ptr_, cluster_status);
58
34
    }
59

            
60
  private:
61
    friend class ClusterDiscoveryManager;
62

            
63
    CallbackInvoker(ClusterDiscoveryManager& parent, std::string name,
64
                    CallbackListItemWeakPtr item_weak_ptr)
65
457
        : parent_(parent), name_(std::move(name)), item_weak_ptr_(std::move(item_weak_ptr)) {}
66

            
67
    ClusterDiscoveryManager& parent_;
68
    const std::string name_;
69
    CallbackListItemWeakPtr item_weak_ptr_;
70
  };
71

            
72
  ClusterDiscoveryManager(std::string thread_name,
73
                          ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler);
74

            
75
  /**
76
   * Invoke the callbacks for the given cluster name. The discovery status is passed to the
77
   * callbacks. After invoking the callbacks, they are dropped from the manager.
78
   */
79
  void processClusterName(absl::string_view name, ClusterDiscoveryStatus cluster_status);
80

            
81
  /**
82
   * A struct containing a discovery handle, information whether a discovery for a given cluster
83
   * was already requested in this thread, and an immediate invocation context.
84
   */
85
  struct AddedCallbackData {
86
    ClusterDiscoveryCallbackHandlePtr handle_ptr_;
87
    bool discovery_in_progress_;
88
    CallbackInvoker invoker_;
89
  };
90

            
91
  /**
92
   * Adds the discovery callback. Returns a handle and a boolean indicating whether this worker
93
   * thread has already requested the discovery of a cluster with a given name.
94
   */
95
  AddedCallbackData addCallback(std::string name, ClusterDiscoveryCallbackPtr callback);
96

            
97
  /**
98
   * Swaps this manager with another. Used for tests only.
99
   */
100
  void swap(ClusterDiscoveryManager& other);
101

            
102
private:
103
  /**
104
   * An item in the callbacks list. It contains the iterator to itself inside the callbacks
105
   * list. Since the list contains shared pointers to items, we know that the iterator is valid as
106
   * long as the item is alive.
107
   */
108
  struct CallbackListItem {
109
457
    CallbackListItem(ClusterDiscoveryCallbackPtr callback) : callback_(std::move(callback)) {}
110

            
111
    ClusterDiscoveryCallbackPtr callback_;
112
    absl::optional<CallbackListIterator> self_iterator_;
113
  };
114

            
115
  /**
116
   * An implementation of discovery handle. Destroy it to drop the callback from the discovery
117
   * manager. It won't stop the discovery process, though.
118
   */
119
  class ClusterDiscoveryCallbackHandleImpl : public ClusterDiscoveryCallbackHandle {
120
  public:
121
    ClusterDiscoveryCallbackHandleImpl(ClusterDiscoveryManager& parent, std::string name,
122
                                       CallbackListItemWeakPtr item_weak_ptr)
123
457
        : parent_(parent), name_(std::move(name)), item_weak_ptr_(std::move(item_weak_ptr)) {}
124

            
125
457
    ~ClusterDiscoveryCallbackHandleImpl() override {
126
457
      parent_.erase(name_, std::move(item_weak_ptr_));
127
457
    }
128

            
129
  private:
130
    ClusterDiscoveryManager& parent_;
131
    const std::string name_;
132
    CallbackListItemWeakPtr item_weak_ptr_;
133
  };
134

            
135
  /**
136
   * Invokes a callback stored in the item and removes it from the callbacks list, so it won't be
137
   * invoked again.
138
   */
139
  void invokeCallbackFromItem(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr,
140
                              ClusterDiscoveryStatus cluster_status);
141

            
142
  /**
143
   * Extracts the list of callbacks from the pending_clusters_ map. This action invalidates the
144
   * self iterators in the items, so destroying the handle won't try to erase the element from the
145
   * list using an invalid iterator.
146
   */
147
  CallbackList extractCallbackList(absl::string_view name);
148
  /**
149
   * Creates and sets up the callback list item, adds to the list and returns a weak_ptr to the
150
   * item.
151
   */
152
  CallbackListItemWeakPtr addCallbackInternal(CallbackList& list,
153
                                              ClusterDiscoveryCallbackPtr callback);
154
  /**
155
   * Drops the callback item from the discovery manager. It the item wasn't stale, the callback
156
   * will not be invoked. Called when the discovery handle is destroyed.
157
   */
158
  void erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr);
159
  /**
160
   * Drops the callback item from the discovery manager.
161
   */
162
  void eraseItem(absl::string_view name, CallbackListItemSharedPtr item_ptr);
163
  /**
164
   * Try to erase a callback from under the given iterator. It returns a boolean value indicating
165
   * whether the dropped callback was a last one for the given cluster.
166
   */
167
  bool eraseFromList(absl::string_view name, CallbackListIterator it);
168

            
169
  std::string thread_name_;
170
  absl::flat_hash_map<std::string, CallbackList> pending_clusters_;
171
  std::unique_ptr<ClusterUpdateCallbacks> callbacks_;
172
  ClusterUpdateCallbacksHandlePtr callbacks_handle_;
173
};
174

            
175
} // namespace Upstream
176
} // namespace Envoy