Line data Source code
1 : #include "source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h"
2 :
3 : #include "envoy/event/dispatcher.h"
4 : #include "envoy/service/discovery/v3/discovery.pb.h"
5 :
6 : #include "source/common/common/hash.h"
7 : #include "source/common/config/utility.h"
8 : #include "source/common/runtime/runtime_features.h"
9 :
10 : namespace Envoy {
11 : namespace Config {
12 : namespace XdsMux {
13 :
14 : DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
15 : UntypedConfigUpdateCallbacks& watch_map,
16 : Event::Dispatcher& dispatcher,
17 : XdsConfigTrackerOptRef xds_config_tracker)
18 : : BaseSubscriptionState(std::move(type_url), watch_map, dispatcher, xds_config_tracker),
19 : // TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
20 : // empty resources as updates.
21 15 : supports_heartbeats_(type_url_ != "envoy.config.route.v3.VirtualHost") {}
22 :
23 15 : DeltaSubscriptionState::~DeltaSubscriptionState() = default;
24 :
25 : void DeltaSubscriptionState::updateSubscriptionInterest(
26 : const absl::flat_hash_set<std::string>& cur_added,
27 34 : const absl::flat_hash_set<std::string>& cur_removed) {
28 34 : for (const auto& a : cur_added) {
29 9 : if (in_initial_legacy_wildcard_ && a != Wildcard) {
30 7 : in_initial_legacy_wildcard_ = false;
31 7 : }
32 : // If the requested resource existed as a wildcard resource,
33 : // transition it to requested. Otherwise mark it as a resource
34 : // waiting for the server to receive the version.
35 9 : if (auto it = wildcard_resource_state_.find(a); it != wildcard_resource_state_.end()) {
36 0 : requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
37 0 : wildcard_resource_state_.erase(it);
38 9 : } else if (it = ambiguous_resource_state_.find(a); it != ambiguous_resource_state_.end()) {
39 0 : requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
40 0 : ambiguous_resource_state_.erase(it);
41 9 : } else {
42 9 : requested_resource_state_.insert_or_assign(a, ResourceState::waitingForServer());
43 9 : }
44 9 : ASSERT(requested_resource_state_.contains(a));
45 9 : ASSERT(!wildcard_resource_state_.contains(a));
46 9 : ASSERT(!ambiguous_resource_state_.contains(a));
47 : // If interest in a resource is removed-then-added (all before a discovery request
48 : // can be sent), we must treat it as a "new" addition: our user may have forgotten its
49 : // copy of the resource after instructing us to remove it, and need to be reminded of it.
50 9 : names_removed_.erase(a);
51 9 : names_added_.insert(a);
52 9 : }
53 34 : for (const auto& r : cur_removed) {
54 9 : auto actually_erased = false;
55 : // The resource we have lost the interest in could also come from our wildcard subscription. We
56 : // just don't know it at this point. Instead of removing it outright, mark the resource as not
57 : // interesting to us any more and the server will send us an update. If we don't have a wildcard
58 : // subscription then there is no ambiguity and just drop the resource.
59 9 : if (requested_resource_state_.contains(Wildcard)) {
60 0 : if (auto it = requested_resource_state_.find(r); it != requested_resource_state_.end()) {
61 : // Wildcard resources always have a version. If our requested resource has no version, it
62 : // won't be a wildcard resource then. If r is Wildcard itself, then it never has a version
63 : // attached to it, so it will not be moved to ambiguous category.
64 0 : if (!it->second.isWaitingForServer()) {
65 0 : ambiguous_resource_state_.insert_or_assign(it->first, it->second.version());
66 0 : }
67 0 : requested_resource_state_.erase(it);
68 0 : actually_erased = true;
69 0 : }
70 9 : } else {
71 9 : actually_erased = (requested_resource_state_.erase(r) > 0);
72 9 : }
73 9 : ASSERT(!requested_resource_state_.contains(r));
74 : // Ideally, when interest in a resource is added-then-removed in between requests,
75 : // we would avoid putting a superfluous "unsubscribe [resource that was never subscribed]"
76 : // in the request. However, the removed-then-added case *does* need to go in the request,
77 : // and due to how we accomplish that, it's difficult to distinguish remove-add-remove from
78 : // add-remove (because "remove-add" has to be treated as equivalent to just "add").
79 9 : names_added_.erase(r);
80 9 : if (actually_erased) {
81 9 : names_removed_.insert(r);
82 9 : in_initial_legacy_wildcard_ = false;
83 9 : }
84 9 : }
85 : // If we unsubscribe from wildcard resource, drop all the resources that came from wildcard from
86 : // cache.
87 34 : if (cur_removed.contains(Wildcard)) {
88 0 : wildcard_resource_state_.clear();
89 0 : ambiguous_resource_state_.clear();
90 0 : }
91 34 : }
92 :
93 : // Not having sent any requests yet counts as an "update pending" since you're supposed to resend
94 : // the entirety of your interest at the start of a stream, even if nothing has changed.
95 346 : bool DeltaSubscriptionState::subscriptionUpdatePending() const {
96 346 : if (!names_added_.empty() || !names_removed_.empty()) {
97 36 : return true;
98 36 : }
99 : // At this point, we have no new resources to subscribe to or any
100 : // resources to unsubscribe from.
101 310 : if (!any_request_sent_yet_in_current_stream_) {
102 : // If we haven't sent anything on the current stream, but we are actually interested in some
103 : // resource then we obviously need to let the server know about those.
104 16 : if (!requested_resource_state_.empty()) {
105 0 : return true;
106 0 : }
107 : // So there are no new names and we are interested in nothing. This may either mean that we want
108 : // the legacy wildcard subscription to kick in or we actually unsubscribed from everything. If
109 : // the latter is true, then we should not be sending any requests. In such case the initial
110 : // wildcard mode will be false. Otherwise it means that the legacy wildcard request should be
111 : // sent.
112 16 : return in_initial_legacy_wildcard_;
113 16 : }
114 :
115 : // At this point, we have no changes in subscription resources and this isn't a first request in
116 : // the stream, so even if there are no resources we are interested in, we can send the request,
117 : // because even if it's empty, it won't be interpreted as legacy wildcard subscription, which can
118 : // only for the first request in the stream. So sending an empty request at this point should be
119 : // harmless.
120 294 : return dynamicContextChanged();
121 310 : }
122 :
123 : bool DeltaSubscriptionState::isHeartbeatResource(
124 22 : const envoy::service::discovery::v3::Resource& resource) const {
125 22 : if (!supports_heartbeats_) {
126 0 : return false;
127 0 : }
128 22 : if (resource.has_resource()) {
129 22 : return false;
130 22 : }
131 :
132 0 : if (const auto maybe_resource = getRequestedResourceState(resource.name());
133 0 : maybe_resource.has_value()) {
134 0 : return !maybe_resource->isWaitingForServer() && resource.version() == maybe_resource->version();
135 0 : }
136 :
137 0 : if (const auto itr = wildcard_resource_state_.find(resource.name());
138 0 : itr != wildcard_resource_state_.end()) {
139 0 : return resource.version() == itr->second;
140 0 : }
141 :
142 0 : if (const auto itr = ambiguous_resource_state_.find(resource.name());
143 0 : itr != ambiguous_resource_state_.end()) {
144 : // In theory we should move the ambiguous resource to wildcard, because probably we shouldn't be
145 : // getting heartbeat responses about resources that we are not interested in, but the server
146 : // could have sent this heartbeat before it learned about our lack of interest in the resource.
147 0 : return resource.version() == itr->second;
148 0 : }
149 :
150 0 : return false;
151 0 : }
152 :
153 : void DeltaSubscriptionState::handleGoodResponse(
154 23 : const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
155 23 : absl::flat_hash_set<std::string> names_added_removed;
156 23 : Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> non_heartbeat_resources;
157 23 : for (const auto& resource : message.resources()) {
158 22 : if (!names_added_removed.insert(resource.name()).second) {
159 0 : throw EnvoyException(
160 0 : fmt::format("duplicate name {} found among added/updated resources", resource.name()));
161 0 : }
162 22 : if (isHeartbeatResource(resource)) {
163 0 : continue;
164 0 : }
165 : // TODO (dmitri-d) consider changing onConfigUpdate callback interface to avoid copying of
166 : // resources
167 22 : non_heartbeat_resources.Add()->CopyFrom(resource);
168 : // DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource
169 22 : if (!resource.has_resource() && resource.aliases_size() > 0) {
170 0 : continue;
171 0 : }
172 22 : if (message.type_url() != resource.resource().type_url()) {
173 0 : throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match "
174 0 : "the message-wide type URL {} in DeltaDiscoveryResponse {}",
175 0 : resource.resource().type_url(), message.type_url(),
176 0 : message.DebugString()));
177 0 : }
178 22 : }
179 23 : for (const auto& name : message.removed_resources()) {
180 1 : if (!names_added_removed.insert(name).second) {
181 0 : throw EnvoyException(
182 0 : fmt::format("duplicate name {} found in the union of added+removed resources", name));
183 0 : }
184 1 : }
185 :
186 23 : callbacks().onConfigUpdate(non_heartbeat_resources, message.removed_resources(),
187 23 : message.system_version_info());
188 :
189 : // Processing point when resources are successfully ingested.
190 23 : if (xds_config_tracker_.has_value()) {
191 0 : xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources,
192 0 : message.removed_resources());
193 0 : }
194 :
195 23 : {
196 23 : const auto scoped_update = ttl_.scopedTtlUpdate();
197 23 : if (requested_resource_state_.contains(Wildcard)) {
198 11 : for (const auto& resource : message.resources()) {
199 10 : addResourceStateFromServer(resource);
200 10 : }
201 12 : } else {
202 : // We are not subscribed to wildcard, so we only take resources that we explicitly requested
203 : // and ignore the others.
204 : // NOTE: This is not gonna work for xdstp resources with glob resource matching.
205 12 : for (const auto& resource : message.resources()) {
206 12 : if (requested_resource_state_.contains(resource.name())) {
207 12 : addResourceStateFromServer(resource);
208 12 : }
209 12 : }
210 12 : }
211 23 : }
212 :
213 : // If a resource is gone, there is no longer a meaningful version for it that makes sense to
214 : // provide to the server upon stream reconnect: either it will continue to not exist, in which
215 : // case saying nothing is fine, or the server will bring back something new, which we should
216 : // receive regardless (which is the logic that not specifying a version will get you).
217 : //
218 : // So, leave the version map entry present but blank if we are still interested in the resource.
219 : // It will be left out of initial_resource_versions messages, but will remind us to explicitly
220 : // tell the server "I'm cancelling my subscription" when we lose interest. In case of resources
221 : // received as a part of the wildcard subscription or resources we already lost interest in, we
222 : // just drop them.
223 23 : for (const auto& resource_name : message.removed_resources()) {
224 1 : if (auto maybe_resource = getRequestedResourceState(resource_name);
225 1 : maybe_resource.has_value()) {
226 0 : maybe_resource->setAsWaitingForServer();
227 1 : } else if (const auto erased_count = ambiguous_resource_state_.erase(resource_name);
228 1 : erased_count == 0) {
229 1 : wildcard_resource_state_.erase(resource_name);
230 1 : }
231 1 : }
232 23 : ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", typeUrl(),
233 23 : message.resources().size(), message.removed_resources().size());
234 23 : }
235 :
236 : std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryRequest>
237 40 : DeltaSubscriptionState::getNextRequestInternal() {
238 40 : auto request = std::make_unique<envoy::service::discovery::v3::DeltaDiscoveryRequest>();
239 40 : request->set_type_url(typeUrl());
240 40 : if (!any_request_sent_yet_in_current_stream_) {
241 15 : any_request_sent_yet_in_current_stream_ = true;
242 15 : const bool is_legacy_wildcard = isInitialRequestForLegacyWildcard();
243 : // initial_resource_versions "must be populated for first request in a stream".
244 : // Also, since this might be a new server, we must explicitly state *all* of our subscription
245 : // interest.
246 15 : for (auto const& [resource_name, resource_state] : requested_resource_state_) {
247 : // Populate initial_resource_versions with the resource versions we currently have.
248 : // Resources we are interested in, but are still waiting to get any version of from the
249 : // server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
250 15 : if (!resource_state.isWaitingForServer()) {
251 0 : (*request->mutable_initial_resource_versions())[resource_name] = resource_state.version();
252 0 : }
253 : // We are going over a list of resources that we are interested in, so add them to
254 : // resource_names_subscribe.
255 15 : names_added_.insert(resource_name);
256 15 : }
257 15 : for (auto const& [resource_name, resource_version] : wildcard_resource_state_) {
258 0 : (*request->mutable_initial_resource_versions())[resource_name] = resource_version;
259 0 : }
260 15 : for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) {
261 0 : (*request->mutable_initial_resource_versions())[resource_name] = resource_version;
262 0 : }
263 : // If this is a legacy wildcard request, then make sure that the resource_names_subscribe is
264 : // empty.
265 15 : if (is_legacy_wildcard) {
266 8 : names_added_.clear();
267 8 : }
268 15 : names_removed_.clear();
269 15 : }
270 :
271 40 : std::copy(names_added_.begin(), names_added_.end(),
272 40 : Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names_subscribe()));
273 40 : std::copy(names_removed_.begin(), names_removed_.end(),
274 40 : Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names_unsubscribe()));
275 40 : names_added_.clear();
276 40 : names_removed_.clear();
277 :
278 40 : return request;
279 40 : }
280 :
281 15 : bool DeltaSubscriptionState::isInitialRequestForLegacyWildcard() {
282 15 : if (in_initial_legacy_wildcard_) {
283 8 : requested_resource_state_.insert_or_assign(Wildcard, ResourceState::waitingForServer());
284 8 : ASSERT(requested_resource_state_.contains(Wildcard));
285 8 : ASSERT(!wildcard_resource_state_.contains(Wildcard));
286 8 : ASSERT(!ambiguous_resource_state_.contains(Wildcard));
287 8 : return true;
288 8 : }
289 :
290 : // If we are here, this means that we lost our initial wildcard mode, because we subscribed to
291 : // something in the past. We could still be in the situation now that all we are subscribed to now
292 : // is wildcard resource, so in such case try to send a legacy wildcard subscription request
293 : // anyway. For this to happen, two conditions need to apply:
294 : //
295 : // 1. No change in interest.
296 : // 2. The only requested resource is Wildcard resource.
297 : //
298 : // The invariant of the code here is that this code is executed only when
299 : // subscriptionUpdatePending actually returns true, which in our case can only happen if the
300 : // requested resources state_ isn't empty.
301 7 : ASSERT(!requested_resource_state_.empty());
302 :
303 : // If our subscription interest didn't change then the first condition for using legacy wildcard
304 : // subscription is met.
305 7 : if (!names_added_.empty() || !names_removed_.empty()) {
306 7 : return false;
307 7 : }
308 : // If we requested only a wildcard resource then the second condition for using legacy wildcard
309 : // condition is met.
310 0 : return requested_resource_state_.size() == 1 &&
311 0 : requested_resource_state_.begin()->first == Wildcard;
312 7 : }
313 :
314 : void DeltaSubscriptionState::addResourceStateFromServer(
315 22 : const envoy::service::discovery::v3::Resource& resource) {
316 22 : setResourceTtl(resource);
317 :
318 22 : if (auto maybe_resource = getRequestedResourceState(resource.name());
319 22 : maybe_resource.has_value()) {
320 : // It is a resource that we requested.
321 12 : maybe_resource->setVersion(resource.version());
322 12 : ASSERT(requested_resource_state_.contains(resource.name()));
323 12 : ASSERT(!wildcard_resource_state_.contains(resource.name()));
324 12 : ASSERT(!ambiguous_resource_state_.contains(resource.name()));
325 12 : } else {
326 : // It is a resource that is a part of our wildcard request.
327 10 : wildcard_resource_state_.insert_or_assign(resource.name(), resource.version());
328 : // The resource could be ambiguous before, but now the ambiguity
329 : // is resolved.
330 10 : ambiguous_resource_state_.erase(resource.name());
331 10 : ASSERT(!requested_resource_state_.contains(resource.name()));
332 10 : ASSERT(wildcard_resource_state_.contains(resource.name()));
333 10 : ASSERT(!ambiguous_resource_state_.contains(resource.name()));
334 10 : }
335 22 : }
336 :
337 : void DeltaSubscriptionState::setResourceTtl(
338 22 : const envoy::service::discovery::v3::Resource& resource) {
339 22 : if (resource.has_ttl()) {
340 0 : ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())),
341 0 : resource.name());
342 22 : } else {
343 22 : ttl_.clear(resource.name());
344 22 : }
345 22 : }
346 :
347 0 : void DeltaSubscriptionState::ttlExpiryCallback(const std::vector<std::string>& expired) {
348 0 : Protobuf::RepeatedPtrField<std::string> removed_resources;
349 0 : for (const auto& resource : expired) {
350 0 : if (auto maybe_resource = getRequestedResourceState(resource); maybe_resource.has_value()) {
351 0 : maybe_resource->setAsWaitingForServer();
352 0 : removed_resources.Add(std::string(resource));
353 0 : } else if (const auto erased_count = wildcard_resource_state_.erase(resource) +
354 0 : ambiguous_resource_state_.erase(resource);
355 0 : erased_count > 0) {
356 0 : removed_resources.Add(std::string(resource));
357 0 : }
358 0 : }
359 0 : callbacks().onConfigUpdate({}, removed_resources, "");
360 0 : }
361 :
362 : OptRef<DeltaSubscriptionState::ResourceState>
363 23 : DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) {
364 23 : auto itr = requested_resource_state_.find(resource_name);
365 23 : if (itr == requested_resource_state_.end()) {
366 11 : return {};
367 11 : }
368 12 : return {itr->second};
369 23 : }
370 :
371 : OptRef<const DeltaSubscriptionState::ResourceState>
372 0 : DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) const {
373 0 : auto itr = requested_resource_state_.find(resource_name);
374 0 : if (itr == requested_resource_state_.end()) {
375 0 : return {};
376 0 : }
377 0 : return {itr->second};
378 0 : }
379 :
380 : } // namespace XdsMux
381 : } // namespace Config
382 : } // namespace Envoy
|