LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc - delta_subscription_state.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 171 264 64.8 %
Date: 2024-01-05 06:35:25 Functions: 12 15 80.0 %

          Line data    Source code
       1             : #include "source/extensions/config_subscription/grpc/delta_subscription_state.h"
       2             : 
       3             : #include "envoy/event/dispatcher.h"
       4             : #include "envoy/service/discovery/v3/discovery.pb.h"
       5             : 
       6             : #include "source/common/common/assert.h"
       7             : #include "source/common/common/hash.h"
       8             : #include "source/common/config/utility.h"
       9             : #include "source/common/runtime/runtime_features.h"
      10             : 
      11             : namespace Envoy {
      12             : namespace Config {
      13             : 
      14             : DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
      15             :                                                UntypedConfigUpdateCallbacks& watch_map,
      16             :                                                const LocalInfo::LocalInfo& local_info,
      17             :                                                Event::Dispatcher& dispatcher,
      18             :                                                XdsConfigTrackerOptRef xds_config_tracker)
      19             :     // TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
      20             :     // empty resources as updates.
      21             :     : supports_heartbeats_(type_url != "envoy.config.route.v3.VirtualHost"),
      22             :       ttl_(
      23           0 :           [this](const auto& expired) {
      24           0 :             Protobuf::RepeatedPtrField<std::string> removed_resources;
      25           0 :             for (const auto& resource : expired) {
      26           0 :               if (auto maybe_resource = getRequestedResourceState(resource);
      27           0 :                   maybe_resource.has_value()) {
      28           0 :                 maybe_resource->setAsWaitingForServer();
      29           0 :                 removed_resources.Add(std::string(resource));
      30           0 :               } else if (const auto erased_count = wildcard_resource_state_.erase(resource) +
      31           0 :                                                    ambiguous_resource_state_.erase(resource);
      32           0 :                          erased_count > 0) {
      33           0 :                 removed_resources.Add(std::string(resource));
      34           0 :               }
      35           0 :             }
      36             : 
      37           0 :             watch_map_.onConfigUpdate({}, removed_resources, "");
      38           0 :           },
      39             :           dispatcher, dispatcher.timeSource()),
      40             :       type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info),
      41          15 :       xds_config_tracker_(xds_config_tracker) {}
      42             : 
      43             : void DeltaSubscriptionState::updateSubscriptionInterest(
      44             :     const absl::flat_hash_set<std::string>& cur_added,
      45          34 :     const absl::flat_hash_set<std::string>& cur_removed) {
      46          34 :   for (const auto& a : cur_added) {
      47           9 :     if (in_initial_legacy_wildcard_ && a != Wildcard) {
      48           7 :       in_initial_legacy_wildcard_ = false;
      49           7 :     }
      50             :     // If the requested resource existed as a wildcard resource,
      51             :     // transition it to requested. Otherwise mark it as a resource
      52             :     // waiting for the server to receive the version.
      53           9 :     if (auto it = wildcard_resource_state_.find(a); it != wildcard_resource_state_.end()) {
      54           0 :       requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
      55           0 :       wildcard_resource_state_.erase(it);
      56           9 :     } else if (it = ambiguous_resource_state_.find(a); it != ambiguous_resource_state_.end()) {
      57           0 :       requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
      58           0 :       ambiguous_resource_state_.erase(it);
      59           9 :     } else {
      60           9 :       requested_resource_state_.insert_or_assign(a, ResourceState::waitingForServer());
      61           9 :     }
      62           9 :     ASSERT(requested_resource_state_.contains(a));
      63           9 :     ASSERT(!wildcard_resource_state_.contains(a));
      64           9 :     ASSERT(!ambiguous_resource_state_.contains(a));
      65             :     // If interest in a resource is removed-then-added (all before a discovery request
      66             :     // can be sent), we must treat it as a "new" addition: our user may have forgotten its
      67             :     // copy of the resource after instructing us to remove it, and need to be reminded of it.
      68           9 :     names_removed_.erase(a);
      69           9 :     names_added_.insert(a);
      70           9 :   }
      71          34 :   for (const auto& r : cur_removed) {
      72           9 :     auto actually_erased = false;
      73             :     // The resource we have lost the interest in could also come from our wildcard subscription. We
      74             :     // just don't know it at this point. Instead of removing it outright, mark the resource as not
      75             :     // interesting to us any more and the server will send us an update. If we don't have a wildcard
      76             :     // subscription then there is no ambiguity and just drop the resource.
      77           9 :     if (requested_resource_state_.contains(Wildcard)) {
      78           0 :       if (auto it = requested_resource_state_.find(r); it != requested_resource_state_.end()) {
      79             :         // Wildcard resources always have a version. If our requested resource has no version, it
      80             :         // won't be a wildcard resource then. If r is Wildcard itself, then it never has a version
      81             :         // attached to it, so it will not be moved to ambiguous category.
      82           0 :         if (!it->second.isWaitingForServer()) {
      83           0 :           ambiguous_resource_state_.insert_or_assign(it->first, it->second.version());
      84           0 :         }
      85           0 :         requested_resource_state_.erase(it);
      86           0 :         actually_erased = true;
      87           0 :       }
      88           9 :     } else {
      89           9 :       actually_erased = (requested_resource_state_.erase(r) > 0);
      90           9 :     }
      91           9 :     ASSERT(!requested_resource_state_.contains(r));
      92             :     // Ideally, when interest in a resource is added-then-removed in between requests,
      93             :     // we would avoid putting a superfluous "unsubscribe [resource that was never subscribed]"
      94             :     // in the request. However, the removed-then-added case *does* need to go in the request,
      95             :     // and due to how we accomplish that, it's difficult to distinguish remove-add-remove from
      96             :     // add-remove (because "remove-add" has to be treated as equivalent to just "add").
      97           9 :     names_added_.erase(r);
      98           9 :     if (actually_erased) {
      99           9 :       names_removed_.insert(r);
     100           9 :       in_initial_legacy_wildcard_ = false;
     101           9 :     }
     102           9 :   }
     103             :   // If we unsubscribe from wildcard resource, drop all the resources that came from wildcard from
     104             :   // cache. Also drop the ambiguous resources - we aren't interested in those, but we didn't know if
     105             :   // those came from wildcard subscription or not, but now it's not important any more.
     106          34 :   if (cur_removed.contains(Wildcard)) {
     107           0 :     wildcard_resource_state_.clear();
     108           0 :     ambiguous_resource_state_.clear();
     109           0 :   }
     110          34 : }
     111             : 
     112             : // Not having sent any requests yet counts as an "update pending" since you're supposed to resend
     113             : // the entirety of your interest at the start of a stream, even if nothing has changed.
     114         346 : bool DeltaSubscriptionState::subscriptionUpdatePending() const {
     115         346 :   if (!names_added_.empty() || !names_removed_.empty()) {
     116          36 :     return true;
     117          36 :   }
     118             :   // At this point, we have no new resources to subscribe to or any
     119             :   // resources to unsubscribe from.
     120         310 :   if (!any_request_sent_yet_in_current_stream_) {
     121             :     // If we haven't sent anything on the current stream, but we are actually interested in some
     122             :     // resource then we obviously need to let the server know about those.
     123          16 :     if (!requested_resource_state_.empty()) {
     124           0 :       return true;
     125           0 :     }
     126             :     // So there are no new names and we are interested in nothing. This may either mean that we want
     127             :     // the legacy wildcard subscription to kick in or we actually unsubscribed from everything. If
     128             :     // the latter is true, then we should not be sending any requests. In such case the initial
     129             :     // wildcard mode will be false. Otherwise it means that the legacy wildcard request should be
     130             :     // sent.
     131          16 :     return in_initial_legacy_wildcard_;
     132          16 :   }
     133             : 
     134             :   // At this point, we have no changes in subscription resources and this isn't a first request in
     135             :   // the stream, so even if there are no resources we are interested in, we can send the request,
     136             :   // because even if it's empty, it won't be interpreted as legacy wildcard subscription, which can
     137             :   // only for the first request in the stream. So sending an empty request at this point should be
     138             :   // harmless.
     139         294 :   return must_send_discovery_request_;
     140         310 : }
     141             : 
     142             : UpdateAck DeltaSubscriptionState::handleResponse(
     143          23 :     const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
     144             :   // We *always* copy the response's nonce into the next request, even if we're going to make that
     145             :   // request a NACK by setting error_detail.
     146          23 :   UpdateAck ack(message.nonce(), type_url_);
     147          23 :   TRY_ASSERT_MAIN_THREAD { handleGoodResponse(message); }
     148          23 :   END_TRY
     149          23 :   catch (const EnvoyException& e) {
     150           0 :     handleBadResponse(e, ack);
     151           0 :   }
     152          23 :   return ack;
     153          23 : }
     154             : 
     155             : bool DeltaSubscriptionState::isHeartbeatResponse(
     156          22 :     const envoy::service::discovery::v3::Resource& resource) const {
     157          22 :   if (!supports_heartbeats_) {
     158           0 :     return false;
     159           0 :   }
     160          22 :   if (resource.has_resource()) {
     161          22 :     return false;
     162          22 :   }
     163             : 
     164           0 :   if (const auto maybe_resource = getRequestedResourceState(resource.name());
     165           0 :       maybe_resource.has_value()) {
     166           0 :     return !maybe_resource->isWaitingForServer() && resource.version() == maybe_resource->version();
     167           0 :   }
     168             : 
     169           0 :   if (const auto itr = wildcard_resource_state_.find(resource.name());
     170           0 :       itr != wildcard_resource_state_.end()) {
     171           0 :     return resource.version() == itr->second;
     172           0 :   }
     173             : 
     174           0 :   if (const auto itr = ambiguous_resource_state_.find(resource.name());
     175           0 :       itr != ambiguous_resource_state_.end()) {
     176             :     // In theory we should move the ambiguous resource to wildcard, because probably we shouldn't be
     177             :     // getting heartbeat responses about resources that we are not interested in, but the server
     178             :     // could have sent this heartbeat before it learned about our lack of interest in the resource.
     179           0 :     return resource.version() == itr->second;
     180           0 :   }
     181             : 
     182           0 :   return false;
     183           0 : }
     184             : 
     185             : void DeltaSubscriptionState::handleGoodResponse(
     186          23 :     const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
     187          23 :   absl::flat_hash_set<std::string> names_added_removed;
     188          23 :   Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> non_heartbeat_resources;
     189          23 :   for (const auto& resource : message.resources()) {
     190          22 :     if (!names_added_removed.insert(resource.name()).second) {
     191           0 :       throw EnvoyException(
     192           0 :           fmt::format("duplicate name {} found among added/updated resources", resource.name()));
     193           0 :     }
     194          22 :     if (isHeartbeatResponse(resource)) {
     195           0 :       continue;
     196           0 :     }
     197          22 :     non_heartbeat_resources.Add()->CopyFrom(resource);
     198             :     // DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource
     199          22 :     if (!resource.has_resource() && resource.aliases_size() > 0) {
     200           0 :       continue;
     201           0 :     }
     202          22 :     if (message.type_url() != resource.resource().type_url()) {
     203           0 :       throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match "
     204           0 :                                        "the message-wide type URL {} in DeltaDiscoveryResponse {}",
     205           0 :                                        resource.resource().type_url(), message.type_url(),
     206           0 :                                        message.DebugString()));
     207           0 :     }
     208          22 :   }
     209          23 :   for (const auto& name : message.removed_resources()) {
     210           1 :     if (!names_added_removed.insert(name).second) {
     211           0 :       throw EnvoyException(
     212           0 :           fmt::format("duplicate name {} found in the union of added+removed resources", name));
     213           0 :     }
     214           1 :   }
     215             : 
     216          23 :   watch_map_.onConfigUpdate(non_heartbeat_resources, message.removed_resources(),
     217          23 :                             message.system_version_info());
     218             : 
     219             :   // Processing point when resources are successfully ingested.
     220          23 :   if (xds_config_tracker_.has_value()) {
     221           0 :     xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources,
     222           0 :                                           message.removed_resources());
     223           0 :   }
     224             : 
     225          23 :   {
     226          23 :     const auto scoped_update = ttl_.scopedTtlUpdate();
     227          23 :     if (requested_resource_state_.contains(Wildcard)) {
     228          11 :       for (const auto& resource : message.resources()) {
     229          10 :         addResourceStateFromServer(resource);
     230          10 :       }
     231          12 :     } else {
     232             :       // We are not subscribed to wildcard, so we only take resources that we explicitly requested
     233             :       // and ignore the others.
     234          12 :       for (const auto& resource : message.resources()) {
     235          12 :         if (requested_resource_state_.contains(resource.name())) {
     236          12 :           addResourceStateFromServer(resource);
     237          12 :         }
     238          12 :       }
     239          12 :     }
     240          23 :   }
     241             : 
     242             :   // If a resource is gone, there is no longer a meaningful version for it that makes sense to
     243             :   // provide to the server upon stream reconnect: either it will continue to not exist, in which
     244             :   // case saying nothing is fine, or the server will bring back something new, which we should
     245             :   // receive regardless (which is the logic that not specifying a version will get you).
     246             :   //
     247             :   // So, leave the version map entry present but blank if we are still interested in the resource.
     248             :   // It will be left out of initial_resource_versions messages, but will remind us to explicitly
     249             :   // tell the server "I'm cancelling my subscription" when we lose interest. In case of resources
     250             :   // received as a part of the wildcard subscription or resources we already lost interest in, we
     251             :   // just drop them.
     252          23 :   for (const auto& resource_name : message.removed_resources()) {
     253           1 :     if (auto maybe_resource = getRequestedResourceState(resource_name);
     254           1 :         maybe_resource.has_value()) {
     255           0 :       maybe_resource->setAsWaitingForServer();
     256           1 :     } else if (const auto erased_count = ambiguous_resource_state_.erase(resource_name);
     257           1 :                erased_count == 0) {
     258           1 :       wildcard_resource_state_.erase(resource_name);
     259           1 :     }
     260           1 :   }
     261          23 :   ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_,
     262          23 :             message.resources().size(), message.removed_resources().size());
     263          23 : }
     264             : 
     265           0 : void DeltaSubscriptionState::handleBadResponse(const EnvoyException& e, UpdateAck& ack) {
     266             :   // Note that error_detail being set is what indicates that a DeltaDiscoveryRequest is a NACK.
     267           0 :   ack.error_detail_.set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
     268           0 :   ack.error_detail_.set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
     269           0 :   ENVOY_LOG(warn, "delta config for {} rejected: {}", type_url_, e.what());
     270           0 :   watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
     271           0 : }
     272             : 
     273          15 : void DeltaSubscriptionState::handleEstablishmentFailure() {
     274          15 :   watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
     275          15 :                                   nullptr);
     276          15 : }
     277             : 
     278             : envoy::service::discovery::v3::DeltaDiscoveryRequest
     279          40 : DeltaSubscriptionState::getNextRequestAckless() {
     280          40 :   envoy::service::discovery::v3::DeltaDiscoveryRequest request;
     281          40 :   must_send_discovery_request_ = false;
     282          40 :   if (!any_request_sent_yet_in_current_stream_) {
     283          15 :     any_request_sent_yet_in_current_stream_ = true;
     284          15 :     const bool is_legacy_wildcard = isInitialRequestForLegacyWildcard();
     285             :     // initial_resource_versions "must be populated for first request in a stream".
     286             :     // Also, since this might be a new server, we must explicitly state *all* of our subscription
     287             :     // interest.
     288          15 :     for (auto const& [resource_name, resource_state] : requested_resource_state_) {
     289             :       // Populate initial_resource_versions with the resource versions we currently have.
     290             :       // Resources we are interested in, but are still waiting to get any version of from the
     291             :       // server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
     292          15 :       if (!resource_state.isWaitingForServer()) {
     293           0 :         (*request.mutable_initial_resource_versions())[resource_name] = resource_state.version();
     294           0 :       }
     295             :       // We are going over a list of resources that we are interested in, so add them to
     296             :       // resource_names_subscribe.
     297          15 :       names_added_.insert(resource_name);
     298          15 :     }
     299          15 :     for (auto const& [resource_name, resource_version] : wildcard_resource_state_) {
     300           0 :       (*request.mutable_initial_resource_versions())[resource_name] = resource_version;
     301           0 :     }
     302          15 :     for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) {
     303           0 :       (*request.mutable_initial_resource_versions())[resource_name] = resource_version;
     304           0 :     }
     305             :     // If this is a legacy wildcard request, then make sure that the resource_names_subscribe is
     306             :     // empty.
     307          15 :     if (is_legacy_wildcard) {
     308           8 :       names_added_.clear();
     309           8 :     }
     310          15 :     names_removed_.clear();
     311          15 :   }
     312          40 :   std::copy(names_added_.begin(), names_added_.end(),
     313          40 :             Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_subscribe()));
     314          40 :   std::copy(names_removed_.begin(), names_removed_.end(),
     315          40 :             Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_unsubscribe()));
     316          40 :   names_added_.clear();
     317          40 :   names_removed_.clear();
     318             : 
     319          40 :   request.set_type_url(type_url_);
     320          40 :   request.mutable_node()->MergeFrom(local_info_.node());
     321          40 :   return request;
     322          40 : }
     323             : 
     324          15 : bool DeltaSubscriptionState::isInitialRequestForLegacyWildcard() {
     325          15 :   if (in_initial_legacy_wildcard_) {
     326           8 :     requested_resource_state_.insert_or_assign(Wildcard, ResourceState::waitingForServer());
     327           8 :     ASSERT(requested_resource_state_.contains(Wildcard));
     328           8 :     ASSERT(!wildcard_resource_state_.contains(Wildcard));
     329           8 :     ASSERT(!ambiguous_resource_state_.contains(Wildcard));
     330           8 :     return true;
     331           8 :   }
     332             : 
     333             :   // If we are here, this means that we lost our initial wildcard mode, because we subscribed to
     334             :   // something in the past. We could still be in the situation now that all we are subscribed to now
     335             :   // is wildcard resource, so in such case try to send a legacy wildcard subscription request
     336             :   // anyway. For this to happen, two conditions need to apply:
     337             :   //
     338             :   // 1. No change in interest.
     339             :   // 2. The only requested resource is Wildcard resource.
     340             :   //
     341             :   // The invariant of the code here is that this code is executed only when
     342             :   // subscriptionUpdatePending actually returns true, which in our case can only happen if the
     343             :   // requested resources state_ isn't empty.
     344           7 :   ASSERT(!requested_resource_state_.empty());
     345             : 
     346             :   // If our subscription interest didn't change then the first condition for using legacy wildcard
     347             :   // subscription is met.
     348           7 :   if (!names_added_.empty() || !names_removed_.empty()) {
     349           7 :     return false;
     350           7 :   }
     351             :   // If we requested only a wildcard resource then the second condition for using legacy wildcard
     352             :   // condition is met.
     353           0 :   return requested_resource_state_.size() == 1 &&
     354           0 :          requested_resource_state_.begin()->first == Wildcard;
     355           7 : }
     356             : 
     357             : envoy::service::discovery::v3::DeltaDiscoveryRequest
     358          23 : DeltaSubscriptionState::getNextRequestWithAck(const UpdateAck& ack) {
     359          23 :   envoy::service::discovery::v3::DeltaDiscoveryRequest request = getNextRequestAckless();
     360          23 :   request.set_response_nonce(ack.nonce_);
     361          23 :   if (ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) {
     362             :     // Don't needlessly make the field present-but-empty if status is ok.
     363           0 :     request.mutable_error_detail()->CopyFrom(ack.error_detail_);
     364           0 :   }
     365          23 :   return request;
     366          23 : }
     367             : 
     368             : void DeltaSubscriptionState::addResourceStateFromServer(
     369          22 :     const envoy::service::discovery::v3::Resource& resource) {
     370          22 :   if (resource.has_ttl()) {
     371           0 :     ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())),
     372           0 :              resource.name());
     373          22 :   } else {
     374          22 :     ttl_.clear(resource.name());
     375          22 :   }
     376             : 
     377          22 :   if (auto maybe_resource = getRequestedResourceState(resource.name());
     378          22 :       maybe_resource.has_value()) {
     379             :     // It is a resource that we requested.
     380          12 :     maybe_resource->setVersion(resource.version());
     381          12 :     ASSERT(requested_resource_state_.contains(resource.name()));
     382          12 :     ASSERT(!wildcard_resource_state_.contains(resource.name()));
     383          12 :     ASSERT(!ambiguous_resource_state_.contains(resource.name()));
     384          12 :   } else {
     385             :     // It is a resource that is a part of our wildcard request.
     386          10 :     wildcard_resource_state_.insert_or_assign(resource.name(), resource.version());
     387             :     // The resource could be ambiguous before, but now the ambiguity
     388             :     // is resolved.
     389          10 :     ambiguous_resource_state_.erase(resource.name());
     390          10 :     ASSERT(!requested_resource_state_.contains(resource.name()));
     391          10 :     ASSERT(wildcard_resource_state_.contains(resource.name()));
     392          10 :     ASSERT(!ambiguous_resource_state_.contains(resource.name()));
     393          10 :   }
     394          22 : }
     395             : 
     396             : OptRef<DeltaSubscriptionState::ResourceState>
     397          23 : DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) {
     398          23 :   auto itr = requested_resource_state_.find(resource_name);
     399          23 :   if (itr == requested_resource_state_.end()) {
     400          11 :     return {};
     401          11 :   }
     402          12 :   return {itr->second};
     403          23 : }
     404             : 
     405             : OptRef<const DeltaSubscriptionState::ResourceState>
     406           0 : DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) const {
     407           0 :   auto itr = requested_resource_state_.find(resource_name);
     408           0 :   if (itr == requested_resource_state_.end()) {
     409           0 :     return {};
     410           0 :   }
     411           0 :   return {itr->second};
     412           0 : }
     413             : 
     414             : } // namespace Config
     415             : } // namespace Envoy

Generated by: LCOV version 1.15