LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc/xds_mux - grpc_mux_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 234 294 79.6 %
Date: 2024-01-05 06:35:25 Functions: 49 55 89.1 %

          Line data    Source code
       1             : #include "source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h"
       2             : 
       3             : #include "envoy/config/endpoint/v3/endpoint.pb.h"
       4             : #include "envoy/service/discovery/v3/discovery.pb.h"
       5             : 
       6             : #include "source/common/common/assert.h"
       7             : #include "source/common/common/backoff_strategy.h"
       8             : #include "source/common/config/decoded_resource_impl.h"
       9             : #include "source/common/config/utility.h"
      10             : #include "source/common/config/xds_context_params.h"
      11             : #include "source/common/config/xds_resource.h"
      12             : #include "source/common/memory/utils.h"
      13             : #include "source/common/protobuf/protobuf.h"
      14             : #include "source/common/protobuf/utility.h"
      15             : #include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
      16             : 
      17             : namespace Envoy {
      18             : namespace Config {
      19             : namespace XdsMux {
      20             : 
      21             : namespace {
      22             : class AllMuxesState {
      23             : public:
      24          14 :   void insert(ShutdownableMux* mux) { muxes_.insert(mux); }
      25             : 
      26          14 :   void erase(ShutdownableMux* mux) { muxes_.erase(mux); }
      27             : 
      28         270 :   void shutdownAll() {
      29         270 :     for (auto& mux : muxes_) {
      30          28 :       mux->shutdown();
      31          28 :     }
      32         270 :   }
      33             : 
      34             : private:
      35             :   absl::flat_hash_set<ShutdownableMux*> muxes_;
      36             : };
      37             : using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
      38             : } // namespace
      39             : 
      40             : template <class S, class F, class RQ, class RS>
      41             : GrpcMuxImpl<S, F, RQ, RS>::GrpcMuxImpl(std::unique_ptr<F> subscription_state_factory,
      42             :                                        GrpcMuxContext& grpc_mux_content, bool skip_subsequent_node)
      43             :     : grpc_stream_(this, std::move(grpc_mux_content.async_client_),
      44             :                    grpc_mux_content.service_method_, grpc_mux_content.dispatcher_,
      45             :                    grpc_mux_content.scope_, std::move(grpc_mux_content.backoff_strategy_),
      46             :                    grpc_mux_content.rate_limit_settings_),
      47             :       subscription_state_factory_(std::move(subscription_state_factory)),
      48             :       skip_subsequent_node_(skip_subsequent_node), local_info_(grpc_mux_content.local_info_),
      49             :       dynamic_update_callback_handle_(
      50             :           grpc_mux_content.local_info_.contextProvider().addDynamicContextUpdateCallback(
      51           0 :               [this](absl::string_view resource_type_url) {
      52           0 :                 onDynamicContextUpdate(resource_type_url);
      53           0 :               })),
      54             :       config_validators_(std::move(grpc_mux_content.config_validators_)),
      55             :       xds_config_tracker_(grpc_mux_content.xds_config_tracker_),
      56             :       xds_resources_delegate_(grpc_mux_content.xds_resources_delegate_),
      57             :       eds_resources_cache_(std::move(grpc_mux_content.eds_resources_cache_)),
      58          14 :       target_xds_authority_(grpc_mux_content.target_xds_authority_) {
      59          14 :   Config::Utility::checkLocalInfo("ads", grpc_mux_content.local_info_);
      60          14 :   AllMuxes::get().insert(this);
      61          14 : }
      62             : 
      63          14 : template <class S, class F, class RQ, class RS> GrpcMuxImpl<S, F, RQ, RS>::~GrpcMuxImpl() {
      64          14 :   AllMuxes::get().erase(this);
      65          14 : }
      66             : 
      67         270 : template <class S, class F, class RQ, class RS> void GrpcMuxImpl<S, F, RQ, RS>::shutdownAll() {
      68         270 :   AllMuxes::get().shutdownAll();
      69         270 : }
      70             : 
      71             : template <class S, class F, class RQ, class RS>
      72           0 : void GrpcMuxImpl<S, F, RQ, RS>::onDynamicContextUpdate(absl::string_view resource_type_url) {
      73           0 :   ENVOY_LOG(debug, "GrpcMuxImpl::onDynamicContextUpdate for {}", resource_type_url);
      74           0 :   auto sub = subscriptions_.find(resource_type_url);
      75           0 :   if (sub == subscriptions_.end()) {
      76           0 :     return;
      77           0 :   }
      78           0 :   sub->second->setDynamicContextChanged();
      79           0 :   trySendDiscoveryRequests();
      80           0 : }
      81             : 
      82             : template <class S, class F, class RQ, class RS>
      83             : Config::GrpcMuxWatchPtr GrpcMuxImpl<S, F, RQ, RS>::addWatch(
      84             :     const std::string& type_url, const absl::flat_hash_set<std::string>& resources,
      85             :     SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
      86          67 :     const SubscriptionOptions& options) {
      87          67 :   auto watch_map = watch_maps_.find(type_url);
      88          67 :   if (watch_map == watch_maps_.end()) {
      89             :     // Resource cache is only used for EDS resources.
      90          54 :     EdsResourcesCacheOptRef resources_cache{absl::nullopt};
      91          54 :     if (eds_resources_cache_ &&
      92          54 :         (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
      93           0 :       resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
      94           0 :     }
      95             : 
      96             :     // We don't yet have a subscription for type_url! Make one!
      97          54 :     watch_map = watch_maps_
      98          54 :                     .emplace(type_url,
      99          54 :                              std::make_unique<WatchMap>(options.use_namespace_matching_, type_url,
     100          54 :                                                         *config_validators_.get(), resources_cache))
     101          54 :                     .first;
     102          54 :     subscriptions_.emplace(type_url, subscription_state_factory_->makeSubscriptionState(
     103          54 :                                          type_url, *watch_maps_[type_url], resource_decoder,
     104          54 :                                          xds_config_tracker_, xds_resources_delegate_,
     105          54 :                                          target_xds_authority_));
     106          54 :     subscription_ordering_.emplace_back(type_url);
     107          54 :   }
     108             : 
     109          67 :   Watch* watch = watch_map->second->addWatch(callbacks, *resource_decoder);
     110             :   // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
     111          67 :   updateWatch(type_url, watch, resources, options);
     112          67 :   return std::make_unique<WatchImpl>(type_url, watch, *this, options);
     113          67 : }
     114             : 
     115             : // Updates the list of resource names watched by the given watch. If an added name is new across
     116             : // the whole subscription, or if a removed name has no other watch interested in it, then the
     117             : // subscription will enqueue and attempt to send an appropriate discovery request.
     118             : template <class S, class F, class RQ, class RS>
     119             : void GrpcMuxImpl<S, F, RQ, RS>::updateWatch(const std::string& type_url, Watch* watch,
     120             :                                             const absl::flat_hash_set<std::string>& resources,
     121         134 :                                             const SubscriptionOptions& options) {
     122         134 :   ENVOY_LOG(debug, "GrpcMuxImpl::updateWatch for {}", type_url);
     123         134 :   ASSERT(watch != nullptr);
     124         134 :   auto& sub = subscriptionStateFor(type_url);
     125         134 :   WatchMap& watch_map = watchMapFor(type_url);
     126             : 
     127             :   // We need to prepare xdstp:// resources for the transport, by normalizing and adding any extra
     128             :   // context parameters.
     129         134 :   absl::flat_hash_set<std::string> effective_resources;
     130         134 :   for (const auto& resource : resources) {
     131          39 :     if (XdsResourceIdentifier::hasXdsTpScheme(resource)) {
     132           0 :       auto xdstp_resource_or_error = XdsResourceIdentifier::decodeUrn(resource);
     133           0 :       THROW_IF_STATUS_NOT_OK(xdstp_resource_or_error, throw);
     134           0 :       auto xdstp_resource = xdstp_resource_or_error.value();
     135           0 :       if (options.add_xdstp_node_context_params_) {
     136           0 :         const auto context = XdsContextParams::encodeResource(
     137           0 :             local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
     138           0 :         xdstp_resource.mutable_context()->CopyFrom(context);
     139           0 :       }
     140           0 :       XdsResourceIdentifier::EncodeOptions encode_options;
     141           0 :       encode_options.sort_context_params_ = true;
     142           0 :       effective_resources.insert(XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options));
     143          39 :     } else {
     144          39 :       effective_resources.insert(resource);
     145          39 :     }
     146          39 :   }
     147             : 
     148         134 :   auto added_removed = watch_map.updateWatchInterest(watch, effective_resources);
     149         134 :   if (options.use_namespace_matching_) {
     150             :     // This prevents sending out requests that contain prefixes instead of resource names.
     151           0 :     sub.updateSubscriptionInterest({}, {});
     152         134 :   } else {
     153         134 :     sub.updateSubscriptionInterest(added_removed.added_, added_removed.removed_);
     154         134 :   }
     155             : 
     156             :   // Tell the server about our change in interest, if any.
     157         134 :   if (sub.subscriptionUpdatePending()) {
     158         106 :     trySendDiscoveryRequests();
     159         106 :   }
     160         134 : }
     161             : 
     162             : template <class S, class F, class RQ, class RS>
     163          67 : void GrpcMuxImpl<S, F, RQ, RS>::removeWatch(const std::string& type_url, Watch* watch) {
     164          67 :   updateWatch(type_url, watch, {}, {});
     165          67 :   watchMapFor(type_url).removeWatch(watch);
     166          67 : }
     167             : 
     168             : template <class S, class F, class RQ, class RS>
     169          28 : ScopedResume GrpcMuxImpl<S, F, RQ, RS>::pause(const std::string& type_url) {
     170          28 :   return pause(std::vector<std::string>{type_url});
     171          28 : }
     172             : 
     173             : template <class S, class F, class RQ, class RS>
     174         100 : ScopedResume GrpcMuxImpl<S, F, RQ, RS>::pause(const std::vector<std::string> type_urls) {
     175         244 :   for (const auto& type_url : type_urls) {
     176         244 :     pausable_ack_queue_.pause(type_url);
     177         244 :   }
     178             : 
     179         100 :   return std::make_unique<Cleanup>([this, type_urls]() {
     180         244 :     for (const auto& type_url : type_urls) {
     181         244 :       pausable_ack_queue_.resume(type_url);
     182         244 :       trySendDiscoveryRequests();
     183         244 :     }
     184         100 :   });
     185         100 : }
     186             : 
     187             : template <class S, class F, class RQ, class RS>
     188         180 : void GrpcMuxImpl<S, F, RQ, RS>::sendGrpcMessage(RQ& msg_proto, S& sub_state) {
     189         180 :   if (sub_state.dynamicContextChanged() || !anyRequestSentYetInCurrentStream() ||
     190         180 :       !skipSubsequentNode()) {
     191          14 :     msg_proto.mutable_node()->CopyFrom(localInfo().node());
     192          14 :   }
     193         180 :   sendMessage(msg_proto);
     194         180 :   setAnyRequestSentYetInCurrentStream(true);
     195         180 :   sub_state.clearDynamicContextChanged();
     196         180 : }
     197             : 
     198             : template <class S, class F, class RQ, class RS>
     199             : void GrpcMuxImpl<S, F, RQ, RS>::genericHandleResponse(const std::string& type_url,
     200             :                                                       const RS& response_proto,
     201         111 :                                                       ControlPlaneStats& control_plane_stats) {
     202         111 :   auto sub = subscriptions_.find(type_url);
     203         111 :   if (sub == subscriptions_.end()) {
     204           0 :     ENVOY_LOG(warn,
     205           0 :               "The server sent an xDS response proto with type_url {}, which we have "
     206           0 :               "not subscribed to. Ignoring.",
     207           0 :               type_url);
     208           0 :     return;
     209           0 :   }
     210             : 
     211         111 :   if (response_proto.has_control_plane()) {
     212           0 :     control_plane_stats.identifier_.set(response_proto.control_plane().identifier());
     213             : 
     214           0 :     if (response_proto.control_plane().identifier() != sub->second->controlPlaneIdentifier()) {
     215           0 :       sub->second->setControlPlaneIdentifier(response_proto.control_plane().identifier());
     216           0 :       ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", response_proto.type_url(),
     217           0 :                 sub->second->controlPlaneIdentifier());
     218           0 :     }
     219           0 :   }
     220             : 
     221         111 :   pausable_ack_queue_.push(sub->second->handleResponse(response_proto));
     222         111 :   trySendDiscoveryRequests();
     223         111 :   Memory::Utils::tryShrinkHeap();
     224         111 : }
     225             : 
     226          14 : template <class S, class F, class RQ, class RS> void GrpcMuxImpl<S, F, RQ, RS>::start() {
     227          14 :   ASSERT(!started_);
     228          14 :   if (started_) {
     229           0 :     return;
     230           0 :   }
     231          14 :   started_ = true;
     232          14 :   ENVOY_LOG(debug, "GrpcMuxImpl now trying to establish a stream");
     233          14 :   grpc_stream_.establishNewStream();
     234          14 : }
     235             : 
     236             : template <class S, class F, class RQ, class RS>
     237          14 : void GrpcMuxImpl<S, F, RQ, RS>::handleEstablishedStream() {
     238          14 :   ENVOY_LOG(debug, "GrpcMuxImpl stream successfully established");
     239          14 :   for (auto& [type_url, subscription_state] : subscriptions_) {
     240           0 :     subscription_state->markStreamFresh();
     241           0 :   }
     242          14 :   setAnyRequestSentYetInCurrentStream(false);
     243          14 :   maybeUpdateQueueSizeStat(0);
     244          14 :   pausable_ack_queue_.clear();
     245          14 :   trySendDiscoveryRequests();
     246          14 : }
     247             : 
     248             : template <class S, class F, class RQ, class RS>
     249          14 : void GrpcMuxImpl<S, F, RQ, RS>::handleStreamEstablishmentFailure() {
     250          14 :   ENVOY_LOG(debug, "GrpcMuxImpl stream failed to establish");
     251             :   // If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately
     252             :   // call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a
     253             :   // crash, the iteration needs to dance around a little: collect pointers to all
     254             :   // SubscriptionStates, call on all those pointers we haven't yet called on, repeat if there are
     255             :   // now more SubscriptionStates.
     256          14 :   absl::flat_hash_map<std::string, S*> all_subscribed;
     257          14 :   absl::flat_hash_map<std::string, S*> already_called;
     258          14 :   do {
     259          54 :     for (auto& [type_url, subscription_state] : subscriptions_) {
     260          54 :       all_subscribed[type_url] = subscription_state.get();
     261          54 :     }
     262          54 :     for (auto& sub : all_subscribed) {
     263          54 :       if (already_called.insert(sub).second) { // insert succeeded ==> not already called
     264          54 :         sub.second->handleEstablishmentFailure();
     265          54 :       }
     266          54 :     }
     267          14 :   } while (all_subscribed.size() != subscriptions_.size());
     268          14 : }
     269             : 
     270             : template <class S, class F, class RQ, class RS>
     271        1838 : S& GrpcMuxImpl<S, F, RQ, RS>::subscriptionStateFor(const std::string& type_url) {
     272        1838 :   auto sub = subscriptions_.find(type_url);
     273        1838 :   RELEASE_ASSERT(sub != subscriptions_.end(),
     274        1838 :                  fmt::format("Tried to look up SubscriptionState for non-existent subscription {}.",
     275        1838 :                              type_url));
     276        1838 :   return *sub->second;
     277        1838 : }
     278             : 
     279             : template <class S, class F, class RQ, class RS>
     280         201 : WatchMap& GrpcMuxImpl<S, F, RQ, RS>::watchMapFor(const std::string& type_url) {
     281         201 :   auto watch_map = watch_maps_.find(type_url);
     282         201 :   RELEASE_ASSERT(
     283         201 :       watch_map != watch_maps_.end(),
     284         201 :       fmt::format("Tried to look up WatchMap for non-existent subscription {}.", type_url));
     285         201 :   return *watch_map->second;
     286         201 : }
     287             : 
     288             : template <class S, class F, class RQ, class RS>
     289         475 : void GrpcMuxImpl<S, F, RQ, RS>::trySendDiscoveryRequests() {
     290         475 :   if (shutdown_) {
     291          37 :     return;
     292          37 :   }
     293             : 
     294         618 :   while (true) {
     295             :     // Do any of our subscriptions even want to send a request?
     296         618 :     absl::optional<std::string> request_type_if_any = whoWantsToSendDiscoveryRequest();
     297         618 :     if (!request_type_if_any.has_value()) {
     298         438 :       break;
     299         438 :     }
     300             :     // If so, which one (by type_url)?
     301         180 :     std::string next_request_type_url = request_type_if_any.value();
     302         180 :     auto& sub = subscriptionStateFor(next_request_type_url);
     303         180 :     ENVOY_LOG(debug, "GrpcMuxImpl wants to send discovery request for {}", next_request_type_url);
     304             :     // Try again later if paused/rate limited/stream down.
     305         180 :     if (!canSendDiscoveryRequest(next_request_type_url)) {
     306           0 :       break;
     307           0 :     }
     308         180 :     std::unique_ptr<RQ> request;
     309             :     // Get our subscription state to generate the appropriate discovery request, and send.
     310         180 :     if (!pausable_ack_queue_.empty()) {
     311             :       // Because ACKs take precedence over plain requests, if there is anything in the queue, it's
     312             :       // safe to assume it's of the type_url that we're wanting to send.
     313             :       //
     314             :       // getNextRequestWithAck() returns an owning pointer; sendGrpcMessage only borrows the message.
     315         111 :       request = sub.getNextRequestWithAck(pausable_ack_queue_.popFront());
     316         111 :       ENVOY_LOG(debug, "GrpcMuxImpl sent ACK discovery request for {}", next_request_type_url);
     317         111 :     } else {
     318             :       // Returns an owning pointer; sendGrpcMessage only borrows the message.
     319          69 :       request = sub.getNextRequestAckless();
     320          69 :       ENVOY_LOG(debug, "GrpcMuxImpl sent non-ACK discovery request for {}", next_request_type_url);
     321          69 :     }
     322         180 :     ENVOY_LOG(debug, "GrpcMuxImpl skip_subsequent_node: {}", skipSubsequentNode());
     323         180 :     sendGrpcMessage(*request, sub);
     324         180 :   }
     325         438 :   maybeUpdateQueueSizeStat(pausable_ack_queue_.size());
     326         438 : }
     327             : 
     328             : // Checks whether external conditions allow sending a discovery request. (Does not check
     329             : // whether we *want* to send a discovery request).
     330             : template <class S, class F, class RQ, class RS>
     331         180 : bool GrpcMuxImpl<S, F, RQ, RS>::canSendDiscoveryRequest(const std::string& type_url) {
     332         180 :   RELEASE_ASSERT(
     333         180 :       !pausable_ack_queue_.paused(type_url),
     334         180 :       fmt::format("canSendDiscoveryRequest() called on paused type_url {}. Pausedness is "
     335         180 :                   "supposed to be filtered out by whoWantsToSendDiscoveryRequest(). ",
     336         180 :                   type_url));
     337             : 
     338         180 :   if (!grpcStreamAvailable()) {
     339           0 :     ENVOY_LOG(trace, "No stream available to send a discovery request for {}.", type_url);
     340           0 :     return false;
     341         180 :   } else if (!rateLimitAllowsDrain()) {
     342           0 :     ENVOY_LOG(trace, "{} discovery request hit rate limit; will try later.", type_url);
     343           0 :     return false;
     344           0 :   }
     345         180 :   return true;
     346         180 : }
     347             : 
     348             : // Checks whether we have something to say in a discovery request, which can be an ACK and/or
     349             : // a subscription update. (Does not check whether we *can* send that discovery request).
     350             : // Returns the type_url we should send the discovery request for (if any).
     351             : // First, prioritizes ACKs over non-ACK subscription interest updates.
     352             : // Then, prioritizes non-ACK updates in the order the various types
     353             : // of subscriptions were activated.
     354             : template <class S, class F, class RQ, class RS>
     355         618 : absl::optional<std::string> GrpcMuxImpl<S, F, RQ, RS>::whoWantsToSendDiscoveryRequest() {
     356             :   // All ACKs are sent before plain updates. trySendDiscoveryRequests() relies on this. So, choose
     357             :   // type_url from pausable_ack_queue_ if possible, before looking at pending updates.
     358         618 :   if (!pausable_ack_queue_.empty()) {
     359         111 :     return pausable_ack_queue_.front().type_url_;
     360         111 :   }
     361             :   // If we're looking to send multiple non-ACK requests, send them in the order that their
     362             :   // subscriptions were initiated.
     363        1524 :   for (const auto& sub_type : subscription_ordering_) {
     364        1524 :     auto& sub = subscriptionStateFor(sub_type);
     365        1524 :     if (sub.subscriptionUpdatePending() && !pausable_ack_queue_.paused(sub_type)) {
     366          69 :       return sub_type;
     367          69 :     }
     368        1524 :   }
     369         438 :   return absl::nullopt;
     370         507 : }
     371             : 
     372             : template class GrpcMuxImpl<DeltaSubscriptionState, DeltaSubscriptionStateFactory,
     373             :                            envoy::service::discovery::v3::DeltaDiscoveryRequest,
     374             :                            envoy::service::discovery::v3::DeltaDiscoveryResponse>;
     375             : template class GrpcMuxImpl<SotwSubscriptionState, SotwSubscriptionStateFactory,
     376             :                            envoy::service::discovery::v3::DiscoveryRequest,
     377             :                            envoy::service::discovery::v3::DiscoveryResponse>;
     378             : 
     379             : // Delta- and SotW-specific concrete subclasses:
     380             : GrpcMuxDelta::GrpcMuxDelta(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
     381             :     : GrpcMuxImpl(std::make_unique<DeltaSubscriptionStateFactory>(grpc_mux_context.dispatcher_),
     382           4 :                   grpc_mux_context, skip_subsequent_node) {}
     383             : 
     384             : // GrpcStreamCallbacks for GrpcMuxDelta
     385             : void GrpcMuxDelta::requestOnDemandUpdate(const std::string& type_url,
     386           0 :                                          const absl::flat_hash_set<std::string>& for_update) {
     387           0 :   auto& sub = subscriptionStateFor(type_url);
     388           0 :   sub.updateSubscriptionInterest(for_update, {});
     389             :   // Tell the server about our change in interest, if any.
     390           0 :   if (sub.subscriptionUpdatePending()) {
     391           0 :     trySendDiscoveryRequests();
     392           0 :   }
     393           0 : }
     394             : 
     395             : GrpcMuxSotw::GrpcMuxSotw(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
     396             :     : GrpcMuxImpl(std::make_unique<SotwSubscriptionStateFactory>(grpc_mux_context.dispatcher_),
     397          10 :                   grpc_mux_context, skip_subsequent_node) {}
     398             : 
     399             : Config::GrpcMuxWatchPtr NullGrpcMuxImpl::addWatch(const std::string&,
     400             :                                                   const absl::flat_hash_set<std::string>&,
     401             :                                                   SubscriptionCallbacks&,
     402             :                                                   OpaqueResourceDecoderSharedPtr,
     403           0 :                                                   const SubscriptionOptions&) {
     404           0 :   throw EnvoyException("ADS must be configured to support an ADS config source");
     405           0 : }
     406             : 
     407             : class DeltaGrpcMuxFactory : public MuxFactory {
     408             : public:
     409          13 :   std::string name() const override { return "envoy.config_mux.delta_grpc_mux_factory"; }
     410         135 :   void shutdownAll() override { return GrpcMuxDelta::shutdownAll(); }
     411             :   std::shared_ptr<GrpcMux>
     412             :   create(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher,
     413             :          Random::RandomGenerator&, Stats::Scope& scope,
     414             :          const envoy::config::core::v3::ApiConfigSource& ads_config,
     415             :          const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
     416             :          BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
     417           4 :          XdsResourcesDelegateOptRef, bool use_eds_resources_cache) override {
     418           4 :     GrpcMuxContext grpc_mux_context{
     419           4 :         /*async_client_=*/std::move(async_client),
     420           4 :         /*dispatcher_=*/dispatcher,
     421             :         /*service_method_=*/
     422           4 :         *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
     423           4 :             "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"),
     424           4 :         /*local_info_=*/local_info,
     425           4 :         /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config),
     426           4 :         /*scope_=*/scope,
     427           4 :         /*config_validators_=*/std::move(config_validators),
     428           4 :         /*xds_resources_delegate_=*/absl::nullopt,
     429           4 :         /*xds_config_tracker_=*/xds_config_tracker,
     430           4 :         /*backoff_strategy_=*/std::move(backoff_strategy),
     431           4 :         /*target_xds_authority_=*/"",
     432             :         /*eds_resources_cache_=*/
     433           4 :         (use_eds_resources_cache &&
     434           4 :          Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
     435           4 :             ? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
     436           4 :             : nullptr};
     437           4 :     return std::make_shared<GrpcMuxDelta>(grpc_mux_context,
     438           4 :                                           ads_config.set_node_on_first_message_only());
     439           4 :   }
     440             : };
     441             : 
     442             : class SotwGrpcMuxFactory : public MuxFactory {
     443             : public:
     444          13 :   std::string name() const override { return "envoy.config_mux.sotw_grpc_mux_factory"; }
     445         135 :   void shutdownAll() override { return GrpcMuxSotw::shutdownAll(); }
     446             :   std::shared_ptr<GrpcMux>
     447             :   create(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher,
     448             :          Random::RandomGenerator&, Stats::Scope& scope,
     449             :          const envoy::config::core::v3::ApiConfigSource& ads_config,
     450             :          const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
     451             :          BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
     452          10 :          XdsResourcesDelegateOptRef, bool use_eds_resources_cache) override {
     453          10 :     GrpcMuxContext grpc_mux_context{
     454          10 :         /*async_client_=*/std::move(async_client),
     455          10 :         /*dispatcher_=*/dispatcher,
     456             :         /*service_method_=*/
     457          10 :         *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
     458          10 :             "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"),
     459          10 :         /*local_info_=*/local_info,
     460          10 :         /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config),
     461          10 :         /*scope_=*/scope,
     462          10 :         /*config_validators_=*/std::move(config_validators),
     463          10 :         /*xds_resources_delegate_=*/absl::nullopt,
     464          10 :         /*xds_config_tracker_=*/xds_config_tracker,
     465          10 :         /*backoff_strategy_=*/std::move(backoff_strategy),
     466          10 :         /*target_xds_authority_=*/"",
     467             :         /*eds_resources_cache_=*/
     468          10 :         (use_eds_resources_cache &&
     469          10 :          Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
     470          10 :             ? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
     471          10 :             : nullptr};
     472          10 :     return std::make_shared<GrpcMuxSotw>(grpc_mux_context,
     473          10 :                                          ads_config.set_node_on_first_message_only());
     474          10 :   }
     475             : };
     476             : 
     477             : REGISTER_FACTORY(DeltaGrpcMuxFactory, MuxFactory);
     478             : REGISTER_FACTORY(SotwGrpcMuxFactory, MuxFactory);
     479             : 
     480             : } // namespace XdsMux
     481             : } // namespace Config
     482             : } // namespace Envoy

Generated by: LCOV version 1.15