Line data Source code
1 : #include "source/extensions/config_subscription/grpc/watch_map.h"
2 :
3 : #include "envoy/service/discovery/v3/discovery.pb.h"
4 :
5 : #include "source/common/common/cleanup.h"
6 : #include "source/common/common/utility.h"
7 : #include "source/common/config/decoded_resource_impl.h"
8 : #include "source/common/config/utility.h"
9 : #include "source/common/config/xds_resource.h"
10 :
11 : namespace Envoy {
12 : namespace Config {
13 :
14 : namespace {
// Extracts the namespace portion of a resource name: everything that precedes the final '/'.
// For example, "www.foo.com/bar" yields "www.foo.com". A name without any '/' has no namespace
// and yields the empty string.
std::string namespaceFromName(const std::string& resource_name) {
  const std::string::size_type last_slash = resource_name.rfind('/');
  if (last_slash == std::string::npos) {
    return "";
  }
  // The '/' separator itself is not part of the namespace.
  return resource_name.substr(0, last_slash);
}
22 : } // namespace
23 :
24 : Watch* WatchMap::addWatch(SubscriptionCallbacks& callbacks,
25 84 : OpaqueResourceDecoder& resource_decoder) {
26 84 : auto watch = std::make_unique<Watch>(callbacks, resource_decoder);
27 84 : Watch* watch_ptr = watch.get();
28 84 : wildcard_watches_.insert(watch_ptr);
29 84 : watches_.insert(std::move(watch));
30 84 : return watch_ptr;
31 84 : }
32 :
33 84 : void WatchMap::removeWatch(Watch* watch) {
34 84 : if (deferred_removed_during_update_ != nullptr) {
35 0 : deferred_removed_during_update_->insert(watch);
36 84 : } else {
37 84 : wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone.
38 84 : watches_.erase(watch);
39 84 : }
40 84 : }
41 :
42 134 : void WatchMap::removeDeferredWatches() {
43 134 : for (auto& watch : *deferred_removed_during_update_) {
44 0 : wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone.
45 0 : watches_.erase(watch);
46 0 : }
47 134 : deferred_removed_during_update_ = nullptr;
48 134 : }
49 :
50 : AddedRemoved
51 : WatchMap::updateWatchInterest(Watch* watch,
52 168 : const absl::flat_hash_set<std::string>& update_to_these_names) {
53 168 : if (update_to_these_names.empty() || update_to_these_names.contains(Wildcard)) {
54 120 : wildcard_watches_.insert(watch);
55 120 : } else {
56 48 : wildcard_watches_.erase(watch);
57 48 : }
58 :
59 168 : absl::flat_hash_set<std::string> newly_added_to_watch;
60 168 : SetUtil::setDifference(update_to_these_names, watch->resource_names_, newly_added_to_watch);
61 :
62 168 : absl::flat_hash_set<std::string> newly_removed_from_watch;
63 168 : SetUtil::setDifference(watch->resource_names_, update_to_these_names, newly_removed_from_watch);
64 :
65 168 : watch->resource_names_ = update_to_these_names;
66 :
67 : // First resources are added and only then removed, so a watch won't be removed
68 : // if its interest has been replaced (rather than completely removed).
69 168 : absl::flat_hash_set<std::string> added_resources = findAdditions(newly_added_to_watch, watch);
70 168 : absl::flat_hash_set<std::string> removed_resources =
71 168 : findRemovals(newly_removed_from_watch, watch);
72 : // Remove cached resource that are no longer relevant.
73 168 : if (eds_resources_cache_.has_value()) {
74 0 : for (const auto& resource_name : removed_resources) {
75 : // This may pass a resource_name that is not in the cache, for example
76 : // if the resource contents has never arrived.
77 0 : eds_resources_cache_->removeResource(resource_name);
78 0 : }
79 0 : }
80 168 : return {std::move(added_resources), std::move(removed_resources)};
81 168 : }
82 :
83 188 : absl::flat_hash_set<Watch*> WatchMap::watchesInterestedIn(const std::string& resource_name) {
84 188 : absl::flat_hash_set<Watch*> ret;
85 188 : if (!use_namespace_matching_) {
86 188 : ret = wildcard_watches_;
87 188 : }
88 188 : const bool is_xdstp = XdsResourceIdentifier::hasXdsTpScheme(resource_name);
89 188 : xds::core::v3::ResourceName xdstp_resource;
90 188 : XdsResourceIdentifier::EncodeOptions encode_options;
91 188 : encode_options.sort_context_params_ = true;
92 : // First look for an exact match. If this is xdstp:// we need to normalize context parameters.
93 188 : if (is_xdstp) {
94 : // TODO(htuch): factor this (and stuff in namespaceFromName) into a dedicated library.
95 : // This is not very efficient; it is possible to canonicalize etc. much faster with raw string
96 : // operations, but this implementation provides a reference for later optimization while we
97 : // adopt xdstp://.
98 0 : auto resource_or_error = XdsResourceIdentifier::decodeUrn(resource_name);
99 0 : THROW_IF_STATUS_NOT_OK(resource_or_error, throw);
100 0 : xdstp_resource = resource_or_error.value();
101 0 : }
102 188 : auto watches_interested = watch_interest_.find(
103 188 : is_xdstp ? XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options) : resource_name);
104 : // If that fails, consider namespace/glob matching. This is the slow path for xdstp:// and should
105 : // only happen for glob collections. TODO(htuch): It should be possible to have much more
106 : // efficient matchers here.
107 188 : if (watches_interested == watch_interest_.end()) {
108 112 : if (use_namespace_matching_) {
109 0 : watches_interested = watch_interest_.find(namespaceFromName(resource_name));
110 112 : } else if (is_xdstp) {
111 : // Replace resource name component with glob for purpose of matching.
112 0 : const auto pos = xdstp_resource.id().find_last_of('/');
113 0 : xdstp_resource.set_id(pos == std::string::npos ? "*"
114 0 : : xdstp_resource.id().substr(0, pos) + "/*");
115 0 : const std::string encoded_name =
116 0 : XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options);
117 0 : watches_interested = watch_interest_.find(encoded_name);
118 0 : }
119 112 : }
120 188 : if (watches_interested != watch_interest_.end()) {
121 76 : for (const auto& watch : watches_interested->second) {
122 76 : ret.insert(watch);
123 76 : }
124 76 : }
125 188 : return ret;
126 188 : }
127 :
128 : void WatchMap::onConfigUpdate(const std::vector<DecodedResourcePtr>& resources,
129 88 : const std::string& version_info) {
130 88 : if (watches_.empty()) {
131 0 : return;
132 0 : }
133 :
134 : // Track any removals triggered by earlier watch updates.
135 88 : ASSERT(deferred_removed_during_update_ == nullptr);
136 88 : deferred_removed_during_update_ = std::make_unique<absl::flat_hash_set<Watch*>>();
137 88 : Cleanup cleanup([this] { removeDeferredWatches(); });
138 : // The xDS server may send a resource that Envoy isn't interested in. This bit array
139 : // will hold an "interesting" bit for each of the resources sent in the update.
140 88 : std::vector<bool> interesting_resources;
141 88 : interesting_resources.reserve(resources.size());
142 : // Build a map from watches, to the set of updated resources that each watch cares about. Each
143 : // entry in the map is then a nice little bundle that can be fed directly into the individual
144 : // onConfigUpdate()s.
145 88 : absl::flat_hash_map<Watch*, std::vector<DecodedResourceRef>> per_watch_updates;
146 142 : for (const auto& r : resources) {
147 142 : const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r->name());
148 142 : for (const auto& interested_watch : interested_in_r) {
149 131 : per_watch_updates[interested_watch].emplace_back(*r);
150 131 : }
151 : // Set the corresponding interested_resources entry to true iff there is a
152 : // watch interested in the resource.
153 142 : interesting_resources.emplace_back(!interested_in_r.empty());
154 142 : }
155 :
156 : // Execute external config validators.
157 88 : config_validators_.executeValidators(type_url_, resources);
158 :
159 88 : const bool map_is_single_wildcard = (watches_.size() == 1 && wildcard_watches_.size() == 1);
160 : // We just bundled up the updates into nice per-watch packages. Now, deliver them.
161 108 : for (auto& watch : watches_) {
162 108 : if (deferred_removed_during_update_->count(watch.get()) > 0) {
163 0 : continue;
164 0 : }
165 108 : const auto this_watch_updates = per_watch_updates.find(watch);
166 108 : if (this_watch_updates == per_watch_updates.end()) {
167 : // This update included no resources this watch cares about.
168 : // 1) If there is only a single, wildcard watch (i.e. Cluster or Listener), always call
169 : // its onConfigUpdate even if just a no-op, to properly maintain state-of-the-world
170 : // semantics and the update_empty stat.
171 : // 2) If this watch previously had some resources, it means this update is removing all
172 : // of this watch's resources, so the watch must be informed with an onConfigUpdate.
173 : // 3) Otherwise, we can skip onConfigUpdate for this watch.
174 11 : if (map_is_single_wildcard || !watch->state_of_the_world_empty_) {
175 2 : watch->state_of_the_world_empty_ = true;
176 2 : THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate({}, version_info));
177 2 : }
178 97 : } else {
179 97 : watch->state_of_the_world_empty_ = false;
180 97 : THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate(this_watch_updates->second, version_info));
181 97 : }
182 108 : }
183 :
184 88 : if (eds_resources_cache_.has_value()) {
185 : // Add/update the watched resources to/in the cache.
186 : // Only resources that have a watcher should be updated.
187 0 : for (uint32_t resource_idx = 0; resource_idx < resources.size(); ++resource_idx) {
188 0 : if (interesting_resources[resource_idx]) {
189 0 : const auto& resource = resources[resource_idx];
190 0 : const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
191 0 : dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
192 0 : resource.get()->resource());
193 0 : eds_resources_cache_->setResource(resource.get()->name(), cluster_load_assignment);
194 0 : }
195 0 : }
196 : // Note: No need to remove resources from the cache, as currently only non-collection
197 : // subscriptions are supported, and these resources are removed in the call
198 : // to updateWatchInterest().
199 0 : }
200 88 : }
201 :
202 : void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
203 0 : const std::string& version_info) {
204 0 : if (watches_.empty()) {
205 0 : return;
206 0 : }
207 :
208 0 : std::vector<DecodedResourcePtr> decoded_resources;
209 0 : for (const auto& r : resources) {
210 0 : decoded_resources.emplace_back(
211 0 : DecodedResourceImpl::fromResource((*watches_.begin())->resource_decoder_, r, version_info));
212 0 : }
213 :
214 0 : onConfigUpdate(decoded_resources, version_info);
215 0 : }
216 :
217 : void WatchMap::onConfigUpdate(
218 : const Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource>& added_resources,
219 : const Protobuf::RepeatedPtrField<std::string>& removed_resources,
220 46 : const std::string& system_version_info) {
221 : // Track any removals triggered by earlier watch updates.
222 46 : ASSERT(deferred_removed_during_update_ == nullptr);
223 46 : deferred_removed_during_update_ = std::make_unique<absl::flat_hash_set<Watch*>>();
224 46 : Cleanup cleanup([this] { removeDeferredWatches(); });
225 : // Build a pair of maps: from watches, to the set of resources {added,removed} that each watch
226 : // cares about. Each entry in the map-pair is then a nice little bundle that can be fed directly
227 : // into the individual onConfigUpdate()s.
228 46 : std::vector<DecodedResourcePtr> decoded_resources;
229 46 : absl::flat_hash_map<Watch*, std::vector<DecodedResourceRef>> per_watch_added;
230 46 : for (const auto& r : added_resources) {
231 44 : const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r.name());
232 : // If there are no watches, then we don't need to decode. If there are watches, they should all
233 : // be for the same resource type, so we can just use the callbacks of the first watch to decode.
234 44 : if (interested_in_r.empty()) {
235 0 : continue;
236 0 : }
237 44 : decoded_resources.emplace_back(
238 44 : new DecodedResourceImpl((*interested_in_r.begin())->resource_decoder_, r));
239 44 : for (const auto& interested_watch : interested_in_r) {
240 44 : per_watch_added[interested_watch].emplace_back(*decoded_resources.back());
241 44 : }
242 44 : }
243 46 : absl::flat_hash_map<Watch*, Protobuf::RepeatedPtrField<std::string>> per_watch_removed;
244 46 : for (const auto& r : removed_resources) {
245 2 : const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r);
246 2 : for (const auto& interested_watch : interested_in_r) {
247 2 : *per_watch_removed[interested_watch].Add() = r;
248 2 : }
249 2 : }
250 :
251 : // Execute external config validators.
252 46 : config_validators_.executeValidators(type_url_, decoded_resources, removed_resources);
253 :
254 : // We just bundled up the updates into nice per-watch packages. Now, deliver them.
255 46 : for (const auto& [cur_watch, resource_to_add] : per_watch_added) {
256 44 : if (deferred_removed_during_update_->count(cur_watch) > 0) {
257 0 : continue;
258 0 : }
259 44 : const auto removed = per_watch_removed.find(cur_watch);
260 44 : if (removed == per_watch_removed.end()) {
261 : // additions only, no removals
262 44 : THROW_IF_NOT_OK(
263 44 : cur_watch->callbacks_.onConfigUpdate(resource_to_add, {}, system_version_info));
264 44 : } else {
265 : // both additions and removals
266 0 : THROW_IF_NOT_OK(cur_watch->callbacks_.onConfigUpdate(resource_to_add, removed->second,
267 0 : system_version_info));
268 : // Drop the removals now, so the final removals-only pass won't use them.
269 0 : per_watch_removed.erase(removed);
270 0 : }
271 44 : }
272 : // Any removals-only updates will not have been picked up in the per_watch_added loop.
273 46 : for (auto& [cur_watch, resource_to_remove] : per_watch_removed) {
274 2 : if (deferred_removed_during_update_->count(cur_watch) > 0) {
275 0 : continue;
276 0 : }
277 2 : THROW_IF_NOT_OK(
278 2 : cur_watch->callbacks_.onConfigUpdate({}, resource_to_remove, system_version_info));
279 2 : }
280 : // notify empty update
281 46 : if (added_resources.empty() && removed_resources.empty()) {
282 0 : for (auto& cur_watch : wildcard_watches_) {
283 0 : THROW_IF_NOT_OK(cur_watch->callbacks_.onConfigUpdate({}, {}, system_version_info));
284 0 : }
285 0 : }
286 :
287 46 : if (eds_resources_cache_.has_value()) {
288 : // Add/update the watched resources to/in the cache.
289 0 : for (const auto& resource : decoded_resources) {
290 0 : const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
291 0 : dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
292 0 : resource->resource());
293 0 : eds_resources_cache_->setResource(resource->name(), cluster_load_assignment);
294 0 : }
295 : // No need to remove resources from the cache, as currently only non-collection
296 : // subscriptions are supported, and these resources are removed in the call
297 : // to updateWatchInterest().
298 0 : }
299 46 : }
300 :
301 69 : void WatchMap::onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) {
302 82 : for (auto& watch : watches_) {
303 82 : watch->callbacks_.onConfigUpdateFailed(reason, e);
304 82 : }
305 69 : }
306 :
307 : absl::flat_hash_set<std::string>
308 : WatchMap::findAdditions(const absl::flat_hash_set<std::string>& newly_added_to_watch,
309 168 : Watch* watch) {
310 168 : absl::flat_hash_set<std::string> newly_added_to_subscription;
311 168 : for (const auto& name : newly_added_to_watch) {
312 48 : auto entry = watch_interest_.find(name);
313 48 : if (entry == watch_interest_.end()) {
314 48 : newly_added_to_subscription.insert(name);
315 48 : watch_interest_[name] = {watch};
316 48 : } else {
317 : // Add this watch to the already-existing set at watch_interest_[name]
318 0 : entry->second.insert(watch);
319 0 : }
320 48 : }
321 168 : return newly_added_to_subscription;
322 168 : }
323 :
324 : absl::flat_hash_set<std::string>
325 : WatchMap::findRemovals(const absl::flat_hash_set<std::string>& newly_removed_from_watch,
326 168 : Watch* watch) {
327 168 : absl::flat_hash_set<std::string> newly_removed_from_subscription;
328 168 : for (const auto& name : newly_removed_from_watch) {
329 48 : auto entry = watch_interest_.find(name);
330 48 : RELEASE_ASSERT(
331 48 : entry != watch_interest_.end(),
332 48 : fmt::format("WatchMap: tried to remove a watch from untracked resource {}", name));
333 :
334 48 : entry->second.erase(watch);
335 48 : if (entry->second.empty()) {
336 48 : watch_interest_.erase(entry);
337 48 : newly_removed_from_subscription.insert(name);
338 48 : }
339 48 : }
340 168 : return newly_removed_from_subscription;
341 168 : }
342 :
343 : } // namespace Config
344 : } // namespace Envoy
|