Line data Source code
1 : #pragma once 2 : 3 : #include <list> 4 : #include <memory> 5 : #include <string> 6 : #include <utility> 7 : 8 : #include "envoy/upstream/cluster_manager.h" 9 : 10 : #include "source/common/common/logger.h" 11 : 12 : #include "absl/container/flat_hash_map.h" 13 : #include "absl/strings/string_view.h" 14 : #include "absl/types/optional.h" 15 : 16 : namespace Envoy { 17 : namespace Upstream { 18 : 19 : /** 20 : * A base class for cluster lifecycle handler. Mostly to avoid a dependency on 21 : * ThreadLocalClusterManagerImpl in ClusterDiscoveryManager. 22 : */ 23 : class ClusterLifecycleCallbackHandler { 24 : public: 25 223 : virtual ~ClusterLifecycleCallbackHandler() = default; 26 : 27 : virtual ClusterUpdateCallbacksHandlePtr 28 : addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) PURE; 29 : }; 30 : 31 : /** 32 : * A thread-local on-demand cluster discovery manager. It takes care of invoking the discovery 33 : * callbacks in the event of a finished discovery. It does it by installing a cluster lifecycle 34 : * callback that invokes the discovery callbacks when a matching cluster just got added. 35 : * 36 : * The manager is the sole owner of the added discovery callbacks. The only way to remove the 37 : * callback from the manager is by destroying the discovery handle. 38 : */ 39 : class ClusterDiscoveryManager : Logger::Loggable<Logger::Id::upstream> { 40 : private: 41 : struct CallbackListItem; 42 : using CallbackListItemSharedPtr = std::shared_ptr<CallbackListItem>; 43 : using CallbackListItemWeakPtr = std::weak_ptr<CallbackListItem>; 44 : using CallbackList = std::list<CallbackListItemSharedPtr>; 45 : using CallbackListIterator = CallbackList::iterator; 46 : 47 : public: 48 : /** 49 : * This class is used in a case when the cluster manager in the main thread notices that it 50 : * already has the requested cluster, so instead of starting the discovery process, it schedules 51 : * the invocation of the callback back to the thread that made the request. Invoking the request 52 : * removes it from the manager. 53 : */ 54 : class CallbackInvoker { 55 : public: 56 0 : void invokeCallback(ClusterDiscoveryStatus cluster_status) const { 57 0 : parent_.invokeCallbackFromItem(name_, item_weak_ptr_, cluster_status); 58 0 : } 59 : 60 : private: 61 : friend class ClusterDiscoveryManager; 62 : 63 : CallbackInvoker(ClusterDiscoveryManager& parent, std::string name, 64 : CallbackListItemWeakPtr item_weak_ptr) 65 0 : : parent_(parent), name_(std::move(name)), item_weak_ptr_(std::move(item_weak_ptr)) {} 66 : 67 : ClusterDiscoveryManager& parent_; 68 : const std::string name_; 69 : CallbackListItemWeakPtr item_weak_ptr_; 70 : }; 71 : 72 : ClusterDiscoveryManager(std::string thread_name, 73 : ClusterLifecycleCallbackHandler& lifecycle_callbacks_handler); 74 : 75 : /** 76 : * Invoke the callbacks for the given cluster name. The discovery status is passed to the 77 : * callbacks. After invoking the callbacks, they are dropped from the manager. 78 : */ 79 : void processClusterName(absl::string_view name, ClusterDiscoveryStatus cluster_status); 80 : 81 : /** 82 : * A struct containing a discovery handle, information whether a discovery for a given cluster 83 : * was already requested in this thread, and an immediate invocation context. 84 : */ 85 : struct AddedCallbackData { 86 : ClusterDiscoveryCallbackHandlePtr handle_ptr_; 87 : bool discovery_in_progress_; 88 : CallbackInvoker invoker_; 89 : }; 90 : 91 : /** 92 : * Adds the discovery callback. Returns a handle and a boolean indicating whether this worker 93 : * thread has already requested the discovery of a cluster with a given name. 94 : */ 95 : AddedCallbackData addCallback(std::string name, ClusterDiscoveryCallbackPtr callback); 96 : 97 : /** 98 : * Swaps this manager with another. Used for tests only. 99 : */ 100 : void swap(ClusterDiscoveryManager& other); 101 : 102 : private: 103 : /** 104 : * An item in the callbacks list. It contains the iterator to itself inside the callbacks 105 : * list. Since the list contains shared pointers to items, we know that the iterator is valid as 106 : * long as the item is alive. 107 : */ 108 : struct CallbackListItem { 109 0 : CallbackListItem(ClusterDiscoveryCallbackPtr callback) : callback_(std::move(callback)) {} 110 : 111 : ClusterDiscoveryCallbackPtr callback_; 112 : absl::optional<CallbackListIterator> self_iterator_; 113 : }; 114 : 115 : /** 116 : * An implementation of discovery handle. Destroy it to drop the callback from the discovery 117 : * manager. It won't stop the discovery process, though. 118 : */ 119 : class ClusterDiscoveryCallbackHandleImpl : public ClusterDiscoveryCallbackHandle { 120 : public: 121 : ClusterDiscoveryCallbackHandleImpl(ClusterDiscoveryManager& parent, std::string name, 122 : CallbackListItemWeakPtr item_weak_ptr) 123 0 : : parent_(parent), name_(std::move(name)), item_weak_ptr_(std::move(item_weak_ptr)) {} 124 : 125 0 : ~ClusterDiscoveryCallbackHandleImpl() override { 126 0 : parent_.erase(name_, std::move(item_weak_ptr_)); 127 0 : } 128 : 129 : private: 130 : ClusterDiscoveryManager& parent_; 131 : const std::string name_; 132 : CallbackListItemWeakPtr item_weak_ptr_; 133 : }; 134 : 135 : /** 136 : * Invokes a callback stored in the item and removes it from the callbacks list, so it won't be 137 : * invoked again. 138 : */ 139 : void invokeCallbackFromItem(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr, 140 : ClusterDiscoveryStatus cluster_status); 141 : 142 : /** 143 : * Extracts the list of callbacks from the pending_clusters_ map. This action invalidates the 144 : * self iterators in the items, so destroying the handle won't try to erase the element from the 145 : * list using an invalid iterator. 146 : */ 147 : CallbackList extractCallbackList(absl::string_view name); 148 : /** 149 : * Creates and sets up the callback list item, adds to the list and returns a weak_ptr to the 150 : * item. 151 : */ 152 : CallbackListItemWeakPtr addCallbackInternal(CallbackList& list, 153 : ClusterDiscoveryCallbackPtr callback); 154 : /** 155 : * Drops the callback item from the discovery manager. It the item wasn't stale, the callback 156 : * will not be invoked. Called when the discovery handle is destroyed. 157 : */ 158 : void erase(absl::string_view name, CallbackListItemWeakPtr item_weak_ptr); 159 : /** 160 : * Drops the callback item from the discovery manager. 161 : */ 162 : void eraseItem(absl::string_view name, CallbackListItemSharedPtr item_ptr); 163 : /** 164 : * Try to erase a callback from under the given iterator. It returns a boolean value indicating 165 : * whether the dropped callback was a last one for the given cluster. 166 : */ 167 : bool eraseFromList(absl::string_view name, CallbackListIterator it); 168 : 169 : std::string thread_name_; 170 : absl::flat_hash_map<std::string, CallbackList> pending_clusters_; 171 : std::unique_ptr<ClusterUpdateCallbacks> callbacks_; 172 : ClusterUpdateCallbacksHandlePtr callbacks_handle_; 173 : }; 174 : 175 : } // namespace Upstream 176 : } // namespace Envoy