1
#include "source/extensions/config_subscription/grpc/watch_map.h"
2

            
3
#include "envoy/service/discovery/v3/discovery.pb.h"
4

            
5
#include "source/common/common/cleanup.h"
6
#include "source/common/common/utility.h"
7
#include "source/common/config/decoded_resource_impl.h"
8
#include "source/common/config/utility.h"
9
#include "source/common/config/xds_resource.h"
10

            
11
namespace Envoy {
12
namespace Config {
13

            
14
namespace {
15
// Returns the namespace part (if there's any) in the resource name.
16
97
std::string namespaceFromName(const std::string& resource_name) {
17
  // We simply remove the last / component. E.g. www.foo.com/bar becomes www.foo.com.
18
97
  const auto pos = resource_name.find_last_of('/');
19
  // We are not interested in the "/" character in the namespace
20
97
  return pos == std::string::npos ? "" : resource_name.substr(0, pos);
21
97
}
22
} // namespace
23

            
24
Watch* WatchMap::addWatch(SubscriptionCallbacks& callbacks,
25
1417
                          OpaqueResourceDecoder& resource_decoder) {
26
1417
  auto watch = std::make_unique<Watch>(callbacks, resource_decoder);
27
1417
  Watch* watch_ptr = watch.get();
28
1417
  wildcard_watches_.insert(watch_ptr);
29
1417
  watches_.insert(std::move(watch));
30
1417
  return watch_ptr;
31
1417
}
32

            
33
1398
void WatchMap::removeWatch(Watch* watch) {
34
1398
  if (deferred_removed_during_update_ != nullptr) {
35
8
    deferred_removed_during_update_->insert(watch);
36
1395
  } else {
37
1390
    wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone.
38
1390
    watches_.erase(watch);
39
1390
  }
40
1398
}
41

            
42
2870
void WatchMap::removeDeferredWatches() {
43
2870
  for (auto& watch : *deferred_removed_during_update_) {
44
8
    wildcard_watches_.erase(watch); // may or may not be in there, but we want it gone.
45
8
    watches_.erase(watch);
46
8
  }
47
2870
  deferred_removed_during_update_ = nullptr;
48
2870
}
49

            
50
AddedRemoved
51
WatchMap::updateWatchInterest(Watch* watch,
52
2850
                              const absl::flat_hash_set<std::string>& update_to_these_names) {
53
2850
  if (update_to_these_names.empty() || update_to_these_names.contains(Wildcard)) {
54
1965
    wildcard_watches_.insert(watch);
55
2021
  } else {
56
885
    wildcard_watches_.erase(watch);
57
885
  }
58

            
59
2850
  absl::flat_hash_set<std::string> newly_added_to_watch;
60
2850
  SetUtil::setDifference(update_to_these_names, watch->resource_names_, newly_added_to_watch);
61

            
62
2850
  absl::flat_hash_set<std::string> newly_removed_from_watch;
63
2850
  SetUtil::setDifference(watch->resource_names_, update_to_these_names, newly_removed_from_watch);
64

            
65
2850
  watch->resource_names_ = update_to_these_names;
66

            
67
  // First resources are added and only then removed, so a watch won't be removed
68
  // if its interest has been replaced (rather than completely removed).
69
2850
  absl::flat_hash_set<std::string> added_resources = findAdditions(newly_added_to_watch, watch);
70
2850
  absl::flat_hash_set<std::string> removed_resources =
71
2850
      findRemovals(newly_removed_from_watch, watch);
72
  // Remove cached resource that are no longer relevant.
73
2850
  if (eds_resources_cache_.has_value()) {
74
489
    for (const auto& resource_name : removed_resources) {
75
      // This may pass a resource_name that is not in the cache, for example
76
      // if the resource contents has never arrived.
77
230
      eds_resources_cache_->removeResource(resource_name);
78
230
    }
79
489
  }
80
2850
  return {std::move(added_resources), std::move(removed_resources)};
81
2850
}
82

            
83
2111
absl::flat_hash_set<Watch*> WatchMap::watchesInterestedIn(const std::string& resource_name) {
84
2111
  absl::flat_hash_set<Watch*> ret;
85
2111
  if (!use_namespace_matching_) {
86
2014
    ret = wildcard_watches_;
87
2014
  }
88
2111
  const bool is_xdstp = XdsResourceIdentifier::hasXdsTpScheme(resource_name);
89
2111
  xds::core::v3::ResourceName xdstp_resource;
90
2111
  XdsResourceIdentifier::EncodeOptions encode_options;
91
2111
  encode_options.sort_context_params_ = true;
92
  // First look for an exact match. If this is xdstp:// we need to normalize context parameters.
93
2111
  if (is_xdstp) {
94
    // TODO(htuch): factor this (and stuff in namespaceFromName) into a dedicated library.
95
    // This is not very efficient; it is possible to canonicalize etc. much faster with raw string
96
    // operations, but this implementation provides a reference for later optimization while we
97
    // adopt xdstp://.
98
239
    auto resource_or_error = XdsResourceIdentifier::decodeUrn(resource_name);
99
239
    THROW_IF_NOT_OK_REF(resource_or_error.status());
100
239
    xdstp_resource = resource_or_error.value();
101
239
  }
102
2111
  auto watches_interested = watch_interest_.find(
103
2111
      is_xdstp ? XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options) : resource_name);
104
  // If that fails, consider namespace/glob matching. This is the slow path for xdstp:// and should
105
  // only happen for glob collections. TODO(htuch): It should be possible to have much more
106
  // efficient matchers here.
107
2111
  if (watches_interested == watch_interest_.end()) {
108
1427
    if (use_namespace_matching_) {
109
97
      watches_interested = watch_interest_.find(namespaceFromName(resource_name));
110
1390
    } else if (is_xdstp) {
111
      // Replace resource name component with glob for purpose of matching.
112
152
      const auto pos = xdstp_resource.id().find_last_of('/');
113
152
      xdstp_resource.set_id(pos == std::string::npos ? "*"
114
152
                                                     : xdstp_resource.id().substr(0, pos) + "/*");
115
152
      const std::string encoded_name =
116
152
          XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options);
117
152
      watches_interested = watch_interest_.find(encoded_name);
118
152
    }
119
1427
  }
120
2111
  if (watches_interested != watch_interest_.end()) {
121
929
    for (const auto& watch : watches_interested->second) {
122
929
      ret.insert(watch);
123
929
    }
124
908
  }
125
2111
  return ret;
126
2111
}
127

            
128
void WatchMap::onConfigUpdate(const std::vector<DecodedResourcePtr>& resources,
129
1085
                              const std::string& version_info) {
130
1085
  if (watches_.empty()) {
131
4
    return;
132
4
  }
133

            
134
  // Track any removals triggered by earlier watch updates.
135
1081
  ASSERT(deferred_removed_during_update_ == nullptr);
136
1081
  deferred_removed_during_update_ = std::make_unique<absl::flat_hash_set<Watch*>>();
137
1081
  Cleanup cleanup([this] { removeDeferredWatches(); });
138
  // The xDS server may send a resource that Envoy isn't interested in. This bit array
139
  // will hold an "interesting" bit for each of the resources sent in the update.
140
1081
  std::vector<bool> interesting_resources;
141
1081
  interesting_resources.reserve(resources.size());
142
  // Build a map from watches, to the set of updated resources that each watch cares about. Each
143
  // entry in the map is then a nice little bundle that can be fed directly into the individual
144
  // onConfigUpdate()s.
145
1081
  absl::flat_hash_map<Watch*, std::vector<DecodedResourceRef>> per_watch_updates;
146
1094
  for (const auto& r : resources) {
147
148
    const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r->name());
148
152
    for (const auto& interested_watch : interested_in_r) {
149
150
      per_watch_updates[interested_watch].emplace_back(*r);
150
150
    }
151
    // Set the corresponding interested_resources entry to true iff there is a
152
    // watch interested in the resource.
153
148
    interesting_resources.emplace_back(!interested_in_r.empty());
154
148
  }
