Line data Source code
1 : #include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
2 :
3 : #include "envoy/service/discovery/v3/discovery.pb.h"
4 :
5 : #include "source/common/config/decoded_resource_impl.h"
6 : #include "source/common/config/utility.h"
7 : #include "source/common/memory/utils.h"
8 : #include "source/common/protobuf/protobuf.h"
9 : #include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
10 : #include "source/extensions/config_subscription/grpc/xds_source_id.h"
11 :
12 : #include "absl/container/btree_map.h"
13 : #include "absl/container/node_hash_set.h"
14 : #include "absl/strings/match.h"
15 : #include "absl/strings/str_cat.h"
16 :
17 : namespace Envoy {
18 : namespace Config {
19 :
20 : namespace {
21 : class AllMuxesState {
22 : public:
23 11 : void insert(GrpcMuxImpl* mux) { muxes_.insert(mux); }
24 :
25 11 : void erase(GrpcMuxImpl* mux) { muxes_.erase(mux); }
26 :
27 135 : void shutdownAll() {
28 135 : for (auto& mux : muxes_) {
29 11 : mux->shutdown();
30 11 : }
31 135 : }
32 :
33 : private:
34 : absl::flat_hash_set<GrpcMuxImpl*> muxes_;
35 : };
36 : using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
37 :
38 : // Returns true if `resource_name` contains the wildcard XdsTp resource, for example:
39 : // xdstp://test/envoy.config.cluster.v3.Cluster/foo-cluster/*
40 : // xdstp://test/envoy.config.cluster.v3.Cluster/foo-cluster/*?node.name=my_node
41 9 : bool isXdsTpWildcard(const std::string& resource_name) {
42 9 : return XdsResourceIdentifier::hasXdsTpScheme(resource_name) &&
43 9 : (absl::EndsWith(resource_name, "/*") || absl::StrContains(resource_name, "/*?"));
44 9 : }
45 :
// Converts the XdsTp resource name to its wildcard equivalent.
// Must only be called on XdsTp resource names.
std::string convertToWildcard(const std::string& resource_name) {
  ASSERT(XdsResourceIdentifier::hasXdsTpScheme(resource_name));
  auto resource_or_error = XdsResourceIdentifier::decodeUrn(resource_name);
  THROW_IF_STATUS_NOT_OK(resource_or_error, throw);
  xds::core::v3::ResourceName xdstp_resource = resource_or_error.value();
  // Replace the last '/'-delimited segment of the id with "*"; if the id has a
  // single segment, the whole id becomes "*".
  const auto pos = xdstp_resource.id().find_last_of('/');
  xdstp_resource.set_id(
      pos == std::string::npos ? "*" : absl::StrCat(xdstp_resource.id().substr(0, pos), "/*"));
  // Sort context params so the re-encoded URN compares equal regardless of the
  // parameter order in the input.
  XdsResourceIdentifier::EncodeOptions options;
  options.sort_context_params_ = true;
  return XdsResourceIdentifier::encodeUrn(xdstp_resource, options);
}
60 : } // namespace
61 :
// Constructs the SotW ADS mux. Moves the stream/validator/cache dependencies
// out of `grpc_mux_context`; `skip_subsequent_node` controls whether the node
// is omitted from requests after the first one on a stream.
GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
    : grpc_stream_(this, std::move(grpc_mux_context.async_client_),
                   grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
                   grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_),
                   grpc_mux_context.rate_limit_settings_),
      local_info_(grpc_mux_context.local_info_), skip_subsequent_node_(skip_subsequent_node),
      config_validators_(std::move(grpc_mux_context.config_validators_)),
      xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
      xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_),
      eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)),
      target_xds_authority_(grpc_mux_context.target_xds_authority_),
      dispatcher_(grpc_mux_context.dispatcher_),
      // When the dynamic context parameters for a resource type change, the
      // node (which carries them) must be re-sent on the next request.
      dynamic_update_callback_handle_(
          grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
              [this](absl::string_view resource_type_url) {
                onDynamicContextUpdate(resource_type_url);
              })) {
  // ADS requires a node identifier; fail fast if the bootstrap lacks one.
  Config::Utility::checkLocalInfo("ads", local_info_);
  // Register with the global registry so shutdownAll() reaches this instance.
  AllMuxes::get().insert(this);
}
82 :
// Unregister so a later shutdownAll() does not touch a destroyed mux.
GrpcMuxImpl::~GrpcMuxImpl() { AllMuxes::get().erase(this); }
84 :
// Static: shuts down every live GrpcMuxImpl in the process (server teardown).
void GrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); }
86 :
87 0 : void GrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
88 0 : auto api_state = api_state_.find(resource_type_url);
89 0 : if (api_state == api_state_.end()) {
90 0 : return;
91 0 : }
92 0 : api_state->second->must_send_node_ = true;
93 0 : queueDiscoveryRequest(resource_type_url);
94 0 : }
95 :
96 11 : void GrpcMuxImpl::start() {
97 11 : ASSERT(!started_);
98 11 : if (started_) {
99 0 : return;
100 0 : }
101 11 : started_ = true;
102 11 : grpc_stream_.establishNewStream();
103 11 : }
104 :
105 140 : void GrpcMuxImpl::sendDiscoveryRequest(absl::string_view type_url) {
106 140 : if (shutdown_) {
107 0 : return;
108 0 : }
109 :
110 140 : ApiState& api_state = apiStateFor(type_url);
111 140 : auto& request = api_state.request_;
112 140 : request.mutable_resource_names()->Clear();
113 :
114 : // Maintain a set to avoid dupes.
115 140 : absl::node_hash_set<std::string> resources;
116 174 : for (const auto* watch : api_state.watches_) {
117 174 : for (const std::string& resource : watch->resources_) {
118 107 : if (!resources.contains(resource)) {
119 107 : resources.emplace(resource);
120 107 : request.add_resource_names(resource);
121 107 : }
122 107 : }
123 174 : }
124 :
125 140 : if (api_state.must_send_node_ || !skip_subsequent_node_ || first_stream_request_) {
126 : // Node may have been cleared during a previous request.
127 10 : request.mutable_node()->CopyFrom(local_info_.node());
128 10 : api_state.must_send_node_ = false;
129 130 : } else {
130 130 : request.clear_node();
131 130 : }
132 140 : ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url, request.ShortDebugString());
133 140 : grpc_stream_.sendMessage(request);
134 140 : first_stream_request_ = false;
135 :
136 : // clear error_detail after the request is sent if it exists.
137 140 : if (apiStateFor(type_url).request_.has_error_detail()) {
138 0 : apiStateFor(type_url).request_.clear_error_detail();
139 0 : }
140 140 : }
141 :
142 11 : void GrpcMuxImpl::clearNonce() {
143 : // Iterate over all api_states (for each type_url), and clear its nonce.
144 11 : for (auto& [type_url, api_state] : api_state_) {
145 0 : if (api_state) {
146 0 : api_state->request_.clear_response_nonce();
147 0 : }
148 0 : }
149 11 : }
150 :
// Loads previously-persisted resources for `type_url` from the configured
// XdsResourcesDelegate (if any) and delivers them to the current watches via
// processDiscoveryResources(). Used as a fallback when the xDS server is
// unreachable (see onEstablishmentFailure()).
void GrpcMuxImpl::loadConfigFromDelegate(const std::string& type_url,
                                         const absl::flat_hash_set<std::string>& resource_names) {
  if (!xds_resources_delegate_.has_value()) {
    return;
  }
  ApiState& api_state = apiStateFor(type_url);
  if (api_state.watches_.empty()) {
    // No watches, so exit without loading config from storage.
    return;
  }

  const XdsConfigSourceId source_id{target_xds_authority_, type_url};
  TRY_ASSERT_MAIN_THREAD {
    std::vector<envoy::service::discovery::v3::Resource> resources =
        xds_resources_delegate_->getResources(source_id, resource_names);
    if (resources.empty()) {
      // There are no persisted resources, so nothing to process.
      return;
    }

    std::vector<DecodedResourcePtr> decoded_resources;
    OpaqueResourceDecoder& resource_decoder = *api_state.watches_.front()->resource_decoder_;
    std::string version_info;
    for (const auto& resource : resources) {
      // All persisted resources are expected to carry the same version; the
      // ASSERT documents that invariant rather than enforcing it in release.
      if (version_info.empty()) {
        version_info = resource.version();
      } else {
        ASSERT(resource.version() == version_info);
      }

      TRY_ASSERT_MAIN_THREAD {
        decoded_resources.emplace_back(
            std::make_unique<DecodedResourceImpl>(resource_decoder, resource));
      }
      END_TRY
      catch (const EnvoyException& e) {
        // One undecodable resource is reported to the delegate but does not
        // abort loading of the remaining resources.
        xds_resources_delegate_->onResourceLoadFailed(source_id, resource.name(), e);
      }
    }

    // call_delegate=false: these resources came FROM the delegate, so do not
    // echo them back into it.
    processDiscoveryResources(decoded_resources, api_state, type_url, version_info,
                              /*call_delegate=*/false);
  }
  END_TRY
  catch (const EnvoyException& e) {
    // TODO(abeyad): do something else here?
    ENVOY_LOG_MISC(warn, "Failed to load config from delegate for {}: {}", source_id.toKey(),
                   e.what());
  }
}
201 :
// Creates a watch for `resources` of `type_url`. The first watch for a type
// initializes its DiscoveryRequest (type_url + node) and records the
// subscription order; every call queues an updated request reflecting the new
// watch set. The returned watch unsubscribes on destruction.
GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
                                      const absl::flat_hash_set<std::string>& resources,
                                      SubscriptionCallbacks& callbacks,
                                      OpaqueResourceDecoderSharedPtr resource_decoder,
                                      const SubscriptionOptions& options) {
  // Resource cache is only used for EDS resources.
  EdsResourcesCacheOptRef resources_cache{absl::nullopt};
  if (eds_resources_cache_ &&
      (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
    resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
  }
  auto watch = std::make_unique<GrpcMuxWatchImpl>(resources, callbacks, resource_decoder, type_url,
                                                  *this, options, local_info_, resources_cache);
  ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url);

  // Lazily kick off the requests based on first subscription. This has the
  // convenient side-effect that we order messages on the channel based on
  // Envoy's internal dependency ordering.
  // TODO(gsagula): move TokenBucketImpl params to a config.
  if (!apiStateFor(type_url).subscribed_) {
    apiStateFor(type_url).request_.set_type_url(type_url);
    apiStateFor(type_url).request_.mutable_node()->MergeFrom(local_info_.node());
    apiStateFor(type_url).subscribed_ = true;
    subscriptions_.emplace_back(type_url);
  }

  // This will send an updated request on each subscription.
  // TODO(htuch): For RDS/EDS, this will generate a new DiscoveryRequest on each resource we added.
  // Consider in the future adding some kind of collation/batching during CDS/LDS updates so that we
  // only send a single RDS/EDS update after the CDS/LDS update.
  queueDiscoveryRequest(type_url);

  return watch;
}
236 :
237 108 : ScopedResume GrpcMuxImpl::pause(const std::string& type_url) {
238 108 : return pause(std::vector<std::string>{type_url});
239 108 : }
240 :
// Pauses discovery requests for each of `type_urls`. Pauses nest: each call
// increments a per-type counter, and the returned ScopedResume decrements it;
// when a type's counter reaches zero with a request pending, the deferred
// request is queued (see queueDiscoveryRequest()).
ScopedResume GrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
  for (const auto& type_url : type_urls) {
    ApiState& api_state = apiStateFor(type_url);
    ENVOY_LOG(debug, "Pausing discovery requests for {} (previous count {})", type_url,
              api_state.pauses_);
    ++api_state.pauses_;
  }
  // The cleanup lambda captures `type_urls` by value so resuming is safe even
  // after the caller's vector is gone.
  return std::make_unique<Cleanup>([this, type_urls]() {
    for (const auto& type_url : type_urls) {
      ApiState& api_state = apiStateFor(type_url);
      ENVOY_LOG(debug, "Decreasing pause count on discovery requests for {} (previous count {})",
                type_url, api_state.pauses_);
      ASSERT(api_state.paused());

      // Only resume when this was the outermost pause and something was
      // actually deferred for a still-subscribed type.
      if (--api_state.pauses_ == 0 && api_state.pending_ && api_state.subscribed_) {
        ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url);
        queueDiscoveryRequest(type_url);
        api_state.pending_ = false;
      }
    }
  });
}
263 :
// Handles a state-of-the-world DiscoveryResponse: decodes the resources,
// delivers them to the watches for the response's type_url, and queues the
// ACK (version updated) or NACK (error_detail set, version unchanged).
void GrpcMuxImpl::onDiscoveryResponse(
    std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message,
    ControlPlaneStats& control_plane_stats) {
  const std::string type_url = message->type_url();
  ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url, message->version_info());

  if (api_state_.count(type_url) == 0) {
    // TODO(yuval-k): This should never happen. consider dropping the stream as this is a
    // protocol violation
    ENVOY_LOG(warn, "Ignoring the message for type URL {} as it has no current subscribers.",
              type_url);
    return;
  }

  ApiState& api_state = apiStateFor(type_url);

  if (message->has_control_plane()) {
    control_plane_stats.identifier_.set(message->control_plane().identifier());

    // Log only when the control plane identity actually changes.
    if (message->control_plane().identifier() != api_state.control_plane_identifier_) {
      api_state.control_plane_identifier_ = message->control_plane().identifier();
      ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", type_url,
                api_state.control_plane_identifier_);
    }
  }

  if (api_state.watches_.empty()) {
    // update the nonce as we are processing this response.
    api_state.request_.set_response_nonce(message->nonce());
    if (message->resources().empty()) {
      // No watches and no resources. This can happen when envoy unregisters from a
      // resource that's removed from the server as well. For example, a deleted cluster
      // triggers un-watching the ClusterLoadAssignment watch, and at the same time the
      // xDS server sends an empty list of ClusterLoadAssignment resources. we'll accept
      // this update. no need to send a discovery request, as we don't watch for anything.
      api_state.request_.set_version_info(message->version_info());
    } else {
      // No watches and we have resources - this should not happen. send a NACK (by not
      // updating the version).
      ENVOY_LOG(warn, "Ignoring unwatched type URL {}", type_url);
      queueDiscoveryRequest(type_url);
    }
    return;
  }
  ScopedResume same_type_resume;
  // We pause updates of the same type. This is necessary for SotW and GrpcMuxImpl, since unlike
  // delta and NewGRpcMuxImpl, independent watch additions/removals trigger updates regardless of
  // the delta state. The proper fix for this is to converge these implementations,
  // see https://github.com/envoyproxy/envoy/issues/11477.
  same_type_resume = pause(type_url);
  TRY_ASSERT_MAIN_THREAD {
    std::vector<DecodedResourcePtr> resources;
    OpaqueResourceDecoder& resource_decoder = *api_state.watches_.front()->resource_decoder_;

    for (const auto& resource : message->resources()) {
      // TODO(snowp): Check the underlying type when the resource is a Resource.
      if (!resource.Is<envoy::service::discovery::v3::Resource>() &&
          type_url != resource.type_url()) {
        throw EnvoyException(
            fmt::format("{} does not match the message-wide type URL {} in DiscoveryResponse {}",
                        resource.type_url(), type_url, message->DebugString()));
      }

      auto decoded_resource =
          DecodedResourceImpl::fromResource(resource_decoder, resource, message->version_info());

      // Heartbeat resources only refresh TTLs; they are not delivered to
      // watches.
      if (!isHeartbeatResource(type_url, *decoded_resource)) {
        resources.emplace_back(std::move(decoded_resource));
      }
    }

    processDiscoveryResources(resources, api_state, type_url, message->version_info(),
                              /*call_delegate=*/true);

    // Processing point when resources are successfully ingested.
    if (xds_config_tracker_.has_value()) {
      xds_config_tracker_->onConfigAccepted(type_url, resources);
    }
  }
  END_TRY
  catch (const EnvoyException& e) {
    // NACK path: notify every watch and attach the error detail that will ride
    // on the next request (cleared after it is sent).
    for (auto watch : api_state.watches_) {
      watch->callbacks_.onConfigUpdateFailed(
          Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
    }
    ::google::rpc::Status* error_detail = api_state.request_.mutable_error_detail();
    error_detail->set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
    error_detail->set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));

    // Processing point when there is any exception during the parse and ingestion process.
    if (xds_config_tracker_.has_value()) {
      xds_config_tracker_->onConfigRejected(*message, error_detail->message());
    }
  }
  api_state.previously_fetched_data_ = true;
  api_state.request_.set_response_nonce(message->nonce());
  ASSERT(api_state.paused());
  // Queue the ACK/NACK; it is deferred until `same_type_resume` goes out of
  // scope and lifts the pause taken above.
  queueDiscoveryRequest(type_url);
}
363 :
// Delivers `resources` (already decoded, all at `version_info`) to the watches
// in `api_state`: updates TTLs, runs external config validators, fans matching
// resources out per watch, updates the EDS cache, optionally notifies the xDS
// resources delegate, and records the ACK version on the request.
void GrpcMuxImpl::processDiscoveryResources(const std::vector<DecodedResourcePtr>& resources,
                                            ApiState& api_state, const std::string& type_url,
                                            const std::string& version_info,
                                            const bool call_delegate) {
  ASSERT_IS_MAIN_OR_TEST_THREAD();
  // To avoid O(n^2) explosion (e.g. when we have 1000s of EDS watches), we
  // build a map here from resource name to resource and then walk watches_.
  // We have to walk all watches (and need an efficient map as a result) to
  // ensure we deliver empty config updates when a resource is dropped. We make the map ordered
  // for test determinism.
  absl::btree_map<std::string, DecodedResourceRef> resource_ref_map;
  std::vector<DecodedResourceRef> all_resource_refs;

  // Batch all TTL add/clear operations under one scoped update.
  const auto scoped_ttl_update = api_state.ttl_.scopedTtlUpdate();

  for (const auto& resource : resources) {
    if (resource->ttl()) {
      api_state.ttl_.add(*resource->ttl(), resource->name());
    } else {
      api_state.ttl_.clear(resource->name());
    }

    all_resource_refs.emplace_back(*resource);
    if (XdsResourceIdentifier::hasXdsTpScheme(resource->name())) {
      // Sort the context params of an xdstp resource, so we can compare them easily.
      auto resource_or_error = XdsResourceIdentifier::decodeUrn(resource->name());
      THROW_IF_STATUS_NOT_OK(resource_or_error, throw);
      xds::core::v3::ResourceName xdstp_resource = resource_or_error.value();
      XdsResourceIdentifier::EncodeOptions options;
      options.sort_context_params_ = true;
      resource_ref_map.emplace(XdsResourceIdentifier::encodeUrn(xdstp_resource, options),
                               *resource);
    } else {
      resource_ref_map.emplace(resource->name(), *resource);
    }
  }

  // Execute external config validators if there are any watches.
  if (!api_state.watches_.empty()) {
    config_validators_->executeValidators(type_url, resources);
  }

  for (auto watch : api_state.watches_) {
    // onConfigUpdate should be called in all cases for single watch xDS (Cluster and
    // Listener) even if the message does not have resources so that update_empty stat
    // is properly incremented and state-of-the-world semantics are maintained.
    if (watch->resources_.empty()) {
      THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate(all_resource_refs, version_info));
      continue;
    }
    std::vector<DecodedResourceRef> found_resources;
    for (const auto& watched_resource_name : watch->resources_) {
      // Look for a singleton subscription.
      auto it = resource_ref_map.find(watched_resource_name);
      if (it != resource_ref_map.end()) {
        found_resources.emplace_back(it->second);
      } else if (isXdsTpWildcard(watched_resource_name)) {
        // See if the resources match the xdstp wildcard subscription.
        // Note: although it is unlikely that Envoy will need to support a resource that is mapped
        // to both a singleton and collection watch, this code still supports this use case.
        // TODO(abeyad): This could be made more efficient, e.g. by pre-computing and having a map
        // entry for each wildcard watch.
        for (const auto& resource_ref_it : resource_ref_map) {
          if (XdsResourceIdentifier::hasXdsTpScheme(resource_ref_it.first) &&
              convertToWildcard(resource_ref_it.first) == watched_resource_name) {
            found_resources.emplace_back(resource_ref_it.second);
          }
        }
      }
    }

    // onConfigUpdate should be called only on watches(clusters/listeners) that have
    // updates in the message for EDS/RDS.
    if (!found_resources.empty()) {
      THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate(found_resources, version_info));
      // Resource cache is only used for EDS resources.
      if (eds_resources_cache_ &&
          (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
        for (const auto& resource : found_resources) {
          const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
              dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
                  resource.get().resource());
          eds_resources_cache_->setResource(resource.get().name(), cluster_load_assignment);
        }
        // No need to remove resources from the cache, as currently only non-collection
        // subscriptions are supported, and these resources are removed in the call
        // to updateWatchInterest().
      }
    }
  }

  // All config updates have been applied without throwing an exception, so we'll call the xDS
  // resources delegate, if any.
  if (call_delegate && xds_resources_delegate_.has_value()) {
    xds_resources_delegate_->onConfigUpdated(XdsConfigSourceId{target_xds_authority_, type_url},
                                             all_resource_refs);
  }

  // TODO(mattklein123): In the future if we start tracking per-resource versions, we
  // would do that tracking here.
  api_state.request_.set_version_info(version_info);
  // Large updates can leave heap slack; opportunistically return it to the OS.
  Memory::Utils::tryShrinkHeap();
}
467 :
// The gRPC stream has write capacity again: flush any queued requests.
void GrpcMuxImpl::onWriteable() { drainRequests(); }
469 :
470 11 : void GrpcMuxImpl::onStreamEstablished() {
471 11 : first_stream_request_ = true;
472 11 : grpc_stream_.maybeUpdateQueueSizeStat(0);
473 11 : clearNonce();
474 11 : request_queue_ = std::make_unique<std::queue<std::string>>();
475 11 : for (const auto& type_url : subscriptions_) {
476 0 : queueDiscoveryRequest(type_url);
477 0 : }
478 11 : }
479 :
// Called when the stream could not be (re)established. Notifies every watch of
// the connection failure and, the first time a type fails before ever fetching
// data, falls back to locally-persisted config via the delegate.
void GrpcMuxImpl::onEstablishmentFailure() {
  for (const auto& api_state : api_state_) {
    for (auto watch : api_state.second->watches_) {
      watch->callbacks_.onConfigUpdateFailed(
          Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr);
    }
    if (!api_state.second->previously_fetched_data_) {
      // On the initialization of the gRPC mux, if connection to the xDS server fails, load the
      // persisted config, if available. The locally persisted config will be used until
      // connectivity is established with the xDS server.
      loadConfigFromDelegate(
          /*type_url=*/api_state.first,
          absl::flat_hash_set<std::string>{api_state.second->request_.resource_names().begin(),
                                           api_state.second->request_.resource_names().end()});
      // Mark as fetched so the fallback load happens at most once per type.
      api_state.second->previously_fetched_data_ = true;
    }
  }
}
498 :
499 290 : void GrpcMuxImpl::queueDiscoveryRequest(absl::string_view queue_item) {
500 290 : if (!grpc_stream_.grpcStreamAvailable()) {
501 28 : ENVOY_LOG(debug, "No stream available to queueDiscoveryRequest for {}", queue_item);
502 28 : return; // Drop this request; the reconnect will enqueue a new one.
503 28 : }
504 262 : ApiState& api_state = apiStateFor(queue_item);
505 262 : if (api_state.paused()) {
506 120 : ENVOY_LOG(trace, "API {} paused during queueDiscoveryRequest(), setting pending.", queue_item);
507 120 : api_state.pending_ = true;
508 120 : return; // Drop this request; the unpause will enqueue a new one.
509 120 : }
510 142 : request_queue_->emplace(std::string(queue_item));
511 142 : drainRequests();
512 142 : }
513 :
514 : void GrpcMuxImpl::expiryCallback(absl::string_view type_url,
515 0 : const std::vector<std::string>& expired) {
516 : // The TtlManager triggers a callback with a list of all the expired elements, which we need
517 : // to compare against the various watched resources to return the subset that each watch is
518 : // subscribed to.
519 :
520 : // We convert the incoming list into a set in order to more efficiently perform this
521 : // comparison when there are a lot of watches.
522 0 : absl::flat_hash_set<std::string> all_expired;
523 0 : all_expired.insert(expired.begin(), expired.end());
524 :
525 : // Note: We can blindly dereference the lookup here since the only time we call this is in a
526 : // callback that is created at the same time as we insert the ApiState for this type.
527 0 : for (auto watch : api_state_.find(type_url)->second->watches_) {
528 0 : Protobuf::RepeatedPtrField<std::string> found_resources_for_watch;
529 :
530 0 : for (const auto& resource : expired) {
531 0 : if (all_expired.find(resource) != all_expired.end()) {
532 0 : found_resources_for_watch.Add(std::string(resource));
533 0 : }
534 0 : }
535 :
536 0 : THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate({}, found_resources_for_watch, ""));
537 0 : }
538 0 : }
539 :
540 1410 : GrpcMuxImpl::ApiState& GrpcMuxImpl::apiStateFor(absl::string_view type_url) {
541 1410 : auto itr = api_state_.find(type_url);
542 1410 : if (itr == api_state_.end()) {
543 70 : api_state_.emplace(
544 70 : type_url, std::make_unique<ApiState>(dispatcher_, [this, type_url](const auto& expired) {
545 0 : expiryCallback(type_url, expired);
546 0 : }));
547 70 : }
548 :
549 1410 : return *api_state_.find(type_url)->second;
550 1410 : }
551 :
552 142 : void GrpcMuxImpl::drainRequests() {
553 282 : while (!request_queue_->empty() && grpc_stream_.checkRateLimitAllowsDrain()) {
554 : // Process the request, if rate limiting is not enabled at all or if it is under rate limit.
555 140 : sendDiscoveryRequest(request_queue_->front());
556 140 : request_queue_->pop();
557 140 : }
558 142 : grpc_stream_.maybeUpdateQueueSizeStat(request_queue_->size());
559 142 : }
560 :
561 : // A factory class for creating GrpcMuxImpl so it does not have to be
562 : // hard-compiled into cluster_manager_impl.cc
class GrpcMuxFactory : public MuxFactory {
public:
  // Registry name under which this mux implementation is looked up.
  std::string name() const override { return "envoy.config_mux.grpc_mux_factory"; }
  // Shuts down every GrpcMuxImpl instance in the process.
  void shutdownAll() override { return GrpcMuxImpl::shutdownAll(); }
  // Assembles a GrpcMuxContext from the individual dependencies and returns a
  // SotW ADS mux bound to the StreamAggregatedResources method.
  std::shared_ptr<GrpcMux>
  create(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher,
         Random::RandomGenerator&, Stats::Scope& scope,
         const envoy::config::core::v3::ApiConfigSource& ads_config,
         const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
         BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
         XdsResourcesDelegateOptRef xds_resources_delegate, bool use_eds_resources_cache) override {
    GrpcMuxContext grpc_mux_context{
        /*async_client_=*/std::move(async_client),
        /*dispatcher_=*/dispatcher,
        /*service_method_=*/
        *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
            "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"),
        /*local_info_=*/local_info,
        /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config),
        /*scope_=*/scope,
        /*config_validators_=*/std::move(config_validators),
        /*xds_resources_delegate_=*/xds_resources_delegate,
        /*xds_config_tracker_=*/xds_config_tracker,
        /*backoff_strategy_=*/std::move(backoff_strategy),
        /*target_xds_authority_=*/Config::Utility::getGrpcControlPlane(ads_config).value_or(""),
        // The EDS cache is created only when both the caller requests it and
        // the runtime feature flag is enabled.
        /*eds_resources_cache_=*/
        (use_eds_resources_cache &&
         Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
            ? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
            : nullptr};
    return std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context,
                                                 ads_config.set_node_on_first_message_only());
  }
};
597 :
598 : REGISTER_FACTORY(GrpcMuxFactory, MuxFactory);
599 :
600 : } // namespace Config
601 : } // namespace Envoy
|