Line data Source code
1 : #pragma once 2 : 3 : #include <memory> 4 : 5 : #include "envoy/common/random_generator.h" 6 : #include "envoy/common/token_bucket.h" 7 : #include "envoy/config/endpoint/v3/endpoint.pb.h" 8 : #include "envoy/config/grpc_mux.h" 9 : #include "envoy/config/subscription.h" 10 : #include "envoy/config/xds_config_tracker.h" 11 : #include "envoy/service/discovery/v3/discovery.pb.h" 12 : 13 : #include "source/common/common/logger.h" 14 : #include "source/common/config/api_version.h" 15 : #include "source/common/config/resource_name.h" 16 : #include "source/common/grpc/common.h" 17 : #include "source/common/runtime/runtime_features.h" 18 : #include "source/extensions/config_subscription/grpc/delta_subscription_state.h" 19 : #include "source/extensions/config_subscription/grpc/grpc_mux_context.h" 20 : #include "source/extensions/config_subscription/grpc/grpc_stream.h" 21 : #include "source/extensions/config_subscription/grpc/pausable_ack_queue.h" 22 : #include "source/extensions/config_subscription/grpc/watch_map.h" 23 : 24 : namespace Envoy { 25 : namespace Config { 26 : 27 : // Manages subscriptions to one or more type of resource. The logical protocol 28 : // state of those subscription(s) is handled by DeltaSubscriptionState. 29 : // This class owns the GrpcStream used to talk to the server, maintains queuing 30 : // logic to properly order the subscription(s)' various messages, and allows 31 : // starting/stopping/pausing of the subscriptions. 32 : class NewGrpcMuxImpl 33 : : public GrpcMux, 34 : public GrpcStreamCallbacks<envoy::service::discovery::v3::DeltaDiscoveryResponse>, 35 : Logger::Loggable<Logger::Id::config> { 36 : public: 37 : NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context); 38 : 39 : ~NewGrpcMuxImpl() override; 40 : 41 : // Causes all NewGrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash 42 : // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably 43 : // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which 44 : // would then cause all `NewGrpcMuxImpl` to be destructed. 45 : // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072. 46 : static void shutdownAll(); 47 : 48 4 : void shutdown() { shutdown_ = true; } 49 : 50 : GrpcMuxWatchPtr addWatch(const std::string& type_url, 51 : const absl::flat_hash_set<std::string>& resources, 52 : SubscriptionCallbacks& callbacks, 53 : OpaqueResourceDecoderSharedPtr resource_decoder, 54 : const SubscriptionOptions& options) override; 55 : 56 : void requestOnDemandUpdate(const std::string& type_url, 57 : const absl::flat_hash_set<std::string>& for_update) override; 58 : 59 0 : EdsResourcesCacheOptRef edsResourcesCache() override { 60 0 : return makeOptRefFromPtr(eds_resources_cache_.get()); 61 0 : } 62 : 63 : ScopedResume pause(const std::string& type_url) override; 64 : ScopedResume pause(const std::vector<std::string> type_urls) override; 65 : 66 : void onDiscoveryResponse( 67 : std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryResponse>&& message, 68 : ControlPlaneStats& control_plane_stats) override; 69 : 70 : void onStreamEstablished() override; 71 : 72 : void onEstablishmentFailure() override; 73 : 74 : void onWriteable() override; 75 : 76 : void kickOffAck(UpdateAck ack); 77 : 78 : // TODO(fredlas) remove this from the GrpcMux interface. 79 : void start() override; 80 : 81 : GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest, 82 : envoy::service::discovery::v3::DeltaDiscoveryResponse>& 83 0 : grpcStreamForTest() { 84 0 : return grpc_stream_; 85 0 : } 86 : 87 : struct SubscriptionStuff { 88 : SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info, 89 : const bool use_namespace_matching, Event::Dispatcher& dispatcher, 90 : CustomConfigValidators& config_validators, 91 : XdsConfigTrackerOptRef xds_config_tracker, 92 : EdsResourcesCacheOptRef eds_resources_cache) 93 : : watch_map_(use_namespace_matching, type_url, config_validators, eds_resources_cache), 94 15 : sub_state_(type_url, watch_map_, local_info, dispatcher, xds_config_tracker) { 95 : // If eds resources cache is provided, then the type must be ClusterLoadAssignment. 96 15 : ASSERT( 97 15 : !eds_resources_cache.has_value() || 98 15 : (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())); 99 15 : } 100 : 101 : WatchMap watch_map_; 102 : DeltaSubscriptionState sub_state_; 103 : std::string control_plane_identifier_{}; 104 : 105 : SubscriptionStuff(const SubscriptionStuff&) = delete; 106 : SubscriptionStuff& operator=(const SubscriptionStuff&) = delete; 107 : }; 108 : 109 : using SubscriptionStuffPtr = std::unique_ptr<SubscriptionStuff>; 110 : 111 : // for use in tests only 112 0 : const absl::flat_hash_map<std::string, SubscriptionStuffPtr>& subscriptions() { 113 0 : return subscriptions_; 114 0 : } 115 : 116 : private: 117 : class WatchImpl : public GrpcMuxWatch { 118 : public: 119 : WatchImpl(const std::string& type_url, Watch* watch, NewGrpcMuxImpl& parent, 120 : const SubscriptionOptions& options) 121 17 : : type_url_(type_url), watch_(watch), parent_(parent), options_(options) {} 122 : 123 17 : ~WatchImpl() override { remove(); } 124 : 125 17 : void remove() { 126 17 : if (watch_) { 127 17 : parent_.removeWatch(type_url_, watch_); 128 17 : watch_ = nullptr; 129 17 : } 130 17 : } 131 : 132 0 : void update(const absl::flat_hash_set<std::string>& resources) override { 133 0 : parent_.updateWatch(type_url_, watch_, resources, options_); 134 0 : } 135 : 136 : private: 137 : const std::string type_url_; 138 : Watch* watch_; 139 : NewGrpcMuxImpl& parent_; 140 : const SubscriptionOptions options_; 141 : }; 142 : 143 : void removeWatch(const std::string& type_url, Watch* watch); 144 : 145 : // Updates the list of resource names watched by the given watch. If an added name is new across 146 : // the whole subscription, or if a removed name has no other watch interested in it, then the 147 : // subscription will enqueue and attempt to send an appropriate discovery request. 148 : void updateWatch(const std::string& type_url, Watch* watch, 149 : const absl::flat_hash_set<std::string>& resources, 150 : const SubscriptionOptions& options); 151 : 152 : // Adds a subscription for the type_url to the subscriptions map and order list. 153 : void addSubscription(const std::string& type_url, bool use_namespace_matching); 154 : 155 : void trySendDiscoveryRequests(); 156 : 157 : // Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check 158 : // whether we *want* to send a DeltaDiscoveryRequest). 159 : bool canSendDiscoveryRequest(const std::string& type_url); 160 : 161 : // Checks whether we have something to say in a DeltaDiscoveryRequest, which can be an ACK and/or 162 : // a subscription update. (Does not check whether we *can* send that DeltaDiscoveryRequest). 163 : // Returns the type_url we should send the DeltaDiscoveryRequest for (if any). 164 : // First, prioritizes ACKs over non-ACK subscription interest updates. 165 : // Then, prioritizes non-ACK updates in the order the various types 166 : // of subscriptions were activated. 167 : absl::optional<std::string> whoWantsToSendDiscoveryRequest(); 168 : 169 : // Invoked when dynamic context parameters change for a resource type. 170 : void onDynamicContextUpdate(absl::string_view resource_type_url); 171 : 172 : // Resource (N)ACKs we're waiting to send, stored in the order that they should be sent in. All 173 : // of our different resource types' ACKs are mixed together in this queue. See class for 174 : // description of how it interacts with pause() and resume(). 175 : PausableAckQueue pausable_ack_queue_; 176 : 177 : // Map key is type_url. 178 : absl::flat_hash_map<std::string, SubscriptionStuffPtr> subscriptions_; 179 : 180 : // Determines the order of initial discovery requests. (Assumes that subscriptions are added in 181 : // the order of Envoy's dependency ordering). 182 : std::list<std::string> subscription_ordering_; 183 : 184 : GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest, 185 : envoy::service::discovery::v3::DeltaDiscoveryResponse> 186 : grpc_stream_; 187 : 188 : const LocalInfo::LocalInfo& local_info_; 189 : CustomConfigValidatorsPtr config_validators_; 190 : Common::CallbackHandlePtr dynamic_update_callback_handle_; 191 : Event::Dispatcher& dispatcher_; 192 : XdsConfigTrackerOptRef xds_config_tracker_; 193 : EdsResourcesCachePtr eds_resources_cache_; 194 : 195 : bool started_{false}; 196 : // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is 197 : // true because it may contain dangling pointers. 198 : std::atomic<bool> shutdown_{false}; 199 : }; 200 : 201 : using NewGrpcMuxImplPtr = std::unique_ptr<NewGrpcMuxImpl>; 202 : using NewGrpcMuxImplSharedPtr = std::shared_ptr<NewGrpcMuxImpl>; 203 : 204 : class NewGrpcMuxFactory; 205 : DECLARE_FACTORY(NewGrpcMuxFactory); 206 : 207 : } // namespace Config 208 : } // namespace Envoy