155

            
156
  // Execute external config validators.
157
1081
  config_validators_->executeValidators(type_url_, resources);
158

            
159
1081
  const bool map_is_single_wildcard = (watches_.size() == 1 && wildcard_watches_.size() == 1);
160
  // We just bundled up the updates into nice per-watch packages. Now, deliver them.
161
1100
  for (auto& watch : watches_) {
162
1100
    if (deferred_removed_during_update_->count(watch.get()) > 0) {
163
1
      continue;
164
1
    }
165
1099
    const auto this_watch_updates = per_watch_updates.find(watch);
166
1099
    if (this_watch_updates == per_watch_updates.end()) {
167
      // This update included no resources this watch cares about.
168
      // 1) If there is only a single, wildcard watch (i.e. Cluster or Listener), always call
169
      //    its onConfigUpdate even if just a no-op, to properly maintain state-of-the-world
170
      //    semantics and the update_empty stat.
171
      // 2) If this watch previously had some resources, it means this update is removing all
172
      //    of this watch's resources, so the watch must be informed with an onConfigUpdate.
173
      // 3) Otherwise, we can skip onConfigUpdate for this watch.
174
964
      if (map_is_single_wildcard || !watch->state_of_the_world_empty_) {
175
15
        watch->state_of_the_world_empty_ = true;
176
15
        THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate({}, version_info));
177
15
      }
178
1055
    } else {
179
135
      watch->state_of_the_world_empty_ = false;
180
135
      THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate(this_watch_updates->second, version_info));
