LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc - grpc_mux_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 242 394 61.4 %
Date: 2024-01-05 06:35:25 Functions: 25 31 80.6 %

          Line data    Source code
       1             : #include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
       2             : 
       3             : #include "envoy/service/discovery/v3/discovery.pb.h"
       4             : 
       5             : #include "source/common/config/decoded_resource_impl.h"
       6             : #include "source/common/config/utility.h"
       7             : #include "source/common/memory/utils.h"
       8             : #include "source/common/protobuf/protobuf.h"
       9             : #include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
      10             : #include "source/extensions/config_subscription/grpc/xds_source_id.h"
      11             : 
      12             : #include "absl/container/btree_map.h"
      13             : #include "absl/container/node_hash_set.h"
      14             : #include "absl/strings/match.h"
      15             : #include "absl/strings/str_cat.h"
      16             : 
      17             : namespace Envoy {
      18             : namespace Config {
      19             : 
      20             : namespace {
      21             : class AllMuxesState {
      22             : public:
      23          11 :   void insert(GrpcMuxImpl* mux) { muxes_.insert(mux); }
      24             : 
      25          11 :   void erase(GrpcMuxImpl* mux) { muxes_.erase(mux); }
      26             : 
      27         135 :   void shutdownAll() {
      28         135 :     for (auto& mux : muxes_) {
      29          11 :       mux->shutdown();
      30          11 :     }
      31         135 :   }
      32             : 
      33             : private:
      34             :   absl::flat_hash_set<GrpcMuxImpl*> muxes_;
      35             : };
      36             : using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
      37             : 
      38             : // Returns true if `resource_name` contains the wildcard XdsTp resource, for example:
      39             : // xdstp://test/envoy.config.cluster.v3.Cluster/foo-cluster/*
      40             : // xdstp://test/envoy.config.cluster.v3.Cluster/foo-cluster/*?node.name=my_node
      41           9 : bool isXdsTpWildcard(const std::string& resource_name) {
      42           9 :   return XdsResourceIdentifier::hasXdsTpScheme(resource_name) &&
      43           9 :          (absl::EndsWith(resource_name, "/*") || absl::StrContains(resource_name, "/*?"));
      44           9 : }
      45             : 
      46             : // Converts the XdsTp resource name to its wildcard equivalent.
      47             : // Must only be called on XdsTp resource names.
      48           0 : std::string convertToWildcard(const std::string& resource_name) {
      49           0 :   ASSERT(XdsResourceIdentifier::hasXdsTpScheme(resource_name));
      50           0 :   auto resource_or_error = XdsResourceIdentifier::decodeUrn(resource_name);
      51           0 :   THROW_IF_STATUS_NOT_OK(resource_or_error, throw);
      52           0 :   xds::core::v3::ResourceName xdstp_resource = resource_or_error.value();
      53           0 :   const auto pos = xdstp_resource.id().find_last_of('/');
      54           0 :   xdstp_resource.set_id(
      55           0 :       pos == std::string::npos ? "*" : absl::StrCat(xdstp_resource.id().substr(0, pos), "/*"));
      56           0 :   XdsResourceIdentifier::EncodeOptions options;
      57           0 :   options.sort_context_params_ = true;
      58           0 :   return XdsResourceIdentifier::encodeUrn(xdstp_resource, options);
      59           0 : }
      60             : } // namespace
      61             : 
      62             : GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
      63             :     : grpc_stream_(this, std::move(grpc_mux_context.async_client_),
      64             :                    grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
      65             :                    grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_),
      66             :                    grpc_mux_context.rate_limit_settings_),
      67             :       local_info_(grpc_mux_context.local_info_), skip_subsequent_node_(skip_subsequent_node),
      68             :       config_validators_(std::move(grpc_mux_context.config_validators_)),
      69             :       xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
      70             :       xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_),
      71             :       eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)),
      72             :       target_xds_authority_(grpc_mux_context.target_xds_authority_),
      73             :       dispatcher_(grpc_mux_context.dispatcher_),
      74             :       dynamic_update_callback_handle_(
      75             :           grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
      76           0 :               [this](absl::string_view resource_type_url) {
      77           0 :                 onDynamicContextUpdate(resource_type_url);
      78          11 :               })) {
      79          11 :   Config::Utility::checkLocalInfo("ads", local_info_);
      80          11 :   AllMuxes::get().insert(this);
      81          11 : }
      82             : 
      83          11 : GrpcMuxImpl::~GrpcMuxImpl() { AllMuxes::get().erase(this); }
      84             : 
      85         135 : void GrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); }
      86             : 
      87           0 : void GrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
      88           0 :   auto api_state = api_state_.find(resource_type_url);
      89           0 :   if (api_state == api_state_.end()) {
      90           0 :     return;
      91           0 :   }
      92           0 :   api_state->second->must_send_node_ = true;
      93           0 :   queueDiscoveryRequest(resource_type_url);
      94           0 : }
      95             : 
      96          11 : void GrpcMuxImpl::start() {
      97          11 :   ASSERT(!started_);
      98          11 :   if (started_) {
      99           0 :     return;
     100           0 :   }
     101          11 :   started_ = true;
     102          11 :   grpc_stream_.establishNewStream();
     103          11 : }
     104             : 
     105         140 : void GrpcMuxImpl::sendDiscoveryRequest(absl::string_view type_url) {
     106         140 :   if (shutdown_) {
     107           0 :     return;
     108           0 :   }
     109             : 
     110         140 :   ApiState& api_state = apiStateFor(type_url);
     111         140 :   auto& request = api_state.request_;
     112         140 :   request.mutable_resource_names()->Clear();
     113             : 
     114             :   // Maintain a set to avoid dupes.
     115         140 :   absl::node_hash_set<std::string> resources;
     116         174 :   for (const auto* watch : api_state.watches_) {
     117         174 :     for (const std::string& resource : watch->resources_) {
     118         107 :       if (!resources.contains(resource)) {
     119         107 :         resources.emplace(resource);
     120         107 :         request.add_resource_names(resource);
     121         107 :       }
     122         107 :     }
     123         174 :   }
     124             : 
     125         140 :   if (api_state.must_send_node_ || !skip_subsequent_node_ || first_stream_request_) {
     126             :     // Node may have been cleared during a previous request.
     127          10 :     request.mutable_node()->CopyFrom(local_info_.node());
     128          10 :     api_state.must_send_node_ = false;
     129         130 :   } else {
     130         130 :     request.clear_node();
     131         130 :   }
     132         140 :   ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url, request.ShortDebugString());
     133         140 :   grpc_stream_.sendMessage(request);
     134         140 :   first_stream_request_ = false;
     135             : 
     136             :   // clear error_detail after the request is sent if it exists.
     137         140 :   if (apiStateFor(type_url).request_.has_error_detail()) {
     138           0 :     apiStateFor(type_url).request_.clear_error_detail();
     139           0 :   }
     140         140 : }
     141             : 
     142          11 : void GrpcMuxImpl::clearNonce() {
     143             :   // Iterate over all api_states (for each type_url), and clear its nonce.
     144          11 :   for (auto& [type_url, api_state] : api_state_) {
     145           0 :     if (api_state) {
     146           0 :       api_state->request_.clear_response_nonce();
     147           0 :     }
     148           0 :   }
     149          11 : }
     150             : 
     151             : void GrpcMuxImpl::loadConfigFromDelegate(const std::string& type_url,
     152          31 :                                          const absl::flat_hash_set<std::string>& resource_names) {
     153          31 :   if (!xds_resources_delegate_.has_value()) {
     154          31 :     return;
     155          31 :   }
     156           0 :   ApiState& api_state = apiStateFor(type_url);
     157           0 :   if (api_state.watches_.empty()) {
     158             :     // No watches, so exit without loading config from storage.
     159           0 :     return;
     160           0 :   }
     161             : 
     162           0 :   const XdsConfigSourceId source_id{target_xds_authority_, type_url};
     163           0 :   TRY_ASSERT_MAIN_THREAD {
     164           0 :     std::vector<envoy::service::discovery::v3::Resource> resources =
     165           0 :         xds_resources_delegate_->getResources(source_id, resource_names);
     166           0 :     if (resources.empty()) {
     167             :       // There are no persisted resources, so nothing to process.
     168           0 :       return;
     169           0 :     }
     170             : 
     171           0 :     std::vector<DecodedResourcePtr> decoded_resources;
     172           0 :     OpaqueResourceDecoder& resource_decoder = *api_state.watches_.front()->resource_decoder_;
     173           0 :     std::string version_info;
     174           0 :     for (const auto& resource : resources) {
     175           0 :       if (version_info.empty()) {
     176           0 :         version_info = resource.version();
     177           0 :       } else {
     178           0 :         ASSERT(resource.version() == version_info);
     179           0 :       }
     180             : 
     181           0 :       TRY_ASSERT_MAIN_THREAD {
     182           0 :         decoded_resources.emplace_back(
     183           0 :             std::make_unique<DecodedResourceImpl>(resource_decoder, resource));
     184           0 :       }
     185           0 :       END_TRY
     186           0 :       catch (const EnvoyException& e) {
     187           0 :         xds_resources_delegate_->onResourceLoadFailed(source_id, resource.name(), e);
     188           0 :       }
     189           0 :     }
     190             : 
     191           0 :     processDiscoveryResources(decoded_resources, api_state, type_url, version_info,
     192           0 :                               /*call_delegate=*/false);
     193           0 :   }
     194           0 :   END_TRY
     195           0 :   catch (const EnvoyException& e) {
     196             :     // TODO(abeyad): do something else here?
     197           0 :     ENVOY_LOG_MISC(warn, "Failed to load config from delegate for {}: {}", source_id.toKey(),
     198           0 :                    e.what());
     199           0 :   }
     200           0 : }
     201             : 
     202             : GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
     203             :                                       const absl::flat_hash_set<std::string>& resources,
     204             :                                       SubscriptionCallbacks& callbacks,
     205             :                                       OpaqueResourceDecoderSharedPtr resource_decoder,
     206          51 :                                       const SubscriptionOptions& options) {
     207             :   // Resource cache is only used for EDS resources.
     208          51 :   EdsResourcesCacheOptRef resources_cache{absl::nullopt};
     209          51 :   if (eds_resources_cache_ &&
     210          51 :       (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
     211           0 :     resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
     212           0 :   }
     213          51 :   auto watch = std::make_unique<GrpcMuxWatchImpl>(resources, callbacks, resource_decoder, type_url,
     214          51 :                                                   *this, options, local_info_, resources_cache);
     215          51 :   ENVOY_LOG(debug, "gRPC mux addWatch for " + type_url);
     216             : 
     217             :   // Lazily kick off the requests based on first subscription. This has the
     218             :   // convenient side-effect that we order messages on the channel based on
     219             :   // Envoy's internal dependency ordering.
     220             :   // TODO(gsagula): move TokenBucketImpl params to a config.
     221          51 :   if (!apiStateFor(type_url).subscribed_) {
     222          40 :     apiStateFor(type_url).request_.set_type_url(type_url);
     223          40 :     apiStateFor(type_url).request_.mutable_node()->MergeFrom(local_info_.node());
     224          40 :     apiStateFor(type_url).subscribed_ = true;
     225          40 :     subscriptions_.emplace_back(type_url);
     226          40 :   }
     227             : 
     228             :   // This will send an updated request on each subscription.
     229             :   // TODO(htuch): For RDS/EDS, this will generate a new DiscoveryRequest on each resource we added.
     230             :   // Consider in the future adding some kind of collation/batching during CDS/LDS updates so that we
     231             :   // only send a single RDS/EDS update after the CDS/LDS update.
     232          51 :   queueDiscoveryRequest(type_url);
     233             : 
     234          51 :   return watch;
     235          51 : }
     236             : 
     237         108 : ScopedResume GrpcMuxImpl::pause(const std::string& type_url) {
     238         108 :   return pause(std::vector<std::string>{type_url});
     239         108 : }
     240             : 
     241         165 : ScopedResume GrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
     242         279 :   for (const auto& type_url : type_urls) {
     243         279 :     ApiState& api_state = apiStateFor(type_url);
     244         279 :     ENVOY_LOG(debug, "Pausing discovery requests for {} (previous count {})", type_url,
     245         279 :               api_state.pauses_);
     246         279 :     ++api_state.pauses_;
     247         279 :   }
     248         165 :   return std::make_unique<Cleanup>([this, type_urls]() {
     249         279 :     for (const auto& type_url : type_urls) {
     250         279 :       ApiState& api_state = apiStateFor(type_url);
     251         279 :       ENVOY_LOG(debug, "Decreasing pause count on discovery requests for {} (previous count {})",
     252         279 :                 type_url, api_state.pauses_);
     253         279 :       ASSERT(api_state.paused());
     254             : 
     255         279 :       if (--api_state.pauses_ == 0 && api_state.pending_ && api_state.subscribed_) {
     256         120 :         ENVOY_LOG(debug, "Resuming discovery requests for {}", type_url);
     257         120 :         queueDiscoveryRequest(type_url);
     258         120 :         api_state.pending_ = false;
     259         120 :       }
     260         279 :     }
     261         165 :   });
     262         165 : }
     263             : 
     264             : void GrpcMuxImpl::onDiscoveryResponse(
     265             :     std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message,
     266          88 :     ControlPlaneStats& control_plane_stats) {
     267          88 :   const std::string type_url = message->type_url();
     268          88 :   ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url, message->version_info());
     269             : 
     270          88 :   if (api_state_.count(type_url) == 0) {
     271             :     // TODO(yuval-k): This should never happen. consider dropping the stream as this is a
     272             :     // protocol violation
     273           0 :     ENVOY_LOG(warn, "Ignoring the message for type URL {} as it has no current subscribers.",
     274           0 :               type_url);
     275           0 :     return;
     276           0 :   }
     277             : 
     278          88 :   ApiState& api_state = apiStateFor(type_url);
     279             : 
     280          88 :   if (message->has_control_plane()) {
     281           0 :     control_plane_stats.identifier_.set(message->control_plane().identifier());
     282             : 
     283           0 :     if (message->control_plane().identifier() != api_state.control_plane_identifier_) {
     284           0 :       api_state.control_plane_identifier_ = message->control_plane().identifier();
     285           0 :       ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", type_url,
     286           0 :                 api_state.control_plane_identifier_);
     287           0 :     }
     288           0 :   }
     289             : 
     290          88 :   if (api_state.watches_.empty()) {
     291             :     // update the nonce as we are processing this response.
     292           0 :     api_state.request_.set_response_nonce(message->nonce());
     293           0 :     if (message->resources().empty()) {
     294             :       // No watches and no resources. This can happen when envoy unregisters from a
     295             :       // resource that's removed from the server as well. For example, a deleted cluster
     296             :       // triggers un-watching the ClusterLoadAssignment watch, and at the same time the
     297             :       // xDS server sends an empty list of ClusterLoadAssignment resources. we'll accept
     298             :       // this update. no need to send a discovery request, as we don't watch for anything.
     299           0 :       api_state.request_.set_version_info(message->version_info());
     300           0 :     } else {
     301             :       // No watches and we have resources - this should not happen. send a NACK (by not
     302             :       // updating the version).
     303           0 :       ENVOY_LOG(warn, "Ignoring unwatched type URL {}", type_url);
     304           0 :       queueDiscoveryRequest(type_url);
     305           0 :     }
     306           0 :     return;
     307           0 :   }
     308          88 :   ScopedResume same_type_resume;
     309             :   // We pause updates of the same type. This is necessary for SotW and GrpcMuxImpl, since unlike
     310             :   // delta and NewGRpcMuxImpl, independent watch additions/removals trigger updates regardless of
     311             :   // the delta state. The proper fix for this is to converge these implementations,
     312             :   // see https://github.com/envoyproxy/envoy/issues/11477.
     313          88 :   same_type_resume = pause(type_url);
     314          88 :   TRY_ASSERT_MAIN_THREAD {
     315          88 :     std::vector<DecodedResourcePtr> resources;
     316          88 :     OpaqueResourceDecoder& resource_decoder = *api_state.watches_.front()->resource_decoder_;
     317             : 
     318         142 :     for (const auto& resource : message->resources()) {
     319             :       // TODO(snowp): Check the underlying type when the resource is a Resource.
     320         142 :       if (!resource.Is<envoy::service::discovery::v3::Resource>() &&
     321         142 :           type_url != resource.type_url()) {
     322           0 :         throw EnvoyException(
     323           0 :             fmt::format("{} does not match the message-wide type URL {} in DiscoveryResponse {}",
     324           0 :                         resource.type_url(), type_url, message->DebugString()));
     325           0 :       }
     326             : 
     327         142 :       auto decoded_resource =
     328         142 :           DecodedResourceImpl::fromResource(resource_decoder, resource, message->version_info());
     329             : 
     330         142 :       if (!isHeartbeatResource(type_url, *decoded_resource)) {
     331         142 :         resources.emplace_back(std::move(decoded_resource));
     332         142 :       }
     333         142 :     }
     334             : 
     335          88 :     processDiscoveryResources(resources, api_state, type_url, message->version_info(),
     336          88 :                               /*call_delegate=*/true);
     337             : 
     338             :     // Processing point when resources are successfully ingested.
     339          88 :     if (xds_config_tracker_.has_value()) {
     340           0 :       xds_config_tracker_->onConfigAccepted(type_url, resources);
     341           0 :     }
     342          88 :   }
     343          88 :   END_TRY
     344          88 :   catch (const EnvoyException& e) {
     345           0 :     for (auto watch : api_state.watches_) {
     346           0 :       watch->callbacks_.onConfigUpdateFailed(
     347           0 :           Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
     348           0 :     }
     349           0 :     ::google::rpc::Status* error_detail = api_state.request_.mutable_error_detail();
     350           0 :     error_detail->set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
     351           0 :     error_detail->set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
     352             : 
     353             :     // Processing point when there is any exception during the parse and ingestion process.
     354           0 :     if (xds_config_tracker_.has_value()) {
     355           0 :       xds_config_tracker_->onConfigRejected(*message, error_detail->message());
     356           0 :     }
     357           0 :   }
     358          88 :   api_state.previously_fetched_data_ = true;
     359          88 :   api_state.request_.set_response_nonce(message->nonce());
     360          88 :   ASSERT(api_state.paused());
     361          88 :   queueDiscoveryRequest(type_url);
     362          88 : }
     363             : 
     364             : void GrpcMuxImpl::processDiscoveryResources(const std::vector<DecodedResourcePtr>& resources,
     365             :                                             ApiState& api_state, const std::string& type_url,
     366             :                                             const std::string& version_info,
     367          88 :                                             const bool call_delegate) {
     368          88 :   ASSERT_IS_MAIN_OR_TEST_THREAD();
     369             :   // To avoid O(n^2) explosion (e.g. when we have 1000s of EDS watches), we
     370             :   // build a map here from resource name to resource and then walk watches_.
     371             :   // We have to walk all watches (and need an efficient map as a result) to
     372             :   // ensure we deliver empty config updates when a resource is dropped. We make the map ordered
     373             :   // for test determinism.
     374          88 :   absl::btree_map<std::string, DecodedResourceRef> resource_ref_map;
     375          88 :   std::vector<DecodedResourceRef> all_resource_refs;
     376             : 
     377          88 :   const auto scoped_ttl_update = api_state.ttl_.scopedTtlUpdate();
     378             : 
     379         142 :   for (const auto& resource : resources) {
     380         142 :     if (resource->ttl()) {
     381           0 :       api_state.ttl_.add(*resource->ttl(), resource->name());
     382         142 :     } else {
     383         142 :       api_state.ttl_.clear(resource->name());
     384         142 :     }
     385             : 
     386         142 :     all_resource_refs.emplace_back(*resource);
     387         142 :     if (XdsResourceIdentifier::hasXdsTpScheme(resource->name())) {
     388             :       // Sort the context params of an xdstp resource, so we can compare them easily.
     389           0 :       auto resource_or_error = XdsResourceIdentifier::decodeUrn(resource->name());
     390           0 :       THROW_IF_STATUS_NOT_OK(resource_or_error, throw);
     391           0 :       xds::core::v3::ResourceName xdstp_resource = resource_or_error.value();
     392           0 :       XdsResourceIdentifier::EncodeOptions options;
     393           0 :       options.sort_context_params_ = true;
     394           0 :       resource_ref_map.emplace(XdsResourceIdentifier::encodeUrn(xdstp_resource, options),
     395           0 :                                *resource);
     396         142 :     } else {
     397         142 :       resource_ref_map.emplace(resource->name(), *resource);
     398         142 :     }
     399         142 :   }
     400             : 
     401             :   // Execute external config validators if there are any watches.
     402          88 :   if (!api_state.watches_.empty()) {
     403          88 :     config_validators_->executeValidators(type_url, resources);
     404          88 :   }
     405             : 
     406         108 :   for (auto watch : api_state.watches_) {
     407             :     // onConfigUpdate should be called in all cases for single watch xDS (Cluster and
     408             :     // Listener) even if the message does not have resources so that update_empty stat
     409             :     // is properly incremented and state-of-the-world semantics are maintained.
     410         108 :     if (watch->resources_.empty()) {
     411          47 :       THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate(all_resource_refs, version_info));
     412          47 :       continue;
     413          47 :     }
     414          61 :     std::vector<DecodedResourceRef> found_resources;
     415          61 :     for (const auto& watched_resource_name : watch->resources_) {
     416             :       // Look for a singleton subscription.
     417          61 :       auto it = resource_ref_map.find(watched_resource_name);
     418          61 :       if (it != resource_ref_map.end()) {
     419          52 :         found_resources.emplace_back(it->second);
     420          52 :       } else if (isXdsTpWildcard(watched_resource_name)) {
     421             :         // See if the resources match the xdstp wildcard subscription.
     422             :         // Note: although it is unlikely that Envoy will need to support a resource that is mapped
     423             :         // to both a singleton and collection watch, this code still supports this use case.
     424             :         // TODO(abeyad): This could be made more efficient, e.g. by pre-computing and having a map
     425             :         // entry for each wildcard watch.
     426           0 :         for (const auto& resource_ref_it : resource_ref_map) {
     427           0 :           if (XdsResourceIdentifier::hasXdsTpScheme(resource_ref_it.first) &&
     428           0 :               convertToWildcard(resource_ref_it.first) == watched_resource_name) {
     429           0 :             found_resources.emplace_back(resource_ref_it.second);
     430           0 :           }
     431           0 :         }
     432           0 :       }
     433          61 :     }
     434             : 
     435             :     // onConfigUpdate should be called only on watches(clusters/listeners) that have
     436             :     // updates in the message for EDS/RDS.
     437          61 :     if (!found_resources.empty()) {
     438          52 :       THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate(found_resources, version_info));
     439             :       // Resource cache is only used for EDS resources.
     440          52 :       if (eds_resources_cache_ &&
     441          52 :           (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
     442           0 :         for (const auto& resource : found_resources) {
     443           0 :           const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
     444           0 :               dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
     445           0 :                   resource.get().resource());
     446           0 :           eds_resources_cache_->setResource(resource.get().name(), cluster_load_assignment);
     447           0 :         }
     448             :         // No need to remove resources from the cache, as currently only non-collection
     449             :         // subscriptions are supported, and these resources are removed in the call
     450             :         // to updateWatchInterest().
     451           0 :       }
     452          52 :     }
     453          61 :   }
     454             : 
     455             :   // All config updates have been applied without throwing an exception, so we'll call the xDS
     456             :   // resources delegate, if any.
     457          88 :   if (call_delegate && xds_resources_delegate_.has_value()) {
     458           0 :     xds_resources_delegate_->onConfigUpdated(XdsConfigSourceId{target_xds_authority_, type_url},
     459           0 :                                              all_resource_refs);
     460           0 :   }
     461             : 
     462             :   // TODO(mattklein123): In the future if we start tracking per-resource versions, we
     463             :   // would do that tracking here.
     464          88 :   api_state.request_.set_version_info(version_info);
     465          88 :   Memory::Utils::tryShrinkHeap();
     466          88 : }
     467             : 
     468           0 : void GrpcMuxImpl::onWriteable() { drainRequests(); }
     469             : 
     470          11 : void GrpcMuxImpl::onStreamEstablished() {
     471          11 :   first_stream_request_ = true;
     472          11 :   grpc_stream_.maybeUpdateQueueSizeStat(0);
     473          11 :   clearNonce();
     474          11 :   request_queue_ = std::make_unique<std::queue<std::string>>();
     475          11 :   for (const auto& type_url : subscriptions_) {
     476           0 :     queueDiscoveryRequest(type_url);
     477           0 :   }
     478          11 : }
     479             : 
     480          10 : void GrpcMuxImpl::onEstablishmentFailure() {
     481          69 :   for (const auto& api_state : api_state_) {
     482          69 :     for (auto watch : api_state.second->watches_) {
     483          48 :       watch->callbacks_.onConfigUpdateFailed(
     484          48 :           Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr);
     485          48 :     }
     486          69 :     if (!api_state.second->previously_fetched_data_) {
     487             :       // On the initialization of the gRPC mux, if connection to the xDS server fails, load the
     488             :       // persisted config, if available. The locally persisted config will be used until
     489             :       // connectivity is established with the xDS server.
     490          31 :       loadConfigFromDelegate(
     491          31 :           /*type_url=*/api_state.first,
     492          31 :           absl::flat_hash_set<std::string>{api_state.second->request_.resource_names().begin(),
     493          31 :                                            api_state.second->request_.resource_names().end()});
     494          31 :       api_state.second->previously_fetched_data_ = true;
     495          31 :     }
     496          69 :   }
     497          10 : }
     498             : 
     499         290 : void GrpcMuxImpl::queueDiscoveryRequest(absl::string_view queue_item) {
     500         290 :   if (!grpc_stream_.grpcStreamAvailable()) {
     501          28 :     ENVOY_LOG(debug, "No stream available to queueDiscoveryRequest for {}", queue_item);
     502          28 :     return; // Drop this request; the reconnect will enqueue a new one.
     503          28 :   }
     504         262 :   ApiState& api_state = apiStateFor(queue_item);
     505         262 :   if (api_state.paused()) {
     506         120 :     ENVOY_LOG(trace, "API {} paused during queueDiscoveryRequest(), setting pending.", queue_item);
     507         120 :     api_state.pending_ = true;
     508         120 :     return; // Drop this request; the unpause will enqueue a new one.
     509         120 :   }
     510         142 :   request_queue_->emplace(std::string(queue_item));
     511         142 :   drainRequests();
     512         142 : }
     513             : 
     514             : void GrpcMuxImpl::expiryCallback(absl::string_view type_url,
     515           0 :                                  const std::vector<std::string>& expired) {
     516             :   // The TtlManager triggers a callback with a list of all the expired elements, which we need
     517             :   // to compare against the various watched resources to return the subset that each watch is
     518             :   // subscribed to.
     519             : 
     520             :   // We convert the incoming list into a set in order to more efficiently perform this
     521             :   // comparison when there are a lot of watches.
     522           0 :   absl::flat_hash_set<std::string> all_expired;
     523           0 :   all_expired.insert(expired.begin(), expired.end());
     524             : 
     525             :   // Note: We can blindly dereference the lookup here since the only time we call this is in a
     526             :   // callback that is created at the same time as we insert the ApiState for this type.
     527           0 :   for (auto watch : api_state_.find(type_url)->second->watches_) {
     528           0 :     Protobuf::RepeatedPtrField<std::string> found_resources_for_watch;
     529             : 
     530           0 :     for (const auto& resource : expired) {
     531           0 :       if (all_expired.find(resource) != all_expired.end()) {
     532           0 :         found_resources_for_watch.Add(std::string(resource));
     533           0 :       }
     534           0 :     }
     535             : 
     536           0 :     THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate({}, found_resources_for_watch, ""));
     537           0 :   }
     538           0 : }
     539             : 
     540        1410 : GrpcMuxImpl::ApiState& GrpcMuxImpl::apiStateFor(absl::string_view type_url) {
     541        1410 :   auto itr = api_state_.find(type_url);
     542        1410 :   if (itr == api_state_.end()) {
     543          70 :     api_state_.emplace(
     544          70 :         type_url, std::make_unique<ApiState>(dispatcher_, [this, type_url](const auto& expired) {
     545           0 :           expiryCallback(type_url, expired);
     546           0 :         }));
     547          70 :   }
     548             : 
     549        1410 :   return *api_state_.find(type_url)->second;
     550        1410 : }
     551             : 
     552         142 : void GrpcMuxImpl::drainRequests() {
     553         282 :   while (!request_queue_->empty() && grpc_stream_.checkRateLimitAllowsDrain()) {
     554             :     // Process the request, if rate limiting is not enabled at all or if it is under rate limit.
     555         140 :     sendDiscoveryRequest(request_queue_->front());
     556         140 :     request_queue_->pop();
     557         140 :   }
     558         142 :   grpc_stream_.maybeUpdateQueueSizeStat(request_queue_->size());
     559         142 : }
     560             : 
     561             : // A factory class for creating GrpcMuxImpl so it does not have to be
     562             : // hard-compiled into cluster_manager_impl.cc
     563             : class GrpcMuxFactory : public MuxFactory {
     564             : public:
     565          13 :   std::string name() const override { return "envoy.config_mux.grpc_mux_factory"; }
     566         135 :   void shutdownAll() override { return GrpcMuxImpl::shutdownAll(); }
     567             :   std::shared_ptr<GrpcMux>
     568             :   create(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher,
     569             :          Random::RandomGenerator&, Stats::Scope& scope,
     570             :          const envoy::config::core::v3::ApiConfigSource& ads_config,
     571             :          const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
     572             :          BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
     573          11 :          XdsResourcesDelegateOptRef xds_resources_delegate, bool use_eds_resources_cache) override {
     574          11 :     GrpcMuxContext grpc_mux_context{
     575          11 :         /*async_client_=*/std::move(async_client),
     576          11 :         /*dispatcher_=*/dispatcher,
     577             :         /*service_method_=*/
     578          11 :         *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
     579          11 :             "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"),
     580          11 :         /*local_info_=*/local_info,
     581          11 :         /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config),
     582          11 :         /*scope_=*/scope,
     583          11 :         /*config_validators_=*/std::move(config_validators),
     584          11 :         /*xds_resources_delegate_=*/xds_resources_delegate,
     585          11 :         /*xds_config_tracker_=*/xds_config_tracker,
     586          11 :         /*backoff_strategy_=*/std::move(backoff_strategy),
     587          11 :         /*target_xds_authority_=*/Config::Utility::getGrpcControlPlane(ads_config).value_or(""),
     588             :         /*eds_resources_cache_=*/
     589          11 :         (use_eds_resources_cache &&
     590          11 :          Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
     591          11 :             ? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
     592          11 :             : nullptr};
     593          11 :     return std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context,
     594          11 :                                                  ads_config.set_node_on_first_message_only());
     595          11 :   }
     596             : };
     597             : 
     598             : REGISTER_FACTORY(GrpcMuxFactory, MuxFactory);
     599             : 
     600             : } // namespace Config
     601             : } // namespace Envoy

Generated by: LCOV version 1.15