Line data Source code
1 : #include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
2 :
3 : #include "envoy/service/discovery/v3/discovery.pb.h"
4 :
5 : #include "source/common/common/assert.h"
6 : #include "source/common/common/backoff_strategy.h"
7 : #include "source/common/common/token_bucket_impl.h"
8 : #include "source/common/config/utility.h"
9 : #include "source/common/config/xds_context_params.h"
10 : #include "source/common/config/xds_resource.h"
11 : #include "source/common/memory/utils.h"
12 : #include "source/common/protobuf/protobuf.h"
13 : #include "source/common/protobuf/utility.h"
14 : #include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
15 :
16 : namespace Envoy {
17 : namespace Config {
18 :
19 : namespace {
20 : class AllMuxesState {
21 : public:
22 4 : void insert(NewGrpcMuxImpl* mux) { muxes_.insert(mux); }
23 :
24 4 : void erase(NewGrpcMuxImpl* mux) { muxes_.erase(mux); }
25 :
26 135 : void shutdownAll() {
27 135 : for (auto& mux : muxes_) {
28 4 : mux->shutdown();
29 4 : }
30 135 : }
31 :
32 : private:
33 : absl::flat_hash_set<NewGrpcMuxImpl*> muxes_;
34 : };
35 : using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
36 : } // namespace
37 :
38 : NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context)
39 : : grpc_stream_(this, std::move(grpc_mux_context.async_client_),
40 : grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
41 : grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_),
42 : grpc_mux_context.rate_limit_settings_),
43 : local_info_(grpc_mux_context.local_info_),
44 : config_validators_(std::move(grpc_mux_context.config_validators_)),
45 : dynamic_update_callback_handle_(
46 : grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
47 0 : [this](absl::string_view resource_type_url) {
48 0 : onDynamicContextUpdate(resource_type_url);
49 0 : })),
50 : dispatcher_(grpc_mux_context.dispatcher_),
51 : xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
52 4 : eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)) {
53 4 : AllMuxes::get().insert(this);
54 4 : }
55 :
56 4 : NewGrpcMuxImpl::~NewGrpcMuxImpl() { AllMuxes::get().erase(this); }
57 :
58 135 : void NewGrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); }
59 :
60 0 : void NewGrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
61 0 : auto sub = subscriptions_.find(resource_type_url);
62 0 : if (sub == subscriptions_.end()) {
63 0 : return;
64 0 : }
65 0 : sub->second->sub_state_.setMustSendDiscoveryRequest();
66 0 : trySendDiscoveryRequests();
67 0 : }
68 :
69 8 : ScopedResume NewGrpcMuxImpl::pause(const std::string& type_url) {
70 8 : return pause(std::vector<std::string>{type_url});
71 8 : }
72 :
73 23 : ScopedResume NewGrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
74 53 : for (const auto& type_url : type_urls) {
75 53 : pausable_ack_queue_.pause(type_url);
76 53 : }
77 :
78 23 : return std::make_unique<Cleanup>([this, type_urls]() {
79 53 : for (const auto& type_url : type_urls) {
80 53 : pausable_ack_queue_.resume(type_url);
81 53 : if (!pausable_ack_queue_.paused(type_url)) {
82 53 : trySendDiscoveryRequests();
83 53 : }
84 53 : }
85 23 : });
86 23 : }
87 :
88 : void NewGrpcMuxImpl::onDiscoveryResponse(
89 : std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryResponse>&& message,
90 23 : ControlPlaneStats& control_plane_stats) {
91 23 : ENVOY_LOG(debug, "Received DeltaDiscoveryResponse for {} at version {}", message->type_url(),
92 23 : message->system_version_info());
93 :
94 23 : auto sub = subscriptions_.find(message->type_url());
95 23 : if (sub == subscriptions_.end()) {
96 0 : ENVOY_LOG(warn,
97 0 : "Dropping received DeltaDiscoveryResponse (with version {}) for non-existent "
98 0 : "subscription {}.",
99 0 : message->system_version_info(), message->type_url());
100 0 : return;
101 0 : }
102 :
103 23 : if (message->has_control_plane()) {
104 0 : control_plane_stats.identifier_.set(message->control_plane().identifier());
105 :
106 0 : if (message->control_plane().identifier() != sub->second->control_plane_identifier_) {
107 0 : sub->second->control_plane_identifier_ = message->control_plane().identifier();
108 0 : ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", message->type_url(),
109 0 : sub->second->control_plane_identifier_);
110 0 : }
111 0 : }
112 :
113 23 : auto ack = sub->second->sub_state_.handleResponse(*message);
114 :
115 : // Processing point to record error if there is any failure after the response is processed.
116 23 : if (xds_config_tracker_.has_value() &&
117 23 : ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) {
118 0 : xds_config_tracker_->onConfigRejected(*message, ack.error_detail_.message());
119 0 : }
120 23 : kickOffAck(ack);
121 23 : Memory::Utils::tryShrinkHeap();
122 23 : }
123 :
124 4 : void NewGrpcMuxImpl::onStreamEstablished() {
125 4 : for (auto& [type_url, subscription] : subscriptions_) {
126 0 : UNREFERENCED_PARAMETER(type_url);
127 0 : subscription->sub_state_.markStreamFresh();
128 0 : }
129 4 : pausable_ack_queue_.clear();
130 4 : trySendDiscoveryRequests();
131 4 : }
132 :
133 4 : void NewGrpcMuxImpl::onEstablishmentFailure() {
134 : // If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately
135 : // call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a
136 : // crash, the iteration needs to dance around a little: collect pointers to all
137 : // SubscriptionStates, call on all those pointers we haven't yet called on, repeat if there are
138 : // now more SubscriptionStates.
139 4 : absl::flat_hash_map<std::string, DeltaSubscriptionState*> all_subscribed;
140 4 : absl::flat_hash_map<std::string, DeltaSubscriptionState*> already_called;
141 4 : do {
142 15 : for (auto& [type_url, subscription] : subscriptions_) {
143 15 : all_subscribed[type_url] = &subscription->sub_state_;
144 15 : }
145 15 : for (auto& sub : all_subscribed) {
146 15 : if (already_called.insert(sub).second) { // insert succeeded ==> not already called
147 15 : sub.second->handleEstablishmentFailure();
148 15 : }
149 15 : }
150 4 : } while (all_subscribed.size() != subscriptions_.size());
151 4 : }
152 :
153 0 : void NewGrpcMuxImpl::onWriteable() { trySendDiscoveryRequests(); }
154 :
155 23 : void NewGrpcMuxImpl::kickOffAck(UpdateAck ack) {
156 23 : pausable_ack_queue_.push(std::move(ack));
157 23 : trySendDiscoveryRequests();
158 23 : }
159 :
160 : // TODO(fredlas) to be removed from the GrpcMux interface very soon.
161 4 : void NewGrpcMuxImpl::start() {
162 4 : ASSERT(!started_);
163 4 : if (started_) {
164 0 : return;
165 0 : }
166 4 : started_ = true;
167 4 : grpc_stream_.establishNewStream();
168 4 : }
169 :
170 : GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
171 : const absl::flat_hash_set<std::string>& resources,
172 : SubscriptionCallbacks& callbacks,
173 : OpaqueResourceDecoderSharedPtr resource_decoder,
174 32 : const SubscriptionOptions& options) {
175 32 : auto entry = subscriptions_.find(type_url);
176 32 : if (entry == subscriptions_.end()) {
177 : // We don't yet have a subscription for type_url! Make one!
178 15 : addSubscription(type_url, options.use_namespace_matching_);
179 15 : return addWatch(type_url, resources, callbacks, resource_decoder, options);
180 15 : }
181 :
182 17 : Watch* watch = entry->second->watch_map_.addWatch(callbacks, *resource_decoder);
183 : // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
184 17 : updateWatch(type_url, watch, resources, options);
185 17 : return std::make_unique<WatchImpl>(type_url, watch, *this, options);
186 32 : }
187 :
188 : // Updates the list of resource names watched by the given watch. If an added name is new across
189 : // the whole subscription, or if a removed name has no other watch interested in it, then the
190 : // subscription will enqueue and attempt to send an appropriate discovery request.
191 : void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
192 : const absl::flat_hash_set<std::string>& resources,
193 34 : const SubscriptionOptions& options) {
194 34 : ASSERT(watch != nullptr);
195 34 : auto sub = subscriptions_.find(type_url);
196 34 : RELEASE_ASSERT(sub != subscriptions_.end(),
197 34 : fmt::format("Watch of {} has no subscription to update.", type_url));
198 : // We need to prepare xdstp:// resources for the transport, by normalizing and adding any extra
199 : // context parameters.
200 34 : absl::flat_hash_set<std::string> effective_resources;
201 34 : for (const auto& resource : resources) {
202 9 : if (XdsResourceIdentifier::hasXdsTpScheme(resource)) {
203 0 : auto xdstp_resource_or_error = XdsResourceIdentifier::decodeUrn(resource);
204 0 : THROW_IF_STATUS_NOT_OK(xdstp_resource_or_error, throw);
205 0 : auto xdstp_resource = xdstp_resource_or_error.value();
206 0 : if (options.add_xdstp_node_context_params_) {
207 0 : const auto context = XdsContextParams::encodeResource(
208 0 : local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
209 0 : xdstp_resource.mutable_context()->CopyFrom(context);
210 0 : }
211 0 : XdsResourceIdentifier::EncodeOptions encode_options;
212 0 : encode_options.sort_context_params_ = true;
213 0 : effective_resources.insert(XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options));
214 9 : } else {
215 9 : effective_resources.insert(resource);
216 9 : }
217 9 : }
218 34 : auto added_removed = sub->second->watch_map_.updateWatchInterest(watch, effective_resources);
219 34 : if (options.use_namespace_matching_) {
220 : // This is to prevent sending out of requests that contain prefixes instead of resource names
221 0 : sub->second->sub_state_.updateSubscriptionInterest({}, {});
222 34 : } else {
223 34 : sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_,
224 34 : added_removed.removed_);
225 34 : }
226 : // Tell the server about our change in interest, if any.
227 34 : if (sub->second->sub_state_.subscriptionUpdatePending()) {
228 26 : trySendDiscoveryRequests();
229 26 : }
230 34 : }
231 :
232 : void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url,
233 0 : const absl::flat_hash_set<std::string>& for_update) {
234 0 : auto sub = subscriptions_.find(type_url);
235 0 : RELEASE_ASSERT(sub != subscriptions_.end(),
236 0 : fmt::format("Watch of {} has no subscription to update.", type_url));
237 0 : sub->second->sub_state_.updateSubscriptionInterest(for_update, {});
238 : // Tell the server about our change in interest, if any.
239 0 : if (sub->second->sub_state_.subscriptionUpdatePending()) {
240 0 : trySendDiscoveryRequests();
241 0 : }
242 0 : }
243 :
244 17 : void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
245 17 : updateWatch(type_url, watch, {}, {});
246 17 : auto entry = subscriptions_.find(type_url);
247 17 : ASSERT(entry != subscriptions_.end(),
248 17 : fmt::format("removeWatch() called for non-existent subscription {}.", type_url));
249 17 : entry->second->watch_map_.removeWatch(watch);
250 17 : }
251 :
252 : void NewGrpcMuxImpl::addSubscription(const std::string& type_url,
253 15 : const bool use_namespace_matching) {
254 : // Resource cache is only used for EDS resources.
255 15 : EdsResourcesCacheOptRef resources_cache{absl::nullopt};
256 15 : if (eds_resources_cache_ &&
257 15 : (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
258 0 : resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
259 0 : }
260 15 : subscriptions_.emplace(
261 15 : type_url, std::make_unique<SubscriptionStuff>(type_url, local_info_, use_namespace_matching,
262 15 : dispatcher_, *config_validators_.get(),
263 15 : xds_config_tracker_, resources_cache));
264 15 : subscription_ordering_.emplace_back(type_url);
265 15 : }
266 :
267 106 : void NewGrpcMuxImpl::trySendDiscoveryRequests() {
268 106 : if (shutdown_) {
269 9 : return;
270 9 : }
271 :
272 137 : while (true) {
273 : // Do any of our subscriptions even want to send a request?
274 137 : absl::optional<std::string> maybe_request_type = whoWantsToSendDiscoveryRequest();
275 137 : if (!maybe_request_type.has_value()) {
276 97 : break;
277 97 : }
278 : // If so, which one (by type_url)?
279 40 : std::string next_request_type_url = maybe_request_type.value();
280 : // If we don't have a subscription object for this request's type_url, drop the request.
281 40 : auto sub = subscriptions_.find(next_request_type_url);
282 40 : RELEASE_ASSERT(sub != subscriptions_.end(),
283 40 : fmt::format("Tried to send discovery request for non-existent subscription {}.",
284 40 : next_request_type_url));
285 :
286 : // Try again later if paused/rate limited/stream down.
287 40 : if (!canSendDiscoveryRequest(next_request_type_url)) {
288 0 : break;
289 0 : }
290 40 : envoy::service::discovery::v3::DeltaDiscoveryRequest request;
291 : // Get our subscription state to generate the appropriate DeltaDiscoveryRequest, and send.
292 40 : if (!pausable_ack_queue_.empty()) {
293 : // Because ACKs take precedence over plain requests, if there is anything in the queue, it's
294 : // safe to assume it's of the type_url that we're wanting to send.
295 23 : request = sub->second->sub_state_.getNextRequestWithAck(pausable_ack_queue_.popFront());
296 23 : } else {
297 17 : request = sub->second->sub_state_.getNextRequestAckless();
298 17 : }
299 40 : grpc_stream_.sendMessage(request);
300 40 : }
301 97 : grpc_stream_.maybeUpdateQueueSizeStat(pausable_ack_queue_.size());
302 97 : }
303 :
304 : // Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check
305 : // whether we *want* to send a DeltaDiscoveryRequest).
306 40 : bool NewGrpcMuxImpl::canSendDiscoveryRequest(const std::string& type_url) {
307 40 : RELEASE_ASSERT(
308 40 : !pausable_ack_queue_.paused(type_url),
309 40 : fmt::format("canSendDiscoveryRequest() called on paused type_url {}. Pausedness is "
310 40 : "supposed to be filtered out by whoWantsToSendDiscoveryRequest(). ",
311 40 : type_url));
312 :
313 40 : if (!grpc_stream_.grpcStreamAvailable()) {
314 0 : ENVOY_LOG(trace, "No stream available to send a discovery request for {}.", type_url);
315 0 : return false;
316 40 : } else if (!grpc_stream_.checkRateLimitAllowsDrain()) {
317 0 : ENVOY_LOG(trace, "{} discovery request hit rate limit; will try later.", type_url);
318 0 : return false;
319 0 : }
320 40 : return true;
321 40 : }
322 :
323 : // Checks whether we have something to say in a DeltaDiscoveryRequest, which can be an ACK and/or
324 : // a subscription update. (Does not check whether we *can* send that DeltaDiscoveryRequest).
325 : // Returns the type_url we should send the DeltaDiscoveryRequest for (if any).
326 : // First, prioritizes ACKs over non-ACK subscription interest updates.
327 : // Then, prioritizes non-ACK updates in the order the various types
328 : // of subscriptions were activated.
329 137 : absl::optional<std::string> NewGrpcMuxImpl::whoWantsToSendDiscoveryRequest() {
330 : // All ACKs are sent before plain updates. trySendDiscoveryRequests() relies on this. So, choose
331 : // type_url from pausable_ack_queue_ if possible, before looking at pending updates.
332 137 : if (!pausable_ack_queue_.empty()) {
333 23 : return pausable_ack_queue_.front().type_url_;
334 23 : }
335 : // If we're looking to send multiple non-ACK requests, send them in the order that their
336 : // subscriptions were initiated.
337 312 : for (const auto& sub_type : subscription_ordering_) {
338 312 : auto sub = subscriptions_.find(sub_type);
339 312 : if (sub != subscriptions_.end() && sub->second->sub_state_.subscriptionUpdatePending() &&
340 312 : !pausable_ack_queue_.paused(sub_type)) {
341 17 : return sub->first;
342 17 : }
343 312 : }
344 97 : return absl::nullopt;
345 114 : }
346 :
347 : // A factory class for creating NewGrpcMuxImpl so it does not have to be
348 : // hard-compiled into cluster_manager_impl.cc
349 : class NewGrpcMuxFactory : public MuxFactory {
350 : public:
351 13 : std::string name() const override { return "envoy.config_mux.new_grpc_mux_factory"; }
352 135 : void shutdownAll() override { return NewGrpcMuxImpl::shutdownAll(); }
353 : std::shared_ptr<GrpcMux>
354 : create(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher,
355 : Random::RandomGenerator&, Stats::Scope& scope,
356 : const envoy::config::core::v3::ApiConfigSource& ads_config,
357 : const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
358 : BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
359 4 : OptRef<XdsResourcesDelegate>, bool use_eds_resources_cache) override {
360 4 : GrpcMuxContext grpc_mux_context{
361 4 : /*async_client_=*/std::move(async_client),
362 4 : /*dispatcher_=*/dispatcher,
363 : /*service_method_=*/
364 4 : *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
365 4 : "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"),
366 4 : /*local_info_=*/local_info,
367 4 : /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config),
368 4 : /*scope_=*/scope,
369 4 : /*config_validators_=*/std::move(config_validators),
370 4 : /*xds_resources_delegate_=*/absl::nullopt,
371 4 : /*xds_config_tracker_=*/xds_config_tracker,
372 4 : /*backoff_strategy_=*/std::move(backoff_strategy),
373 4 : /*target_xds_authority_=*/"",
374 : /*eds_resources_cache_=*/
375 4 : (use_eds_resources_cache &&
376 4 : Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
377 4 : ? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
378 4 : : nullptr};
379 4 : return std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context);
380 4 : }
381 : };
382 :
383 : REGISTER_FACTORY(NewGrpcMuxFactory, MuxFactory);
384 :
385 : } // namespace Config
386 : } // namespace Envoy
|