181
135
    }
182
1099
  }
183

            
184
1079
  if (eds_resources_cache_.has_value()) {
185
    // Add/update the watched resources to/in the cache.
186
    // Only resources that have a watcher should be updated.
187
50
    for (uint32_t resource_idx = 0; resource_idx < resources.size(); ++resource_idx) {
188
25
      if (interesting_resources[resource_idx]) {
189
25
        const auto& resource = resources[resource_idx];
190
25
        const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
191
25
            dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
192
25
                resource.get()->resource());
193
25
        eds_resources_cache_->setResource(resource.get()->name(), cluster_load_assignment);
194
25
      }
195
25
    }
196
    // Note: No need to remove resources from the cache, as currently only non-collection
197
    // subscriptions are supported, and these resources are removed in the call
198
    // to updateWatchInterest().
199
25
  }
200
1079
}
201

            
202
void WatchMap::onConfigUpdate(const Protobuf::RepeatedPtrField<Protobuf::Any>& resources,
203
19
                              const std::string& version_info) {
204
19
  if (watches_.empty()) {
205
1
    return;
206
1
  }
207

            
208
18
  std::vector<DecodedResourcePtr> decoded_resources;
209
18
  decoded_resources.reserve(resources.size());
210
21
  for (const auto& r : resources) {
211
21
    decoded_resources.emplace_back(THROW_OR_RETURN_VALUE(
212
21
        DecodedResourceImpl::fromResource((*watches_.begin())->resource_decoder_, r, version_info),
213
21
        DecodedResourceImplPtr));
214
21
  }
215

            
216
18
  onConfigUpdate(decoded_resources, version_info);
217
18
}
218

            
219
void WatchMap::onConfigUpdate(
220
    absl::Span<const envoy::service::discovery::v3::Resource* const> added_resources,
221
    const Protobuf::RepeatedPtrField<std::string>& removed_resources,
222
1789
    const std::string& system_version_info) {
223
  // Track any removals triggered by earlier watch updates.
224
1789
  ASSERT(deferred_removed_during_update_ == nullptr);
225
1789
  deferred_removed_during_update_ = std::make_unique<absl::flat_hash_set<Watch*>>();
226
1789
  Cleanup cleanup([this] { removeDeferredWatches(); });
227
  // Build a pair of maps: from watches, to the set of resources {added,removed} that each watch
228
  // cares about. Each entry in the map-pair is then a nice little bundle that can be fed directly
229
  // into the individual onConfigUpdate()s.
230
1789
  std::vector<DecodedResourcePtr> decoded_resources;
231
1789
  absl::flat_hash_map<Watch*, std::vector<DecodedResourceRef>> per_watch_added;
232
  // If the server behaves according to the protocol, it will only send the
233
  // resources the watch map is interested in. Reserve the correct amount of
234
  // space for the vector for the good case.
235
1789
  decoded_resources.reserve(added_resources.size());
236
1895
  for (const auto* r : added_resources) {
237
1589
    const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r->name());
238
    // If there are no watches, then we don't need to decode. If there are watches, they should all
239
    // be for the same resource type, so we can just use the callbacks of the first watch to decode.
240
1589
    if (interested_in_r.empty()) {
241
25
      continue;
242
25
    }
243
1564
    decoded_resources.emplace_back(
244
1564
        new DecodedResourceImpl((*interested_in_r.begin())->resource_decoder_, *r));
245
1661
    for (const auto& interested_watch : interested_in_r) {
246
1655
      per_watch_added[interested_watch].emplace_back(*decoded_resources.back());
247
1655
    }
248
1564
  }
249
1789
  absl::flat_hash_map<Watch*, Protobuf::RepeatedPtrField<std::string>> per_watch_removed;
250
1789
  for (const auto& r : removed_resources) {
251
374
    const absl::flat_hash_set<Watch*>& interested_in_r = watchesInterestedIn(r);
252
382
    for (const auto& interested_watch : interested_in_r) {
253
361
      *per_watch_removed[interested_watch].Add() = r;
254
361
    }
255
374
  }
256

            
257
  // Execute external config validators.
