LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc - new_grpc_mux_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 188 250 75.2 %
Date: 2024-01-05 06:35:25 Functions: 24 28 85.7 %

          Line data    Source code
       1             : #include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
       2             : 
       3             : #include "envoy/service/discovery/v3/discovery.pb.h"
       4             : 
       5             : #include "source/common/common/assert.h"
       6             : #include "source/common/common/backoff_strategy.h"
       7             : #include "source/common/common/token_bucket_impl.h"
       8             : #include "source/common/config/utility.h"
       9             : #include "source/common/config/xds_context_params.h"
      10             : #include "source/common/config/xds_resource.h"
      11             : #include "source/common/memory/utils.h"
      12             : #include "source/common/protobuf/protobuf.h"
      13             : #include "source/common/protobuf/utility.h"
      14             : #include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
      15             : 
      16             : namespace Envoy {
      17             : namespace Config {
      18             : 
      19             : namespace {
      20             : class AllMuxesState {
      21             : public:
      22           4 :   void insert(NewGrpcMuxImpl* mux) { muxes_.insert(mux); }
      23             : 
      24           4 :   void erase(NewGrpcMuxImpl* mux) { muxes_.erase(mux); }
      25             : 
      26         135 :   void shutdownAll() {
      27         135 :     for (auto& mux : muxes_) {
      28           4 :       mux->shutdown();
      29           4 :     }
      30         135 :   }
      31             : 
      32             : private:
      33             :   absl::flat_hash_set<NewGrpcMuxImpl*> muxes_;
      34             : };
      35             : using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
      36             : } // namespace
      37             : 
      38             : NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context)
      39             :     : grpc_stream_(this, std::move(grpc_mux_context.async_client_),
      40             :                    grpc_mux_context.service_method_, grpc_mux_context.dispatcher_,
      41             :                    grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_),
      42             :                    grpc_mux_context.rate_limit_settings_),
      43             :       local_info_(grpc_mux_context.local_info_),
      44             :       config_validators_(std::move(grpc_mux_context.config_validators_)),
      45             :       dynamic_update_callback_handle_(
      46             :           grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
      47           0 :               [this](absl::string_view resource_type_url) {
      48           0 :                 onDynamicContextUpdate(resource_type_url);
      49           0 :               })),
      50             :       dispatcher_(grpc_mux_context.dispatcher_),
      51             :       xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
      52           4 :       eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)) {
      53           4 :   AllMuxes::get().insert(this);
      54           4 : }
      55             : 
      56           4 : NewGrpcMuxImpl::~NewGrpcMuxImpl() { AllMuxes::get().erase(this); }
      57             : 
      58         135 : void NewGrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); }
      59             : 
      60           0 : void NewGrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
      61           0 :   auto sub = subscriptions_.find(resource_type_url);
      62           0 :   if (sub == subscriptions_.end()) {
      63           0 :     return;
      64           0 :   }
      65           0 :   sub->second->sub_state_.setMustSendDiscoveryRequest();
      66           0 :   trySendDiscoveryRequests();
      67           0 : }
      68             : 
      69           8 : ScopedResume NewGrpcMuxImpl::pause(const std::string& type_url) {
      70           8 :   return pause(std::vector<std::string>{type_url});
      71           8 : }
      72             : 
      73          23 : ScopedResume NewGrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
      74          53 :   for (const auto& type_url : type_urls) {
      75          53 :     pausable_ack_queue_.pause(type_url);
      76          53 :   }
      77             : 
      78          23 :   return std::make_unique<Cleanup>([this, type_urls]() {
      79          53 :     for (const auto& type_url : type_urls) {
      80          53 :       pausable_ack_queue_.resume(type_url);
      81          53 :       if (!pausable_ack_queue_.paused(type_url)) {
      82          53 :         trySendDiscoveryRequests();
      83          53 :       }
      84          53 :     }
      85          23 :   });
      86          23 : }
      87             : 
      88             : void NewGrpcMuxImpl::onDiscoveryResponse(
      89             :     std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryResponse>&& message,
      90          23 :     ControlPlaneStats& control_plane_stats) {
      91          23 :   ENVOY_LOG(debug, "Received DeltaDiscoveryResponse for {} at version {}", message->type_url(),
      92          23 :             message->system_version_info());
      93             : 
      94          23 :   auto sub = subscriptions_.find(message->type_url());
      95          23 :   if (sub == subscriptions_.end()) {
      96           0 :     ENVOY_LOG(warn,
      97           0 :               "Dropping received DeltaDiscoveryResponse (with version {}) for non-existent "
      98           0 :               "subscription {}.",
      99           0 :               message->system_version_info(), message->type_url());
     100           0 :     return;
     101           0 :   }
     102             : 
     103          23 :   if (message->has_control_plane()) {
     104           0 :     control_plane_stats.identifier_.set(message->control_plane().identifier());
     105             : 
     106           0 :     if (message->control_plane().identifier() != sub->second->control_plane_identifier_) {
     107           0 :       sub->second->control_plane_identifier_ = message->control_plane().identifier();
     108           0 :       ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", message->type_url(),
     109           0 :                 sub->second->control_plane_identifier_);
     110           0 :     }
     111           0 :   }
     112             : 
     113          23 :   auto ack = sub->second->sub_state_.handleResponse(*message);
     114             : 
     115             :   // Processing point to record error if there is any failure after the response is processed.
     116          23 :   if (xds_config_tracker_.has_value() &&
     117          23 :       ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) {
     118           0 :     xds_config_tracker_->onConfigRejected(*message, ack.error_detail_.message());
     119           0 :   }
     120          23 :   kickOffAck(ack);
     121          23 :   Memory::Utils::tryShrinkHeap();
     122          23 : }
     123             : 
     124           4 : void NewGrpcMuxImpl::onStreamEstablished() {
     125           4 :   for (auto& [type_url, subscription] : subscriptions_) {
     126           0 :     UNREFERENCED_PARAMETER(type_url);
     127           0 :     subscription->sub_state_.markStreamFresh();
     128           0 :   }
     129           4 :   pausable_ack_queue_.clear();
     130           4 :   trySendDiscoveryRequests();
     131           4 : }
     132             : 
     133           4 : void NewGrpcMuxImpl::onEstablishmentFailure() {
     134             :   // If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately
     135             :   // call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a
     136             :   // crash, the iteration needs to dance around a little: collect pointers to all
     137             :   // SubscriptionStates, call on all those pointers we haven't yet called on, repeat if there are
     138             :   // now more SubscriptionStates.
     139           4 :   absl::flat_hash_map<std::string, DeltaSubscriptionState*> all_subscribed;
     140           4 :   absl::flat_hash_map<std::string, DeltaSubscriptionState*> already_called;
     141           4 :   do {
     142          15 :     for (auto& [type_url, subscription] : subscriptions_) {
     143          15 :       all_subscribed[type_url] = &subscription->sub_state_;
     144          15 :     }
     145          15 :     for (auto& sub : all_subscribed) {
     146          15 :       if (already_called.insert(sub).second) { // insert succeeded ==> not already called
     147          15 :         sub.second->handleEstablishmentFailure();
     148          15 :       }
     149          15 :     }
     150           4 :   } while (all_subscribed.size() != subscriptions_.size());
     151           4 : }
     152             : 
     153           0 : void NewGrpcMuxImpl::onWriteable() { trySendDiscoveryRequests(); }
     154             : 
     155          23 : void NewGrpcMuxImpl::kickOffAck(UpdateAck ack) {
     156          23 :   pausable_ack_queue_.push(std::move(ack));
     157          23 :   trySendDiscoveryRequests();
     158          23 : }
     159             : 
     160             : // TODO(fredlas) to be removed from the GrpcMux interface very soon.
     161           4 : void NewGrpcMuxImpl::start() {
     162           4 :   ASSERT(!started_);
     163           4 :   if (started_) {
     164           0 :     return;
     165           0 :   }
     166           4 :   started_ = true;
     167           4 :   grpc_stream_.establishNewStream();
     168           4 : }
     169             : 
     170             : GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
     171             :                                          const absl::flat_hash_set<std::string>& resources,
     172             :                                          SubscriptionCallbacks& callbacks,
     173             :                                          OpaqueResourceDecoderSharedPtr resource_decoder,
     174          32 :                                          const SubscriptionOptions& options) {
     175          32 :   auto entry = subscriptions_.find(type_url);
     176          32 :   if (entry == subscriptions_.end()) {
     177             :     // We don't yet have a subscription for type_url! Make one!
     178          15 :     addSubscription(type_url, options.use_namespace_matching_);
     179          15 :     return addWatch(type_url, resources, callbacks, resource_decoder, options);
     180          15 :   }
     181             : 
     182          17 :   Watch* watch = entry->second->watch_map_.addWatch(callbacks, *resource_decoder);
     183             :   // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
     184          17 :   updateWatch(type_url, watch, resources, options);
     185          17 :   return std::make_unique<WatchImpl>(type_url, watch, *this, options);
     186          32 : }
     187             : 
     188             : // Updates the list of resource names watched by the given watch. If an added name is new across
     189             : // the whole subscription, or if a removed name has no other watch interested in it, then the
     190             : // subscription will enqueue and attempt to send an appropriate discovery request.
     191             : void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
     192             :                                  const absl::flat_hash_set<std::string>& resources,
     193          34 :                                  const SubscriptionOptions& options) {
     194          34 :   ASSERT(watch != nullptr);
     195          34 :   auto sub = subscriptions_.find(type_url);
     196          34 :   RELEASE_ASSERT(sub != subscriptions_.end(),
     197          34 :                  fmt::format("Watch of {} has no subscription to update.", type_url));
     198             :   // We need to prepare xdstp:// resources for the transport, by normalizing and adding any extra
     199             :   // context parameters.
     200          34 :   absl::flat_hash_set<std::string> effective_resources;
     201          34 :   for (const auto& resource : resources) {
     202           9 :     if (XdsResourceIdentifier::hasXdsTpScheme(resource)) {
     203           0 :       auto xdstp_resource_or_error = XdsResourceIdentifier::decodeUrn(resource);
     204           0 :       THROW_IF_STATUS_NOT_OK(xdstp_resource_or_error, throw);
     205           0 :       auto xdstp_resource = xdstp_resource_or_error.value();
     206           0 :       if (options.add_xdstp_node_context_params_) {
     207           0 :         const auto context = XdsContextParams::encodeResource(
     208           0 :             local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
     209           0 :         xdstp_resource.mutable_context()->CopyFrom(context);
     210           0 :       }
     211           0 :       XdsResourceIdentifier::EncodeOptions encode_options;
     212           0 :       encode_options.sort_context_params_ = true;
     213           0 :       effective_resources.insert(XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options));
     214           9 :     } else {
     215           9 :       effective_resources.insert(resource);
     216           9 :     }
     217           9 :   }
     218          34 :   auto added_removed = sub->second->watch_map_.updateWatchInterest(watch, effective_resources);
     219          34 :   if (options.use_namespace_matching_) {
     220             :     // This is to prevent sending out of requests that contain prefixes instead of resource names
     221           0 :     sub->second->sub_state_.updateSubscriptionInterest({}, {});
     222          34 :   } else {
     223          34 :     sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_,
     224          34 :                                                        added_removed.removed_);
     225          34 :   }
     226             :   // Tell the server about our change in interest, if any.
     227          34 :   if (sub->second->sub_state_.subscriptionUpdatePending()) {
     228          26 :     trySendDiscoveryRequests();
     229          26 :   }
     230          34 : }
     231             : 
     232             : void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url,
     233           0 :                                            const absl::flat_hash_set<std::string>& for_update) {
     234           0 :   auto sub = subscriptions_.find(type_url);
     235           0 :   RELEASE_ASSERT(sub != subscriptions_.end(),
     236           0 :                  fmt::format("Watch of {} has no subscription to update.", type_url));
     237           0 :   sub->second->sub_state_.updateSubscriptionInterest(for_update, {});
     238             :   // Tell the server about our change in interest, if any.
     239           0 :   if (sub->second->sub_state_.subscriptionUpdatePending()) {
     240           0 :     trySendDiscoveryRequests();
     241           0 :   }
     242           0 : }
     243             : 
     244          17 : void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
     245          17 :   updateWatch(type_url, watch, {}, {});
     246          17 :   auto entry = subscriptions_.find(type_url);
     247          17 :   ASSERT(entry != subscriptions_.end(),
     248          17 :          fmt::format("removeWatch() called for non-existent subscription {}.", type_url));
     249          17 :   entry->second->watch_map_.removeWatch(watch);
     250          17 : }
     251             : 
     252             : void NewGrpcMuxImpl::addSubscription(const std::string& type_url,
     253          15 :                                      const bool use_namespace_matching) {
     254             :   // Resource cache is only used for EDS resources.
     255          15 :   EdsResourcesCacheOptRef resources_cache{absl::nullopt};
     256          15 :   if (eds_resources_cache_ &&
     257          15 :       (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
     258           0 :     resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
     259           0 :   }
     260          15 :   subscriptions_.emplace(
     261          15 :       type_url, std::make_unique<SubscriptionStuff>(type_url, local_info_, use_namespace_matching,
     262          15 :                                                     dispatcher_, *config_validators_.get(),
     263          15 :                                                     xds_config_tracker_, resources_cache));
     264          15 :   subscription_ordering_.emplace_back(type_url);
     265          15 : }
     266             : 
     267         106 : void NewGrpcMuxImpl::trySendDiscoveryRequests() {
     268         106 :   if (shutdown_) {
     269           9 :     return;
     270           9 :   }
     271             : 
     272         137 :   while (true) {
     273             :     // Do any of our subscriptions even want to send a request?
     274         137 :     absl::optional<std::string> maybe_request_type = whoWantsToSendDiscoveryRequest();
     275         137 :     if (!maybe_request_type.has_value()) {
     276          97 :       break;
     277          97 :     }
     278             :     // If so, which one (by type_url)?
     279          40 :     std::string next_request_type_url = maybe_request_type.value();
     280             :     // If we don't have a subscription object for this request's type_url, drop the request.
     281          40 :     auto sub = subscriptions_.find(next_request_type_url);
     282          40 :     RELEASE_ASSERT(sub != subscriptions_.end(),
     283          40 :                    fmt::format("Tried to send discovery request for non-existent subscription {}.",
     284          40 :                                next_request_type_url));
     285             : 
     286             :     // Try again later if paused/rate limited/stream down.
     287          40 :     if (!canSendDiscoveryRequest(next_request_type_url)) {
     288           0 :       break;
     289           0 :     }
     290          40 :     envoy::service::discovery::v3::DeltaDiscoveryRequest request;
     291             :     // Get our subscription state to generate the appropriate DeltaDiscoveryRequest, and send.
     292          40 :     if (!pausable_ack_queue_.empty()) {
     293             :       // Because ACKs take precedence over plain requests, if there is anything in the queue, it's
     294             :       // safe to assume it's of the type_url that we're wanting to send.
     295          23 :       request = sub->second->sub_state_.getNextRequestWithAck(pausable_ack_queue_.popFront());
     296          23 :     } else {
     297          17 :       request = sub->second->sub_state_.getNextRequestAckless();
     298          17 :     }
     299          40 :     grpc_stream_.sendMessage(request);
     300          40 :   }
     301          97 :   grpc_stream_.maybeUpdateQueueSizeStat(pausable_ack_queue_.size());
     302          97 : }
     303             : 
     304             : // Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check
     305             : // whether we *want* to send a DeltaDiscoveryRequest).
     306          40 : bool NewGrpcMuxImpl::canSendDiscoveryRequest(const std::string& type_url) {
     307          40 :   RELEASE_ASSERT(
     308          40 :       !pausable_ack_queue_.paused(type_url),
     309          40 :       fmt::format("canSendDiscoveryRequest() called on paused type_url {}. Pausedness is "
     310          40 :                   "supposed to be filtered out by whoWantsToSendDiscoveryRequest(). ",
     311          40 :                   type_url));
     312             : 
     313          40 :   if (!grpc_stream_.grpcStreamAvailable()) {
     314           0 :     ENVOY_LOG(trace, "No stream available to send a discovery request for {}.", type_url);
     315           0 :     return false;
     316          40 :   } else if (!grpc_stream_.checkRateLimitAllowsDrain()) {
     317           0 :     ENVOY_LOG(trace, "{} discovery request hit rate limit; will try later.", type_url);
     318           0 :     return false;
     319           0 :   }
     320          40 :   return true;
     321          40 : }
     322             : 
     323             : // Checks whether we have something to say in a DeltaDiscoveryRequest, which can be an ACK and/or
     324             : // a subscription update. (Does not check whether we *can* send that DeltaDiscoveryRequest).
     325             : // Returns the type_url we should send the DeltaDiscoveryRequest for (if any).
     326             : // First, prioritizes ACKs over non-ACK subscription interest updates.
     327             : // Then, prioritizes non-ACK updates in the order the various types
     328             : // of subscriptions were activated.
     329         137 : absl::optional<std::string> NewGrpcMuxImpl::whoWantsToSendDiscoveryRequest() {
     330             :   // All ACKs are sent before plain updates. trySendDiscoveryRequests() relies on this. So, choose
     331             :   // type_url from pausable_ack_queue_ if possible, before looking at pending updates.
     332         137 :   if (!pausable_ack_queue_.empty()) {
     333          23 :     return pausable_ack_queue_.front().type_url_;
     334          23 :   }
     335             :   // If we're looking to send multiple non-ACK requests, send them in the order that their
     336             :   // subscriptions were initiated.
     337         312 :   for (const auto& sub_type : subscription_ordering_) {
     338         312 :     auto sub = subscriptions_.find(sub_type);
     339         312 :     if (sub != subscriptions_.end() && sub->second->sub_state_.subscriptionUpdatePending() &&
     340         312 :         !pausable_ack_queue_.paused(sub_type)) {
     341          17 :       return sub->first;
     342          17 :     }
     343         312 :   }
     344          97 :   return absl::nullopt;
     345         114 : }
     346             : 
     347             : // A factory class for creating NewGrpcMuxImpl so it does not have to be
     348             : // hard-compiled into cluster_manager_impl.cc
     349             : class NewGrpcMuxFactory : public MuxFactory {
     350             : public:
     351          13 :   std::string name() const override { return "envoy.config_mux.new_grpc_mux_factory"; }
     352         135 :   void shutdownAll() override { return NewGrpcMuxImpl::shutdownAll(); }
     353             :   std::shared_ptr<GrpcMux>
     354             :   create(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher,
     355             :          Random::RandomGenerator&, Stats::Scope& scope,
     356             :          const envoy::config::core::v3::ApiConfigSource& ads_config,
     357             :          const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
     358             :          BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
     359           4 :          OptRef<XdsResourcesDelegate>, bool use_eds_resources_cache) override {
     360           4 :     GrpcMuxContext grpc_mux_context{
     361           4 :         /*async_client_=*/std::move(async_client),
     362           4 :         /*dispatcher_=*/dispatcher,
     363             :         /*service_method_=*/
     364           4 :         *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
     365           4 :             "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"),
     366           4 :         /*local_info_=*/local_info,
     367           4 :         /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config),
     368           4 :         /*scope_=*/scope,
     369           4 :         /*config_validators_=*/std::move(config_validators),
     370           4 :         /*xds_resources_delegate_=*/absl::nullopt,
     371           4 :         /*xds_config_tracker_=*/xds_config_tracker,
     372           4 :         /*backoff_strategy_=*/std::move(backoff_strategy),
     373           4 :         /*target_xds_authority_=*/"",
     374             :         /*eds_resources_cache_=*/
     375           4 :         (use_eds_resources_cache &&
     376           4 :          Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
     377           4 :             ? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
     378           4 :             : nullptr};
     379           4 :     return std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context);
     380           4 :   }
     381             : };
     382             : 
     383             : REGISTER_FACTORY(NewGrpcMuxFactory, MuxFactory);
     384             : 
     385             : } // namespace Config
     386             : } // namespace Envoy

Generated by: LCOV version 1.15