LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc - watch_map.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 160 232 69.0 %
Date: 2024-01-05 06:35:25 Functions: 12 14 85.7 %

          Line data    Source code
       1             : #include "source/extensions/config_subscription/grpc/watch_map.h"
       2             : 
       3             : #include "envoy/service/discovery/v3/discovery.pb.h"
       4             : 
       5             : #include "source/common/common/cleanup.h"
       6             : #include "source/common/common/utility.h"
       7             : #include "source/common/config/decoded_resource_impl.h"
       8             : #include "source/common/config/utility.h"
       9             : #include "source/common/config/xds_resource.h"
      10             : 
      11             : namespace Envoy {
      12             : namespace Config {
      13             : 
      14             : namespace {
      // Extracts the namespace portion of a resource name, i.e. everything that precedes the final
      // '/'. For example "www.foo.com/bar" yields "www.foo.com". A name that contains no '/' has
      // no namespace and yields the empty string. The separating '/' is never part of the result.
      std::string namespaceFromName(const std::string& resource_name) {
        const std::string::size_type last_slash = resource_name.rfind('/');
        if (last_slash == std::string::npos) {
          return {};
        }
        return resource_name.substr(0, last_slash);
      }
      22             : } // namespace
      23             : 
      24             : Watch* WatchMap::addWatch(SubscriptionCallbacks& callbacks,
      25          84 :                           OpaqueResourceDecoder& resource_decoder) {
      26          84 :   auto watch = std::make_unique<Watch>(callbacks, resource_decoder);
      27          84 :   Watch* watch_ptr = watch.get();
      28          84 :   wildcard_watches_.insert(watch_ptr);
      29          84 :   watches_.insert(std::move(watch));
      30          84 :   return watch_ptr;
      31          84 : }
      32             : 
      33          84 : void WatchMap::removeWatch(Watch* watch) {
      34          84 :   if (deferred_removed_during_update_ != nullptr) {
      35           0 :     deferred_removed_during_update_->insert(watch);
      36          84 :   } else {
      37          84 :     wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone.
      38          84 :     watches_.erase(watch);
      39          84 :   }
      40          84 : }
      41             : 
      42         134 : void WatchMap::removeDeferredWatches() {
      43         134 :   for (auto& watch : *deferred_removed_during_update_) {
      44           0 :     wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone.
      45           0 :     watches_.erase(watch);
      46           0 :   }
      47         134 :   deferred_removed_during_update_ = nullptr;
      48         134 : }
      49             : 
      50             : AddedRemoved
      51             : WatchMap::updateWatchInterest(Watch* watch,
      52         168 :                               const absl::flat_hash_set<std::string>& update_to_these_names) {
      53         168 :   if (update_to_these_names.empty() || update_to_these_names.contains(Wildcard)) {
      54         120 :     wildcard_watches_.insert(watch);
      55         120 :   } else {
      56          48 :     wildcard_watches_.erase(watch);
      57          48 :   }
      58             : 
      59         168 :   absl::flat_hash_set<std::string> newly_added_to_watch;
      60         168 :   SetUtil::setDifference(update_to_these_names, watch->resource_names_, newly_added_to_watch);
      61             : 
      62         168 :   absl::flat_hash_set<std::string> newly_removed_from_watch;
      63         168 :   SetUtil::setDifference(watch->resource_names_, update_to_these_names, newly_removed_from_watch);
      64             : 
      65         168 :   watch->resource_names_ = update_to_these_names;
      66             : 
      67             :   // First resources are added and only then removed, so a watch won't be removed
      68             :   // if its interest has been replaced (rather than completely removed).
      69         168 :   absl::flat_hash_set<std::string> added_resources = findAdditions(newly_added_to_watch, watch);
      70         168 :   absl::flat_hash_set<std::string> removed_resources =
      71         168 :       findRemovals(newly_removed_from_watch, watch);
      72             :   // Remove cached resource that are no longer relevant.
      73         168 :   if (eds_resources_cache_.has_value()) {
      74           0 :     for (const auto& resource_name : removed_resources) {
      75             :       // This may pass a resource_name that is not in the cache, for example
      76             :       // if the resource contents has never arrived.
      77           0 :       eds_resources_cache_->removeResource(resource_name);
      78           0 :     }
      79           0 :   }
      80         168 :   return {std::move(added_resources), std::move(removed_resources)};
      81         168 : }
      82             : 
      83         188 : absl::flat_hash_set<Watch*> WatchMap::watchesInterestedIn(const std::string& resource_name) {
      84         188 :   absl::flat_hash_set<Watch*> ret;
      85         188 :   if (!use_namespace_matching_) {
      86         188 :     ret = wildcard_watches_;
      87         188 :   }
      88         188 :   const bool is_xdstp = XdsResourceIdentifier::hasXdsTpScheme(resource_name);
      89         188 :   xds::core::v3::ResourceName xdstp_resource;
      90         188 :   XdsResourceIdentifier::EncodeOptions encode_options;
      91         188 :   encode_options.sort_context_params_ = true;
      92             :   // First look for an exact match. If this is xdstp:// we need to normalize context parameters.
      93         188 :   if (is_xdstp) {
      94             :     // TODO(htuch): factor this (and stuff in namespaceFromName) into a dedicated library.
      95             :     // This is not very efficient; it is possible to canonicalize etc. much faster with raw string
      96             :     // operations, but this implementation provides a reference for later optimization while we
      97             :     // adopt xdstp://.
      98           0 :     auto resource_or_error = XdsResourceIdentifier::decodeUrn(resource_name);
      99           0 :     THROW_IF_STATUS_NOT_OK(resource_or_error, throw);
     100           0 :     xdstp_resource = resource_or_error.value();
     101           0 :   }
     102         188 :   auto watches_interested = watch_interest_.find(
     103         188 :       is_xdstp ? XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options) : resource_name);
     104             :   // If that fails, consider namespace/glob matching. This is the slow path for xdstp:// and should
     105             :   // only happen for glob collections. TODO(htuch): It should be possible to have much more
     106             :   // efficient matchers here.
     107         188 :   if (watches_interested == watch_interest_.end()) {
     108         112 :     if (use_namespace_matching_) {
     109           0 :       watches_interested = watch_interest_.find(namespaceFromName(resource_name));
     110         112 :     } else if (is_xdstp) {
     111             :       // Replace resource name component with glob for purpose of matching.
     112           0 :       const auto pos = xdstp_resource.id().find_last_of('/');
     113           0 :       xdstp_resource.set_id(pos == std::string::npos ? "*"
     114           0 :                                                      : xdstp_resource.id().substr(0, pos) + "/*");
     115           0 :       const std::string encoded_name =
     116           0 :           XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options);
     117           0 :       watches_interested = watch_interest_.find(encoded_name);
     118           0 :     }
     119         112 :   }
     120         188 :   if (watches_interested != watch_interest_.end()) {
     121          76 :     for (const auto& watch : watches_interested->second) {
     122          76 :       ret.insert(watch);
     123          76 :     }
     124          76 :   }
     125         188 :   return ret;
     126         188 : }
     127             : 
     128             : void WatchMap::onConfigUpdate(const std::vector<DecodedResourcePtr>& resources,
     129          88 :                               const std::string& version_info) {
     130          88 :   if (watches_.empty()) {
     131           0 :     return;
     132           0 :   }
     133             : 
     134             :   // Track any removals triggered by earlier watch updates.
     135          88 :   ASSERT(deferred_removed_during_update_ == nullptr);
     136          88 :   deferred_removed_during_update_ = std::make_unique<absl::flat_hash_set<Watch*>>();
     137          88 :   Cleanup cleanup([this] { removeDeferredWatches(); });
     138             :   // The xDS server may send a resource that Envoy isn't interested in. This bit array
     139             :   // will hold an "interesting" bit for each of the resources sent in the update.
     140          88 :   std::vector<bool> interesting_resources;
     141          88 :   interesting_resources.reserve(resources.size());
     142             :   // Build a map from watches, to the set of updated resources that each watch cares about. Each
     143             :   // entry in the map is then a nice little bundle that can be fed directly into the individual
     144             :   // onConfigUpdate()s.
     145          88 :   absl::flat_hash_map<Watch*, std::vector<DecodedResourceRef>> per_watch_updates;
     146         142 :   for (const auto& r : resources) {
     147         142 :     const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r->name());
     148         142 :     for (const auto& interested_watch : interested_in_r) {
     149         131 :       per_watch_updates[interested_watch].emplace_back(*r);
     150         131 :     }
     151             :     // Set the corresponding interested_resources entry to true iff there is a
     152             :     // watch interested in the resource.
     153         142 :     interesting_resources.emplace_back(!interested_in_r.empty());
     154         142 :   }
     155             : 
     156             :   // Execute external config validators.
     157          88 :   config_validators_.executeValidators(type_url_, resources);
     158             : 
     159          88 :   const bool map_is_single_wildcard = (watches_.size() == 1 && wildcard_watches_.size() == 1);
     160             :   // We just bundled up the updates into nice per-watch packages. Now, deliver them.
     161         108 :   for (auto& watch : watches_) {
     162         108 :     if (deferred_removed_during_update_->count(watch.get()) > 0) {
     163           0 :       continue;
     164           0 :     }
     165         108 :     const auto this_watch_updates = per_watch_updates.find(watch);
     166         108 :     if (this_watch_updates == per_watch_updates.end()) {
     167             :       // This update included no resources this watch cares about.
     168             :       // 1) If there is only a single, wildcard watch (i.e. Cluster or Listener), always call
     169             :       //    its onConfigUpdate even if just a no-op, to properly maintain state-of-the-world
     170             :       //    semantics and the update_empty stat.
     171             :       // 2) If this watch previously had some resources, it means this update is removing all
     172             :       //    of this watch's resources, so the watch must be informed with an onConfigUpdate.
     173             :       // 3) Otherwise, we can skip onConfigUpdate for this watch.
     174          11 :       if (map_is_single_wildcard || !watch->state_of_the_world_empty_) {
     175           2 :         watch->state_of_the_world_empty_ = true;
     176           2 :         THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate({}, version_info));
     177           2 :       }
     178          97 :     } else {
     179          97 :       watch->state_of_the_world_empty_ = false;
     180          97 :       THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate(this_watch_updates->second, version_info));
     181          97 :     }
     182         108 :   }
     183             : 
     184          88 :   if (eds_resources_cache_.has_value()) {
     185             :     // Add/update the watched resources to/in the cache.
     186             :     // Only resources that have a watcher should be updated.
     187           0 :     for (uint32_t resource_idx = 0; resource_idx < resources.size(); ++resource_idx) {
     188           0 :       if (interesting_resources[resource_idx]) {
     189           0 :         const auto& resource = resources[resource_idx];
     190           0 :         const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
     191           0 :             dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
     192           0 :                 resource.get()->resource());
     193           0 :         eds_resources_cache_->setResource(resource.get()->name(), cluster_load_assignment);
     194           0 :       }
     195           0 :     }
     196             :     // Note: No need to remove resources from the cache, as currently only non-collection
     197             :     // subscriptions are supported, and these resources are removed in the call
     198             :     // to updateWatchInterest().
     199           0 :   }
     200          88 : }
     201             : 
     202             : void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
     203           0 :                               const std::string& version_info) {
     204           0 :   if (watches_.empty()) {
     205           0 :     return;
     206           0 :   }
     207             : 
     208           0 :   std::vector<DecodedResourcePtr> decoded_resources;
     209           0 :   for (const auto& r : resources) {
     210           0 :     decoded_resources.emplace_back(
     211           0 :         DecodedResourceImpl::fromResource((*watches_.begin())->resource_decoder_, r, version_info));
     212           0 :   }
     213             : 
     214           0 :   onConfigUpdate(decoded_resources, version_info);
     215           0 : }
     216             : 
     217             : void WatchMap::onConfigUpdate(
     218             :     const Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource>& added_resources,
     219             :     const Protobuf::RepeatedPtrField<std::string>& removed_resources,
     220          46 :     const std::string& system_version_info) {
     221             :   // Track any removals triggered by earlier watch updates.
     222          46 :   ASSERT(deferred_removed_during_update_ == nullptr);
     223          46 :   deferred_removed_during_update_ = std::make_unique<absl::flat_hash_set<Watch*>>();
     224          46 :   Cleanup cleanup([this] { removeDeferredWatches(); });
     225             :   // Build a pair of maps: from watches, to the set of resources {added,removed} that each watch
     226             :   // cares about. Each entry in the map-pair is then a nice little bundle that can be fed directly
     227             :   // into the individual onConfigUpdate()s.
     228          46 :   std::vector<DecodedResourcePtr> decoded_resources;
     229          46 :   absl::flat_hash_map<Watch*, std::vector<DecodedResourceRef>> per_watch_added;
     230          46 :   for (const auto& r : added_resources) {
     231          44 :     const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r.name());
     232             :     // If there are no watches, then we don't need to decode. If there are watches, they should all
     233             :     // be for the same resource type, so we can just use the callbacks of the first watch to decode.
     234          44 :     if (interested_in_r.empty()) {
     235           0 :       continue;
     236           0 :     }
     237          44 :     decoded_resources.emplace_back(
     238          44 :         new DecodedResourceImpl((*interested_in_r.begin())->resource_decoder_, r));
     239          44 :     for (const auto& interested_watch : interested_in_r) {
     240          44 :       per_watch_added[interested_watch].emplace_back(*decoded_resources.back());
     241          44 :     }
     242          44 :   }
     243          46 :   absl::flat_hash_map<Watch*, Protobuf::RepeatedPtrField<std::string>> per_watch_removed;
     244          46 :   for (const auto& r : removed_resources) {
     245           2 :     const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r);
     246           2 :     for (const auto& interested_watch : interested_in_r) {
     247           2 :       *per_watch_removed[interested_watch].Add() = r;
     248           2 :     }
     249           2 :   }
     250             : 
     251             :   // Execute external config validators.
     252          46 :   config_validators_.executeValidators(type_url_, decoded_resources, removed_resources);
     253             : 
     254             :   // We just bundled up the updates into nice per-watch packages. Now, deliver them.
     255          46 :   for (const auto& [cur_watch, resource_to_add] : per_watch_added) {
     256          44 :     if (deferred_removed_during_update_->count(cur_watch) > 0) {
     257           0 :       continue;
     258           0 :     }
     259          44 :     const auto removed = per_watch_removed.find(cur_watch);
     260          44 :     if (removed == per_watch_removed.end()) {
     261             :       // additions only, no removals
     262          44 :       THROW_IF_NOT_OK(
     263          44 :           cur_watch->callbacks_.onConfigUpdate(resource_to_add, {}, system_version_info));
     264          44 :     } else {
     265             :       // both additions and removals
     266           0 :       THROW_IF_NOT_OK(cur_watch->callbacks_.onConfigUpdate(resource_to_add, removed->second,
     267           0 :                                                            system_version_info));
     268             :       // Drop the removals now, so the final removals-only pass won't use them.
     269           0 :       per_watch_removed.erase(removed);
     270           0 :     }
     271          44 :   }
     272             :   // Any removals-only updates will not have been picked up in the per_watch_added loop.
     273          46 :   for (auto& [cur_watch, resource_to_remove] : per_watch_removed) {
     274           2 :     if (deferred_removed_during_update_->count(cur_watch) > 0) {
     275           0 :       continue;
     276           0 :     }
     277           2 :     THROW_IF_NOT_OK(
     278           2 :         cur_watch->callbacks_.onConfigUpdate({}, resource_to_remove, system_version_info));
     279           2 :   }
     280             :   // notify empty update
     281          46 :   if (added_resources.empty() && removed_resources.empty()) {
     282           0 :     for (auto& cur_watch : wildcard_watches_) {
     283           0 :       THROW_IF_NOT_OK(cur_watch->callbacks_.onConfigUpdate({}, {}, system_version_info));
     284           0 :     }
     285           0 :   }
     286             : 
     287          46 :   if (eds_resources_cache_.has_value()) {
     288             :     // Add/update the watched resources to/in the cache.
     289           0 :     for (const auto& resource : decoded_resources) {
     290           0 :       const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
     291           0 :           dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
     292           0 :               resource->resource());
     293           0 :       eds_resources_cache_->setResource(resource->name(), cluster_load_assignment);
     294           0 :     }
     295             :     // No need to remove resources from the cache, as currently only non-collection
     296             :     // subscriptions are supported, and these resources are removed in the call
     297             :     // to updateWatchInterest().
     298           0 :   }
     299          46 : }
     300             : 
     301          69 : void WatchMap::onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) {
     302          82 :   for (auto& watch : watches_) {
     303          82 :     watch->callbacks_.onConfigUpdateFailed(reason, e);
     304          82 :   }
     305          69 : }
     306             : 
     307             : absl::flat_hash_set<std::string>
     308             : WatchMap::findAdditions(const absl::flat_hash_set<std::string>& newly_added_to_watch,
     309         168 :                         Watch* watch) {
     310         168 :   absl::flat_hash_set<std::string> newly_added_to_subscription;
     311         168 :   for (const auto& name : newly_added_to_watch) {
     312          48 :     auto entry = watch_interest_.find(name);
     313          48 :     if (entry == watch_interest_.end()) {
     314          48 :       newly_added_to_subscription.insert(name);
     315          48 :       watch_interest_[name] = {watch};
     316          48 :     } else {
     317             :       // Add this watch to the already-existing set at watch_interest_[name]
     318           0 :       entry->second.insert(watch);
     319           0 :     }
     320          48 :   }
     321         168 :   return newly_added_to_subscription;
     322         168 : }
     323             : 
     324             : absl::flat_hash_set<std::string>
     325             : WatchMap::findRemovals(const absl::flat_hash_set<std::string>& newly_removed_from_watch,
     326         168 :                        Watch* watch) {
     327         168 :   absl::flat_hash_set<std::string> newly_removed_from_subscription;
     328         168 :   for (const auto& name : newly_removed_from_watch) {
     329          48 :     auto entry = watch_interest_.find(name);
     330          48 :     RELEASE_ASSERT(
     331          48 :         entry != watch_interest_.end(),
     332          48 :         fmt::format("WatchMap: tried to remove a watch from untracked resource {}", name));
     333             : 
     334          48 :     entry->second.erase(watch);
     335          48 :     if (entry->second.empty()) {
     336          48 :       watch_interest_.erase(entry);
     337          48 :       newly_removed_from_subscription.insert(name);
     338          48 :     }
     339          48 :   }
     340         168 :   return newly_removed_from_subscription;
     341         168 : }
     342             : 
     343             : } // namespace Config
     344             : } // namespace Envoy

Generated by: LCOV version 1.15