LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc/xds_mux - delta_subscription_state.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 156 238 65.5 %
Date: 2024-01-05 06:35:25 Functions: 11 13 84.6 %

          Line data    Source code
       1             : #include "source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h"
       2             : 
       3             : #include "envoy/event/dispatcher.h"
       4             : #include "envoy/service/discovery/v3/discovery.pb.h"
       5             : 
       6             : #include "source/common/common/hash.h"
       7             : #include "source/common/config/utility.h"
       8             : #include "source/common/runtime/runtime_features.h"
       9             : 
      10             : namespace Envoy {
      11             : namespace Config {
      12             : namespace XdsMux {
      13             : 
      14             : DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
      15             :                                                UntypedConfigUpdateCallbacks& watch_map,
      16             :                                                Event::Dispatcher& dispatcher,
      17             :                                                XdsConfigTrackerOptRef xds_config_tracker)
      18             :     : BaseSubscriptionState(std::move(type_url), watch_map, dispatcher, xds_config_tracker),
      19             :       // TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
      20             :       // empty resources as updates.
      21          15 :       supports_heartbeats_(type_url_ != "envoy.config.route.v3.VirtualHost") {}
      22             : 
      23          15 : DeltaSubscriptionState::~DeltaSubscriptionState() = default;
      24             : 
      25             : void DeltaSubscriptionState::updateSubscriptionInterest(
      26             :     const absl::flat_hash_set<std::string>& cur_added,
      27          34 :     const absl::flat_hash_set<std::string>& cur_removed) {
      28          34 :   for (const auto& a : cur_added) {
      29           9 :     if (in_initial_legacy_wildcard_ && a != Wildcard) {
      30           7 :       in_initial_legacy_wildcard_ = false;
      31           7 :     }
      32             :     // If the requested resource existed as a wildcard resource,
      33             :     // transition it to requested. Otherwise mark it as a resource
      34             :     // waiting for the server to receive the version.
      35           9 :     if (auto it = wildcard_resource_state_.find(a); it != wildcard_resource_state_.end()) {
      36           0 :       requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
      37           0 :       wildcard_resource_state_.erase(it);
      38           9 :     } else if (it = ambiguous_resource_state_.find(a); it != ambiguous_resource_state_.end()) {
      39           0 :       requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
      40           0 :       ambiguous_resource_state_.erase(it);
      41           9 :     } else {
      42           9 :       requested_resource_state_.insert_or_assign(a, ResourceState::waitingForServer());
      43           9 :     }
      44           9 :     ASSERT(requested_resource_state_.contains(a));
      45           9 :     ASSERT(!wildcard_resource_state_.contains(a));
      46           9 :     ASSERT(!ambiguous_resource_state_.contains(a));
      47             :     // If interest in a resource is removed-then-added (all before a discovery request
      48             :     // can be sent), we must treat it as a "new" addition: our user may have forgotten its
      49             :     // copy of the resource after instructing us to remove it, and need to be reminded of it.
      50           9 :     names_removed_.erase(a);
      51           9 :     names_added_.insert(a);
      52           9 :   }
      53          34 :   for (const auto& r : cur_removed) {
      54           9 :     auto actually_erased = false;
      55             :     // The resource we have lost the interest in could also come from our wildcard subscription. We
      56             :     // just don't know it at this point. Instead of removing it outright, mark the resource as not
      57             :     // interesting to us any more and the server will send us an update. If we don't have a wildcard
      58             :     // subscription then there is no ambiguity and just drop the resource.
      59           9 :     if (requested_resource_state_.contains(Wildcard)) {
      60           0 :       if (auto it = requested_resource_state_.find(r); it != requested_resource_state_.end()) {
      61             :         // Wildcard resources always have a version. If our requested resource has no version, it
      62             :         // won't be a wildcard resource then. If r is Wildcard itself, then it never has a version
      63             :         // attached to it, so it will not be moved to ambiguous category.
      64           0 :         if (!it->second.isWaitingForServer()) {
      65           0 :           ambiguous_resource_state_.insert_or_assign(it->first, it->second.version());
      66           0 :         }
      67           0 :         requested_resource_state_.erase(it);
      68           0 :         actually_erased = true;
      69           0 :       }
      70           9 :     } else {
      71           9 :       actually_erased = (requested_resource_state_.erase(r) > 0);
      72           9 :     }
      73           9 :     ASSERT(!requested_resource_state_.contains(r));
      74             :     // Ideally, when interest in a resource is added-then-removed in between requests,
      75             :     // we would avoid putting a superfluous "unsubscribe [resource that was never subscribed]"
      76             :     // in the request. However, the removed-then-added case *does* need to go in the request,
      77             :     // and due to how we accomplish that, it's difficult to distinguish remove-add-remove from
      78             :     // add-remove (because "remove-add" has to be treated as equivalent to just "add").
      79           9 :     names_added_.erase(r);
      80           9 :     if (actually_erased) {
      81           9 :       names_removed_.insert(r);
      82           9 :       in_initial_legacy_wildcard_ = false;
      83           9 :     }
      84           9 :   }
      85             :   // If we unsubscribe from wildcard resource, drop all the resources that came from wildcard from
      86             :   // cache.
      87          34 :   if (cur_removed.contains(Wildcard)) {
      88           0 :     wildcard_resource_state_.clear();
      89           0 :     ambiguous_resource_state_.clear();
      90           0 :   }
      91          34 : }
      92             : 
      93             : // Not having sent any requests yet counts as an "update pending" since you're supposed to resend
      94             : // the entirety of your interest at the start of a stream, even if nothing has changed.
      95         346 : bool DeltaSubscriptionState::subscriptionUpdatePending() const {
      96         346 :   if (!names_added_.empty() || !names_removed_.empty()) {
      97          36 :     return true;
      98          36 :   }
      99             :   // At this point, we have no new resources to subscribe to or any
     100             :   // resources to unsubscribe from.
     101         310 :   if (!any_request_sent_yet_in_current_stream_) {
     102             :     // If we haven't sent anything on the current stream, but we are actually interested in some
     103             :     // resource then we obviously need to let the server know about those.
     104          16 :     if (!requested_resource_state_.empty()) {
     105           0 :       return true;
     106           0 :     }
     107             :     // So there are no new names and we are interested in nothing. This may either mean that we want
     108             :     // the legacy wildcard subscription to kick in or we actually unsubscribed from everything. If
     109             :     // the latter is true, then we should not be sending any requests. In such case the initial
     110             :     // wildcard mode will be false. Otherwise it means that the legacy wildcard request should be
     111             :     // sent.
     112          16 :     return in_initial_legacy_wildcard_;
     113          16 :   }
     114             : 
     115             :   // At this point, we have no changes in subscription resources and this isn't a first request in
     116             :   // the stream, so even if there are no resources we are interested in, we can send the request,
     117             :   // because even if it's empty, it won't be interpreted as legacy wildcard subscription, which can
     118             :   // only happen for the first request in the stream. So sending an empty request at this point
     119             :   // should be harmless.
     120         294 :   return dynamicContextChanged();
     121         310 : }
     122             : 
     123             : bool DeltaSubscriptionState::isHeartbeatResource(
     124          22 :     const envoy::service::discovery::v3::Resource& resource) const {
     125          22 :   if (!supports_heartbeats_) {
     126           0 :     return false;
     127           0 :   }
     128          22 :   if (resource.has_resource()) {
     129          22 :     return false;
     130          22 :   }
     131             : 
     132           0 :   if (const auto maybe_resource = getRequestedResourceState(resource.name());
     133           0 :       maybe_resource.has_value()) {
     134           0 :     return !maybe_resource->isWaitingForServer() && resource.version() == maybe_resource->version();
     135           0 :   }
     136             : 
     137           0 :   if (const auto itr = wildcard_resource_state_.find(resource.name());
     138           0 :       itr != wildcard_resource_state_.end()) {
     139           0 :     return resource.version() == itr->second;
     140           0 :   }
     141             : 
     142           0 :   if (const auto itr = ambiguous_resource_state_.find(resource.name());
     143           0 :       itr != ambiguous_resource_state_.end()) {
     144             :     // In theory we should move the ambiguous resource to wildcard, because probably we shouldn't be
     145             :     // getting heartbeat responses about resources that we are not interested in, but the server
     146             :     // could have sent this heartbeat before it learned about our lack of interest in the resource.
     147           0 :     return resource.version() == itr->second;
     148           0 :   }
     149             : 
     150           0 :   return false;
     151           0 : }
     152             : 
     153             : void DeltaSubscriptionState::handleGoodResponse(
     154          23 :     const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
     155          23 :   absl::flat_hash_set<std::string> names_added_removed;
     156          23 :   Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> non_heartbeat_resources;
     157          23 :   for (const auto& resource : message.resources()) {
     158          22 :     if (!names_added_removed.insert(resource.name()).second) {
     159           0 :       throw EnvoyException(
     160           0 :           fmt::format("duplicate name {} found among added/updated resources", resource.name()));
     161           0 :     }
     162          22 :     if (isHeartbeatResource(resource)) {
     163           0 :       continue;
     164           0 :     }
     165             :     // TODO (dmitri-d) consider changing onConfigUpdate callback interface to avoid copying of
     166             :     // resources
     167          22 :     non_heartbeat_resources.Add()->CopyFrom(resource);
     168             :     // DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource
     169          22 :     if (!resource.has_resource() && resource.aliases_size() > 0) {
     170           0 :       continue;
     171           0 :     }
     172          22 :     if (message.type_url() != resource.resource().type_url()) {
     173           0 :       throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match "
     174           0 :                                        "the message-wide type URL {} in DeltaDiscoveryResponse {}",
     175           0 :                                        resource.resource().type_url(), message.type_url(),
     176           0 :                                        message.DebugString()));
     177           0 :     }
     178          22 :   }
     179          23 :   for (const auto& name : message.removed_resources()) {
     180           1 :     if (!names_added_removed.insert(name).second) {
     181           0 :       throw EnvoyException(
     182           0 :           fmt::format("duplicate name {} found in the union of added+removed resources", name));
     183           0 :     }
     184           1 :   }
     185             : 
     186          23 :   callbacks().onConfigUpdate(non_heartbeat_resources, message.removed_resources(),
     187          23 :                              message.system_version_info());
     188             : 
     189             :   // Processing point when resources are successfully ingested.
     190          23 :   if (xds_config_tracker_.has_value()) {
     191           0 :     xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources,
     192           0 :                                           message.removed_resources());
     193           0 :   }
     194             : 
     195          23 :   {
     196          23 :     const auto scoped_update = ttl_.scopedTtlUpdate();
     197          23 :     if (requested_resource_state_.contains(Wildcard)) {
     198          11 :       for (const auto& resource : message.resources()) {
     199          10 :         addResourceStateFromServer(resource);
     200          10 :       }
     201          12 :     } else {
     202             :       // We are not subscribed to wildcard, so we only take resources that we explicitly requested
     203             :       // and ignore the others.
     204             :       // NOTE: This is not gonna work for xdstp resources with glob resource matching.
     205          12 :       for (const auto& resource : message.resources()) {
     206          12 :         if (requested_resource_state_.contains(resource.name())) {
     207          12 :           addResourceStateFromServer(resource);
     208          12 :         }
     209          12 :       }
     210          12 :     }
     211          23 :   }
     212             : 
     213             :   // If a resource is gone, there is no longer a meaningful version for it that makes sense to
     214             :   // provide to the server upon stream reconnect: either it will continue to not exist, in which
     215             :   // case saying nothing is fine, or the server will bring back something new, which we should
     216             :   // receive regardless (which is the logic that not specifying a version will get you).
     217             :   //
     218             :   // So, leave the version map entry present but blank if we are still interested in the resource.
     219             :   // It will be left out of initial_resource_versions messages, but will remind us to explicitly
     220             :   // tell the server "I'm cancelling my subscription" when we lose interest. In case of resources
     221             :   // received as a part of the wildcard subscription or resources we already lost interest in, we
     222             :   // just drop them.
     223          23 :   for (const auto& resource_name : message.removed_resources()) {
     224           1 :     if (auto maybe_resource = getRequestedResourceState(resource_name);
     225           1 :         maybe_resource.has_value()) {
     226           0 :       maybe_resource->setAsWaitingForServer();
     227           1 :     } else if (const auto erased_count = ambiguous_resource_state_.erase(resource_name);
     228           1 :                erased_count == 0) {
     229           1 :       wildcard_resource_state_.erase(resource_name);
     230           1 :     }
     231           1 :   }
     232          23 :   ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", typeUrl(),
     233          23 :             message.resources().size(), message.removed_resources().size());
     234          23 : }
     235             : 
     236             : std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryRequest>
     237          40 : DeltaSubscriptionState::getNextRequestInternal() {
     238          40 :   auto request = std::make_unique<envoy::service::discovery::v3::DeltaDiscoveryRequest>();
     239          40 :   request->set_type_url(typeUrl());
     240          40 :   if (!any_request_sent_yet_in_current_stream_) {
     241          15 :     any_request_sent_yet_in_current_stream_ = true;
     242          15 :     const bool is_legacy_wildcard = isInitialRequestForLegacyWildcard();
     243             :     // initial_resource_versions "must be populated for first request in a stream".
     244             :     // Also, since this might be a new server, we must explicitly state *all* of our subscription
     245             :     // interest.
     246          15 :     for (auto const& [resource_name, resource_state] : requested_resource_state_) {
     247             :       // Populate initial_resource_versions with the resource versions we currently have.
     248             :       // Resources we are interested in, but are still waiting to get any version of from the
     249             :       // server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
     250          15 :       if (!resource_state.isWaitingForServer()) {
     251           0 :         (*request->mutable_initial_resource_versions())[resource_name] = resource_state.version();
     252           0 :       }
     253             :       // We are going over a list of resources that we are interested in, so add them to
     254             :       // resource_names_subscribe.
     255          15 :       names_added_.insert(resource_name);
     256          15 :     }
     257          15 :     for (auto const& [resource_name, resource_version] : wildcard_resource_state_) {
     258           0 :       (*request->mutable_initial_resource_versions())[resource_name] = resource_version;
     259           0 :     }
     260          15 :     for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) {
     261           0 :       (*request->mutable_initial_resource_versions())[resource_name] = resource_version;
     262           0 :     }
     263             :     // If this is a legacy wildcard request, then make sure that the resource_names_subscribe is
     264             :     // empty.
     265          15 :     if (is_legacy_wildcard) {
     266           8 :       names_added_.clear();
     267           8 :     }
     268          15 :     names_removed_.clear();
     269          15 :   }
     270             : 
     271          40 :   std::copy(names_added_.begin(), names_added_.end(),
     272          40 :             Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names_subscribe()));
     273          40 :   std::copy(names_removed_.begin(), names_removed_.end(),
     274          40 :             Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names_unsubscribe()));
     275          40 :   names_added_.clear();
     276          40 :   names_removed_.clear();
     277             : 
     278          40 :   return request;
     279          40 : }
     280             : 
     281          15 : bool DeltaSubscriptionState::isInitialRequestForLegacyWildcard() {
     282          15 :   if (in_initial_legacy_wildcard_) {
     283           8 :     requested_resource_state_.insert_or_assign(Wildcard, ResourceState::waitingForServer());
     284           8 :     ASSERT(requested_resource_state_.contains(Wildcard));
     285           8 :     ASSERT(!wildcard_resource_state_.contains(Wildcard));
     286           8 :     ASSERT(!ambiguous_resource_state_.contains(Wildcard));
     287           8 :     return true;
     288           8 :   }
     289             : 
     290             :   // If we are here, this means that we lost our initial wildcard mode, because we subscribed to
     291             :   // something in the past. We could still be in a situation where all we are subscribed to now is
     292             :   // the wildcard resource, so in such a case try to send a legacy wildcard subscription request
     293             :   // anyway. For this to happen, two conditions need to apply:
     294             :   //
     295             :   // 1. No change in interest.
     296             :   // 2. The only requested resource is Wildcard resource.
     297             :   //
     298             :   // The invariant of the code here is that this code is executed only when
     299             :   // subscriptionUpdatePending actually returns true, which in our case can only happen if the
     300             :   // requested_resource_state_ isn't empty.
     301           7 :   ASSERT(!requested_resource_state_.empty());
     302             : 
     303             :   // If our subscription interest didn't change then the first condition for using legacy wildcard
     304             :   // subscription is met.
     305           7 :   if (!names_added_.empty() || !names_removed_.empty()) {
     306           7 :     return false;
     307           7 :   }
     308             :   // If we requested only a wildcard resource then the second condition for using legacy wildcard
     309             :   // subscription is met.
     310           0 :   return requested_resource_state_.size() == 1 &&
     311           0 :          requested_resource_state_.begin()->first == Wildcard;
     312           7 : }
     313             : 
     314             : void DeltaSubscriptionState::addResourceStateFromServer(
     315          22 :     const envoy::service::discovery::v3::Resource& resource) {
     316          22 :   setResourceTtl(resource);
     317             : 
     318          22 :   if (auto maybe_resource = getRequestedResourceState(resource.name());
     319          22 :       maybe_resource.has_value()) {
     320             :     // It is a resource that we requested.
     321          12 :     maybe_resource->setVersion(resource.version());
     322          12 :     ASSERT(requested_resource_state_.contains(resource.name()));
     323          12 :     ASSERT(!wildcard_resource_state_.contains(resource.name()));
     324          12 :     ASSERT(!ambiguous_resource_state_.contains(resource.name()));
     325          12 :   } else {
     326             :     // It is a resource that is a part of our wildcard request.
     327          10 :     wildcard_resource_state_.insert_or_assign(resource.name(), resource.version());
     328             :     // The resource could be ambiguous before, but now the ambiguity
     329             :     // is resolved.
     330          10 :     ambiguous_resource_state_.erase(resource.name());
     331          10 :     ASSERT(!requested_resource_state_.contains(resource.name()));
     332          10 :     ASSERT(wildcard_resource_state_.contains(resource.name()));
     333          10 :     ASSERT(!ambiguous_resource_state_.contains(resource.name()));
     334          10 :   }
     335          22 : }
     336             : 
     337             : void DeltaSubscriptionState::setResourceTtl(
     338          22 :     const envoy::service::discovery::v3::Resource& resource) {
     339          22 :   if (resource.has_ttl()) {
     340           0 :     ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())),
     341           0 :              resource.name());
     342          22 :   } else {
     343          22 :     ttl_.clear(resource.name());
     344          22 :   }
     345          22 : }
     346             : 
     347           0 : void DeltaSubscriptionState::ttlExpiryCallback(const std::vector<std::string>& expired) {
     348           0 :   Protobuf::RepeatedPtrField<std::string> removed_resources;
     349           0 :   for (const auto& resource : expired) {
     350           0 :     if (auto maybe_resource = getRequestedResourceState(resource); maybe_resource.has_value()) {
     351           0 :       maybe_resource->setAsWaitingForServer();
     352           0 :       removed_resources.Add(std::string(resource));
     353           0 :     } else if (const auto erased_count = wildcard_resource_state_.erase(resource) +
     354           0 :                                          ambiguous_resource_state_.erase(resource);
     355           0 :                erased_count > 0) {
     356           0 :       removed_resources.Add(std::string(resource));
     357           0 :     }
     358           0 :   }
     359           0 :   callbacks().onConfigUpdate({}, removed_resources, "");
     360           0 : }
     361             : 
     362             : OptRef<DeltaSubscriptionState::ResourceState>
     363          23 : DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) {
     364          23 :   auto itr = requested_resource_state_.find(resource_name);
     365          23 :   if (itr == requested_resource_state_.end()) {
     366          11 :     return {};
     367          11 :   }
     368          12 :   return {itr->second};
     369          23 : }
     370             : 
     371             : OptRef<const DeltaSubscriptionState::ResourceState>
     372           0 : DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) const {
     373           0 :   auto itr = requested_resource_state_.find(resource_name);
     374           0 :   if (itr == requested_resource_state_.end()) {
     375           0 :     return {};
     376           0 :   }
     377           0 :   return {itr->second};
     378           0 : }
     379             : 
     380             : } // namespace XdsMux
     381             : } // namespace Config
     382             : } // namespace Envoy

Generated by: LCOV version 1.15