LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc/xds_mux - sotw_subscription_state.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 67 139 48.2 %
Date: 2024-01-05 06:35:25 Functions: 9 11 81.8 %

          Line data    Source code
       1             : #include "source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h"
       2             : 
       3             : #include "source/common/config/utility.h"
       4             : #include "source/extensions/config_subscription/grpc/xds_source_id.h"
       5             : 
       6             : namespace Envoy {
       7             : namespace Config {
       8             : namespace XdsMux {
       9             : 
      10             : SotwSubscriptionState::SotwSubscriptionState(
      11             :     std::string type_url, UntypedConfigUpdateCallbacks& callbacks, Event::Dispatcher& dispatcher,
      12             :     OpaqueResourceDecoderSharedPtr resource_decoder, XdsConfigTrackerOptRef xds_config_tracker,
      13             :     XdsResourcesDelegateOptRef xds_resources_delegate, const std::string& target_xds_authority)
      14             :     : BaseSubscriptionState(std::move(type_url), callbacks, dispatcher, xds_config_tracker,
      15             :                             xds_resources_delegate, target_xds_authority),
      16          39 :       resource_decoder_(resource_decoder) {}
      17             : 
      18          39 : SotwSubscriptionState::~SotwSubscriptionState() = default;
      19             : 
      20             : void SotwSubscriptionState::updateSubscriptionInterest(
      21             :     const absl::flat_hash_set<std::string>& cur_added,
      22         100 :     const absl::flat_hash_set<std::string>& cur_removed) {
      23         100 :   for (const auto& a : cur_added) {
      24          30 :     names_tracked_.insert(a);
      25          30 :   }
      26         100 :   for (const auto& r : cur_removed) {
      27          30 :     names_tracked_.erase(r);
      28          30 :   }
      29         100 :   if (!cur_added.empty() || !cur_removed.empty()) {
      30          60 :     update_pending_ = true;
      31          60 :   }
      32         100 : }
      33             : 
      34             : // Not having sent any requests yet counts as an "update pending" since you're supposed to resend
      35             : // the entirety of your interest at the start of a stream, even if nothing has changed.
      36        1312 : bool SotwSubscriptionState::subscriptionUpdatePending() const {
      37        1312 :   return update_pending_ || dynamicContextChanged();
      38        1312 : }
      39             : 
      40           0 : void SotwSubscriptionState::markStreamFresh() {
      41           0 :   last_good_nonce_ = absl::nullopt;
      42           0 :   update_pending_ = true;
      43           0 :   clearDynamicContextChanged();
      44           0 : }
      45             : 
      46             : void SotwSubscriptionState::handleGoodResponse(
      47          88 :     const envoy::service::discovery::v3::DiscoveryResponse& message) {
      48          88 :   std::vector<DecodedResourcePtr> non_heartbeat_resources;
      49             : 
      50          88 :   {
      51          88 :     const auto scoped_update = ttl_.scopedTtlUpdate();
      52         142 :     for (const auto& any : message.resources()) {
      53         142 :       if (!any.Is<envoy::service::discovery::v3::Resource>() &&
      54         142 :           any.type_url() != message.type_url()) {
      55           0 :         throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match "
      56           0 :                                          "the message-wide type URL {} in DiscoveryResponse {}",
      57           0 :                                          any.type_url(), message.type_url(),
      58           0 :                                          message.DebugString()));
      59           0 :       }
      60             : 
      61         142 :       auto decoded_resource =
      62         142 :           DecodedResourceImpl::fromResource(*resource_decoder_, any, message.version_info());
      63         142 :       setResourceTtl(*decoded_resource);
      64         142 :       if (isHeartbeatResource(*decoded_resource, message.version_info())) {
      65           0 :         continue;
      66           0 :       }
      67         142 :       non_heartbeat_resources.push_back(std::move(decoded_resource));
      68         142 :     }
      69          88 :   }
      70             : 
      71             :   // TODO (dmitri-d) to eliminate decoding of resources twice consider expanding the interface to
      72             :   // support passing of decoded resources. This would also avoid a resource copy above.
      73          88 :   callbacks().onConfigUpdate(non_heartbeat_resources, message.version_info());
      74             :   // Now that we're past onConfigUpdate() without an exception thrown, we know we're good.
      75          88 :   last_good_version_info_ = message.version_info();
      76          88 :   last_good_nonce_ = message.nonce();
      77             : 
      78             :   // Processing point when resources are successfully ingested.
      79          88 :   if (xds_config_tracker_.has_value()) {
      80           0 :     xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources);
      81           0 :   }
      82             : 
      83             :   // Send the resources to the xDS delegate, if configured.
      84          88 :   if (xds_resources_delegate_.has_value()) {
      85           0 :     XdsConfigSourceId source_id{target_xds_authority_, message.type_url()};
      86           0 :     std::vector<DecodedResourceRef> resource_refs;
      87           0 :     resource_refs.reserve(non_heartbeat_resources.size());
      88           0 :     for (const DecodedResourcePtr& r : non_heartbeat_resources) {
      89           0 :       resource_refs.emplace_back(*r);
      90           0 :     }
      91           0 :     xds_resources_delegate_->onConfigUpdated(source_id, resource_refs);
      92           0 :   }
      93             : 
      94          88 :   ENVOY_LOG(debug, "Config update for {} (version {}) accepted with {} resources", typeUrl(),
      95          88 :             message.version_info(), message.resources().size());
      96          88 : }
      97             : 
      98          39 : void SotwSubscriptionState::handleEstablishmentFailure() {
      99          39 :   BaseSubscriptionState::handleEstablishmentFailure();
     100             : 
     101          39 :   if (previously_fetched_data_ || !xds_resources_delegate_.has_value()) {
     102          39 :     return;
     103          39 :   }
     104             : 
     105           0 :   const XdsConfigSourceId source_id{target_xds_authority_, type_url_};
     106           0 :   TRY_ASSERT_MAIN_THREAD {
     107           0 :     std::vector<envoy::service::discovery::v3::Resource> resources =
     108           0 :         xds_resources_delegate_->getResources(source_id, names_tracked_);
     109             : 
     110           0 :     std::vector<DecodedResourcePtr> decoded_resources;
     111           0 :     const auto scoped_update = ttl_.scopedTtlUpdate();
     112           0 :     std::string version_info;
     113           0 :     size_t unaccounted = names_tracked_.size();
     114           0 :     if (names_tracked_.size() == 1 && names_tracked_.contains(Envoy::Config::Wildcard)) {
     115             :       // For wildcard requests, there are no expectations for the number of resources returned.
     116           0 :       unaccounted = 0;
     117           0 :     }
     118             : 
     119           0 :     for (const auto& resource : resources) {
     120           0 :       if (version_info.empty()) {
     121           0 :         version_info = resource.version();
     122           0 :       } else {
     123           0 :         ASSERT(version_info == resource.version());
     124           0 :       }
     125             : 
     126           0 :       TRY_ASSERT_MAIN_THREAD {
     127           0 :         auto decoded_resource = DecodedResourceImpl::fromResource(*resource_decoder_, resource);
     128           0 :         setResourceTtl(*decoded_resource);
     129           0 :         if (unaccounted > 0) {
     130           0 :           --unaccounted;
     131           0 :         }
     132           0 :         decoded_resources.emplace_back(std::move(decoded_resource));
     133           0 :       }
     134           0 :       END_TRY
     135           0 :       catch (const EnvoyException& e) {
     136           0 :         xds_resources_delegate_->onResourceLoadFailed(source_id, resource.name(), e);
     137           0 :       }
     138           0 :     }
     139             : 
     140           0 :     callbacks().onConfigUpdate(decoded_resources, version_info);
     141           0 :     previously_fetched_data_ = true;
     142           0 :     if (unaccounted == 0 && !version_info.empty()) {
     143             :       // All the requested resources were found and validated from the xDS delegate, so set the last
     144             :       // known good version.
     145           0 :       last_good_version_info_ = version_info;
     146           0 :     }
     147           0 :   }
     148           0 :   END_TRY
     149           0 :   catch (const EnvoyException& e) {
     150             :     // TODO(abeyad): do something more than just logging the error?
     151           0 :     ENVOY_LOG(warn, "xDS delegate failed onEstablishmentFailure() for {}: {}", source_id.toKey(),
     152           0 :               e.what());
     153           0 :   }
     154           0 : }
     155             : 
     156             : std::unique_ptr<envoy::service::discovery::v3::DiscoveryRequest>
     157         140 : SotwSubscriptionState::getNextRequestInternal() {
     158         140 :   auto request = std::make_unique<envoy::service::discovery::v3::DiscoveryRequest>();
     159         140 :   request->set_type_url(typeUrl());
     160         140 :   std::copy(names_tracked_.begin(), names_tracked_.end(),
     161         140 :             Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names()));
     162         140 :   if (last_good_version_info_.has_value()) {
     163         101 :     request->set_version_info(last_good_version_info_.value());
     164         101 :   }
     165             :   // Default response_nonce to the last known good one. If we are being called by
     166             :   // getNextRequestWithAck(), this value will be overwritten.
     167         140 :   if (last_good_nonce_.has_value()) {
     168         101 :     request->set_response_nonce(last_good_nonce_.value());
     169         101 :   }
     170             : 
     171         140 :   update_pending_ = false;
     172         140 :   return request;
     173         140 : }
     174             : 
     175           0 : void SotwSubscriptionState::ttlExpiryCallback(const std::vector<std::string>& expired) {
     176           0 :   Protobuf::RepeatedPtrField<std::string> removed_resources;
     177           0 :   for (const auto& resource : expired) {
     178           0 :     removed_resources.Add(std::string(resource));
     179           0 :   }
     180           0 :   callbacks().onConfigUpdate({}, removed_resources, "");
     181           0 : }
     182             : 
     183         142 : void SotwSubscriptionState::setResourceTtl(const DecodedResourceImpl& decoded_resource) {
     184         142 :   if (decoded_resource.ttl()) {
     185           0 :     ttl_.add(std::chrono::milliseconds(*decoded_resource.ttl()), decoded_resource.name());
     186         142 :   } else {
     187         142 :     ttl_.clear(decoded_resource.name());
     188         142 :   }
     189         142 : }
     190             : 
     191             : bool SotwSubscriptionState::isHeartbeatResource(const DecodedResource& resource,
     192         142 :                                                 const std::string& version) {
     193         142 :   return !resource.hasResource() && last_good_version_info_.has_value() &&
     194         142 :          version == last_good_version_info_.value();
     195         142 : }
     196             : 
     197             : } // namespace XdsMux
     198             : } // namespace Config
     199             : } // namespace Envoy

Generated by: LCOV version 1.15