258
1789
  config_validators_->executeValidators(type_url_, decoded_resources, removed_resources);
259

            
260
  // We just bundled up the updates into nice per-watch packages. Now, deliver them.
261
1801
  for (const auto& [cur_watch, resource_to_add] : per_watch_added) {
262
1518
    if (deferred_removed_during_update_->count(cur_watch) > 0) {
263
2
      continue;
264
2
    }
265
1516
    const auto removed = per_watch_removed.find(cur_watch);
266
1516
    if (removed == per_watch_removed.end()) {
267
      // additions only, no removals
268
1486
      THROW_IF_NOT_OK(
269
1486
          cur_watch->callbacks_.onConfigUpdate(resource_to_add, {}, system_version_info));
270
1486
    } else {
271
      // both additions and removals
272
30
      THROW_IF_NOT_OK(cur_watch->callbacks_.onConfigUpdate(resource_to_add, removed->second,
273
30
                                                           system_version_info));
274
      // Drop the removals now, so the final removals-only pass won't use them.
275
30
      per_watch_removed.erase(removed);
276
30
    }
277
1516
  }
278
  // Any removals-only updates will not have been picked up in the per_watch_added loop.
279
1785
  for (auto& [cur_watch, resource_to_remove] : per_watch_removed) {
280
319
    if (deferred_removed_during_update_->count(cur_watch) > 0) {
281
1
      continue;
282
1
    }
283
318
    THROW_IF_NOT_OK(
284
318
        cur_watch->callbacks_.onConfigUpdate({}, resource_to_remove, system_version_info));
285
318
  }
286
  // notify empty update
287
1785
  if (added_resources.empty() && removed_resources.empty()) {
288
79
    for (auto& cur_watch : wildcard_watches_) {
289
79
      THROW_IF_NOT_OK(cur_watch->callbacks_.onConfigUpdate({}, {}, system_version_info));
290
79
    }
291
79
  }
292

            
293
1785
  if (eds_resources_cache_.has_value()) {
294
    // Add/update the watched resources to/in the cache.
295
203
    for (const auto& resource : decoded_resources) {
296
203
      const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
297
203
          dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
298
203
              resource->resource());
299
203
      eds_resources_cache_->setResource(resource->name(), cluster_load_assignment);
300
203
    }
301
    // No need to remove resources from the cache, as currently only non-collection
302
    // subscriptions are supported, and these resources are removed in the call
303
    // to updateWatchInterest().
304
187
  }
305
1785
}
306

            
307
1529
void WatchMap::onConfigUpdateFailed(ConfigUpdateFailureReason reason, const EnvoyException* e) {
308
1743
  for (auto& watch : watches_) {
309
1740
    watch->callbacks_.onConfigUpdateFailed(reason, e);
310
1740
  }
311
1529
}
312

            
313
absl::flat_hash_set<std::string>
314
WatchMap::findAdditions(const absl::flat_hash_set<std::string>& newly_added_to_watch,
315
2850
                        Watch* watch) {
316
2850
  absl::flat_hash_set<std::string> newly_added_to_subscription;
317
2854
  for (const auto& name : newly_added_to_watch) {
318
965
    auto entry = watch_interest_.find(name);
319
965
    if (entry == watch_interest_.end()) {
320
936
      newly_added_to_subscription.insert(name);
321
936
      watch_interest_[name] = {watch};
322
936
    } else {
323
      // Add this watch to the already-existing set at watch_interest_[name]
324
29
      entry->second.insert(watch);
325
29
    }
326
965
  }
327
2850
  return newly_added_to_subscription;
328
2850
}
329

            
330
absl::flat_hash_set<std::string>
331
WatchMap::findRemovals(const absl::flat_hash_set<std::string>& newly_removed_from_watch,
332
2850
                       Watch* watch) {
333
2850
  absl::flat_hash_set<std::string> newly_removed_from_subscription;
334
2852
  for (const auto& name : newly_removed_from_watch) {
335
937
    auto entry = watch_interest_.find(name);
336
937
    RELEASE_ASSERT(
337
937
        entry != watch_interest_.end(),
338
937
        fmt::format("WatchMap: tried to remove a watch from untracked resource {}", name));
339

            
340
937
    entry->second.erase(watch);
341
937
    if (entry->second.empty()) {
342
917
      watch_interest_.erase(entry);
343
917
      newly_removed_from_subscription.insert(name);
344
917
    }
345
937
  }
346
2850
  return newly_removed_from_subscription;
347
2850
}
348

            
349
} // namespace Config
350
} // namespace Envoy