Line data Source code
1 : #include "source/extensions/config_subscription/grpc/delta_subscription_state.h"
2 :
3 : #include "envoy/event/dispatcher.h"
4 : #include "envoy/service/discovery/v3/discovery.pb.h"
5 :
6 : #include "source/common/common/assert.h"
7 : #include "source/common/common/hash.h"
8 : #include "source/common/config/utility.h"
9 : #include "source/common/runtime/runtime_features.h"
10 :
11 : namespace Envoy {
12 : namespace Config {
13 :
14 : DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
15 : UntypedConfigUpdateCallbacks& watch_map,
16 : const LocalInfo::LocalInfo& local_info,
17 : Event::Dispatcher& dispatcher,
18 : XdsConfigTrackerOptRef xds_config_tracker)
19 : // TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
20 : // empty resources as updates.
21 : : supports_heartbeats_(type_url != "envoy.config.route.v3.VirtualHost"),
22 : ttl_(
23 0 : [this](const auto& expired) {
24 0 : Protobuf::RepeatedPtrField<std::string> removed_resources;
25 0 : for (const auto& resource : expired) {
26 0 : if (auto maybe_resource = getRequestedResourceState(resource);
27 0 : maybe_resource.has_value()) {
28 0 : maybe_resource->setAsWaitingForServer();
29 0 : removed_resources.Add(std::string(resource));
30 0 : } else if (const auto erased_count = wildcard_resource_state_.erase(resource) +
31 0 : ambiguous_resource_state_.erase(resource);
32 0 : erased_count > 0) {
33 0 : removed_resources.Add(std::string(resource));
34 0 : }
35 0 : }
36 :
37 0 : watch_map_.onConfigUpdate({}, removed_resources, "");
38 0 : },
39 : dispatcher, dispatcher.timeSource()),
40 : type_url_(std::move(type_url)), watch_map_(watch_map), local_info_(local_info),
41 15 : xds_config_tracker_(xds_config_tracker) {}
42 :
43 : void DeltaSubscriptionState::updateSubscriptionInterest(
44 : const absl::flat_hash_set<std::string>& cur_added,
45 34 : const absl::flat_hash_set<std::string>& cur_removed) {
46 34 : for (const auto& a : cur_added) {
47 9 : if (in_initial_legacy_wildcard_ && a != Wildcard) {
48 7 : in_initial_legacy_wildcard_ = false;
49 7 : }
50 : // If the requested resource existed as a wildcard resource,
51 : // transition it to requested. Otherwise mark it as a resource
52 : // waiting for the server to receive the version.
53 9 : if (auto it = wildcard_resource_state_.find(a); it != wildcard_resource_state_.end()) {
54 0 : requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
55 0 : wildcard_resource_state_.erase(it);
56 9 : } else if (it = ambiguous_resource_state_.find(a); it != ambiguous_resource_state_.end()) {
57 0 : requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
58 0 : ambiguous_resource_state_.erase(it);
59 9 : } else {
60 9 : requested_resource_state_.insert_or_assign(a, ResourceState::waitingForServer());
61 9 : }
62 9 : ASSERT(requested_resource_state_.contains(a));
63 9 : ASSERT(!wildcard_resource_state_.contains(a));
64 9 : ASSERT(!ambiguous_resource_state_.contains(a));
65 : // If interest in a resource is removed-then-added (all before a discovery request
66 : // can be sent), we must treat it as a "new" addition: our user may have forgotten its
67 : // copy of the resource after instructing us to remove it, and need to be reminded of it.
68 9 : names_removed_.erase(a);
69 9 : names_added_.insert(a);
70 9 : }
71 34 : for (const auto& r : cur_removed) {
72 9 : auto actually_erased = false;
73 : // The resource we have lost the interest in could also come from our wildcard subscription. We
74 : // just don't know it at this point. Instead of removing it outright, mark the resource as not
75 : // interesting to us any more and the server will send us an update. If we don't have a wildcard
76 : // subscription then there is no ambiguity and just drop the resource.
77 9 : if (requested_resource_state_.contains(Wildcard)) {
78 0 : if (auto it = requested_resource_state_.find(r); it != requested_resource_state_.end()) {
79 : // Wildcard resources always have a version. If our requested resource has no version, it
80 : // won't be a wildcard resource then. If r is Wildcard itself, then it never has a version
81 : // attached to it, so it will not be moved to ambiguous category.
82 0 : if (!it->second.isWaitingForServer()) {
83 0 : ambiguous_resource_state_.insert_or_assign(it->first, it->second.version());
84 0 : }
85 0 : requested_resource_state_.erase(it);
86 0 : actually_erased = true;
87 0 : }
88 9 : } else {
89 9 : actually_erased = (requested_resource_state_.erase(r) > 0);
90 9 : }
91 9 : ASSERT(!requested_resource_state_.contains(r));
92 : // Ideally, when interest in a resource is added-then-removed in between requests,
93 : // we would avoid putting a superfluous "unsubscribe [resource that was never subscribed]"
94 : // in the request. However, the removed-then-added case *does* need to go in the request,
95 : // and due to how we accomplish that, it's difficult to distinguish remove-add-remove from
96 : // add-remove (because "remove-add" has to be treated as equivalent to just "add").
97 9 : names_added_.erase(r);
98 9 : if (actually_erased) {
99 9 : names_removed_.insert(r);
100 9 : in_initial_legacy_wildcard_ = false;
101 9 : }
102 9 : }
103 : // If we unsubscribe from wildcard resource, drop all the resources that came from wildcard from
104 : // cache. Also drop the ambiguous resources - we aren't interested in those, but we didn't know if
105 : // those came from wildcard subscription or not, but now it's not important any more.
106 34 : if (cur_removed.contains(Wildcard)) {
107 0 : wildcard_resource_state_.clear();
108 0 : ambiguous_resource_state_.clear();
109 0 : }
110 34 : }
111 :
112 : // Not having sent any requests yet counts as an "update pending" since you're supposed to resend
113 : // the entirety of your interest at the start of a stream, even if nothing has changed.
114 346 : bool DeltaSubscriptionState::subscriptionUpdatePending() const {
115 346 : if (!names_added_.empty() || !names_removed_.empty()) {
116 36 : return true;
117 36 : }
118 : // At this point, we have no new resources to subscribe to or any
119 : // resources to unsubscribe from.
120 310 : if (!any_request_sent_yet_in_current_stream_) {
121 : // If we haven't sent anything on the current stream, but we are actually interested in some
122 : // resource then we obviously need to let the server know about those.
123 16 : if (!requested_resource_state_.empty()) {
124 0 : return true;
125 0 : }
126 : // So there are no new names and we are interested in nothing. This may either mean that we want
127 : // the legacy wildcard subscription to kick in or we actually unsubscribed from everything. If
128 : // the latter is true, then we should not be sending any requests. In such case the initial
129 : // wildcard mode will be false. Otherwise it means that the legacy wildcard request should be
130 : // sent.
131 16 : return in_initial_legacy_wildcard_;
132 16 : }
133 :
134 : // At this point, we have no changes in subscription resources and this isn't a first request in
135 : // the stream, so even if there are no resources we are interested in, we can send the request,
136 : // because even if it's empty, it won't be interpreted as legacy wildcard subscription, which can
137 : // only for the first request in the stream. So sending an empty request at this point should be
138 : // harmless.
139 294 : return must_send_discovery_request_;
140 310 : }
141 :
142 : UpdateAck DeltaSubscriptionState::handleResponse(
143 23 : const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
144 : // We *always* copy the response's nonce into the next request, even if we're going to make that
145 : // request a NACK by setting error_detail.
146 23 : UpdateAck ack(message.nonce(), type_url_);
147 23 : TRY_ASSERT_MAIN_THREAD { handleGoodResponse(message); }
148 23 : END_TRY
149 23 : catch (const EnvoyException& e) {
150 0 : handleBadResponse(e, ack);
151 0 : }
152 23 : return ack;
153 23 : }
154 :
155 : bool DeltaSubscriptionState::isHeartbeatResponse(
156 22 : const envoy::service::discovery::v3::Resource& resource) const {
157 22 : if (!supports_heartbeats_) {
158 0 : return false;
159 0 : }
160 22 : if (resource.has_resource()) {
161 22 : return false;
162 22 : }
163 :
164 0 : if (const auto maybe_resource = getRequestedResourceState(resource.name());
165 0 : maybe_resource.has_value()) {
166 0 : return !maybe_resource->isWaitingForServer() && resource.version() == maybe_resource->version();
167 0 : }
168 :
169 0 : if (const auto itr = wildcard_resource_state_.find(resource.name());
170 0 : itr != wildcard_resource_state_.end()) {
171 0 : return resource.version() == itr->second;
172 0 : }
173 :
174 0 : if (const auto itr = ambiguous_resource_state_.find(resource.name());
175 0 : itr != ambiguous_resource_state_.end()) {
176 : // In theory we should move the ambiguous resource to wildcard, because probably we shouldn't be
177 : // getting heartbeat responses about resources that we are not interested in, but the server
178 : // could have sent this heartbeat before it learned about our lack of interest in the resource.
179 0 : return resource.version() == itr->second;
180 0 : }
181 :
182 0 : return false;
183 0 : }
184 :
185 : void DeltaSubscriptionState::handleGoodResponse(
186 23 : const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
187 23 : absl::flat_hash_set<std::string> names_added_removed;
188 23 : Protobuf::RepeatedPtrField<envoy::service::discovery::v3::Resource> non_heartbeat_resources;
189 23 : for (const auto& resource : message.resources()) {
190 22 : if (!names_added_removed.insert(resource.name()).second) {
191 0 : throw EnvoyException(
192 0 : fmt::format("duplicate name {} found among added/updated resources", resource.name()));
193 0 : }
194 22 : if (isHeartbeatResponse(resource)) {
195 0 : continue;
196 0 : }
197 22 : non_heartbeat_resources.Add()->CopyFrom(resource);
198 : // DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource
199 22 : if (!resource.has_resource() && resource.aliases_size() > 0) {
200 0 : continue;
201 0 : }
202 22 : if (message.type_url() != resource.resource().type_url()) {
203 0 : throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match "
204 0 : "the message-wide type URL {} in DeltaDiscoveryResponse {}",
205 0 : resource.resource().type_url(), message.type_url(),
206 0 : message.DebugString()));
207 0 : }
208 22 : }
209 23 : for (const auto& name : message.removed_resources()) {
210 1 : if (!names_added_removed.insert(name).second) {
211 0 : throw EnvoyException(
212 0 : fmt::format("duplicate name {} found in the union of added+removed resources", name));
213 0 : }
214 1 : }
215 :
216 23 : watch_map_.onConfigUpdate(non_heartbeat_resources, message.removed_resources(),
217 23 : message.system_version_info());
218 :
219 : // Processing point when resources are successfully ingested.
220 23 : if (xds_config_tracker_.has_value()) {
221 0 : xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources,
222 0 : message.removed_resources());
223 0 : }
224 :
225 23 : {
226 23 : const auto scoped_update = ttl_.scopedTtlUpdate();
227 23 : if (requested_resource_state_.contains(Wildcard)) {
228 11 : for (const auto& resource : message.resources()) {
229 10 : addResourceStateFromServer(resource);
230 10 : }
231 12 : } else {
232 : // We are not subscribed to wildcard, so we only take resources that we explicitly requested
233 : // and ignore the others.
234 12 : for (const auto& resource : message.resources()) {
235 12 : if (requested_resource_state_.contains(resource.name())) {
236 12 : addResourceStateFromServer(resource);
237 12 : }
238 12 : }
239 12 : }
240 23 : }
241 :
242 : // If a resource is gone, there is no longer a meaningful version for it that makes sense to
243 : // provide to the server upon stream reconnect: either it will continue to not exist, in which
244 : // case saying nothing is fine, or the server will bring back something new, which we should
245 : // receive regardless (which is the logic that not specifying a version will get you).
246 : //
247 : // So, leave the version map entry present but blank if we are still interested in the resource.
248 : // It will be left out of initial_resource_versions messages, but will remind us to explicitly
249 : // tell the server "I'm cancelling my subscription" when we lose interest. In case of resources
250 : // received as a part of the wildcard subscription or resources we already lost interest in, we
251 : // just drop them.
252 23 : for (const auto& resource_name : message.removed_resources()) {
253 1 : if (auto maybe_resource = getRequestedResourceState(resource_name);
254 1 : maybe_resource.has_value()) {
255 0 : maybe_resource->setAsWaitingForServer();
256 1 : } else if (const auto erased_count = ambiguous_resource_state_.erase(resource_name);
257 1 : erased_count == 0) {
258 1 : wildcard_resource_state_.erase(resource_name);
259 1 : }
260 1 : }
261 23 : ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_,
262 23 : message.resources().size(), message.removed_resources().size());
263 23 : }
264 :
265 0 : void DeltaSubscriptionState::handleBadResponse(const EnvoyException& e, UpdateAck& ack) {
266 : // Note that error_detail being set is what indicates that a DeltaDiscoveryRequest is a NACK.
267 0 : ack.error_detail_.set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
268 0 : ack.error_detail_.set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
269 0 : ENVOY_LOG(warn, "delta config for {} rejected: {}", type_url_, e.what());
270 0 : watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
271 0 : }
272 :
273 15 : void DeltaSubscriptionState::handleEstablishmentFailure() {
274 15 : watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
275 15 : nullptr);
276 15 : }
277 :
278 : envoy::service::discovery::v3::DeltaDiscoveryRequest
279 40 : DeltaSubscriptionState::getNextRequestAckless() {
280 40 : envoy::service::discovery::v3::DeltaDiscoveryRequest request;
281 40 : must_send_discovery_request_ = false;
282 40 : if (!any_request_sent_yet_in_current_stream_) {
283 15 : any_request_sent_yet_in_current_stream_ = true;
284 15 : const bool is_legacy_wildcard = isInitialRequestForLegacyWildcard();
285 : // initial_resource_versions "must be populated for first request in a stream".
286 : // Also, since this might be a new server, we must explicitly state *all* of our subscription
287 : // interest.
288 15 : for (auto const& [resource_name, resource_state] : requested_resource_state_) {
289 : // Populate initial_resource_versions with the resource versions we currently have.
290 : // Resources we are interested in, but are still waiting to get any version of from the
291 : // server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
292 15 : if (!resource_state.isWaitingForServer()) {
293 0 : (*request.mutable_initial_resource_versions())[resource_name] = resource_state.version();
294 0 : }
295 : // We are going over a list of resources that we are interested in, so add them to
296 : // resource_names_subscribe.
297 15 : names_added_.insert(resource_name);
298 15 : }
299 15 : for (auto const& [resource_name, resource_version] : wildcard_resource_state_) {
300 0 : (*request.mutable_initial_resource_versions())[resource_name] = resource_version;
301 0 : }
302 15 : for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) {
303 0 : (*request.mutable_initial_resource_versions())[resource_name] = resource_version;
304 0 : }
305 : // If this is a legacy wildcard request, then make sure that the resource_names_subscribe is
306 : // empty.
307 15 : if (is_legacy_wildcard) {
308 8 : names_added_.clear();
309 8 : }
310 15 : names_removed_.clear();
311 15 : }
312 40 : std::copy(names_added_.begin(), names_added_.end(),
313 40 : Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_subscribe()));
314 40 : std::copy(names_removed_.begin(), names_removed_.end(),
315 40 : Protobuf::RepeatedFieldBackInserter(request.mutable_resource_names_unsubscribe()));
316 40 : names_added_.clear();
317 40 : names_removed_.clear();
318 :
319 40 : request.set_type_url(type_url_);
320 40 : request.mutable_node()->MergeFrom(local_info_.node());
321 40 : return request;
322 40 : }
323 :
324 15 : bool DeltaSubscriptionState::isInitialRequestForLegacyWildcard() {
325 15 : if (in_initial_legacy_wildcard_) {
326 8 : requested_resource_state_.insert_or_assign(Wildcard, ResourceState::waitingForServer());
327 8 : ASSERT(requested_resource_state_.contains(Wildcard));
328 8 : ASSERT(!wildcard_resource_state_.contains(Wildcard));
329 8 : ASSERT(!ambiguous_resource_state_.contains(Wildcard));
330 8 : return true;
331 8 : }
332 :
333 : // If we are here, this means that we lost our initial wildcard mode, because we subscribed to
334 : // something in the past. We could still be in the situation now that all we are subscribed to now
335 : // is wildcard resource, so in such case try to send a legacy wildcard subscription request
336 : // anyway. For this to happen, two conditions need to apply:
337 : //
338 : // 1. No change in interest.
339 : // 2. The only requested resource is Wildcard resource.
340 : //
341 : // The invariant of the code here is that this code is executed only when
342 : // subscriptionUpdatePending actually returns true, which in our case can only happen if the
343 : // requested resources state_ isn't empty.
344 7 : ASSERT(!requested_resource_state_.empty());
345 :
346 : // If our subscription interest didn't change then the first condition for using legacy wildcard
347 : // subscription is met.
348 7 : if (!names_added_.empty() || !names_removed_.empty()) {
349 7 : return false;
350 7 : }
351 : // If we requested only a wildcard resource then the second condition for using legacy wildcard
352 : // condition is met.
353 0 : return requested_resource_state_.size() == 1 &&
354 0 : requested_resource_state_.begin()->first == Wildcard;
355 7 : }
356 :
357 : envoy::service::discovery::v3::DeltaDiscoveryRequest
358 23 : DeltaSubscriptionState::getNextRequestWithAck(const UpdateAck& ack) {
359 23 : envoy::service::discovery::v3::DeltaDiscoveryRequest request = getNextRequestAckless();
360 23 : request.set_response_nonce(ack.nonce_);
361 23 : if (ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) {
362 : // Don't needlessly make the field present-but-empty if status is ok.
363 0 : request.mutable_error_detail()->CopyFrom(ack.error_detail_);
364 0 : }
365 23 : return request;
366 23 : }
367 :
368 : void DeltaSubscriptionState::addResourceStateFromServer(
369 22 : const envoy::service::discovery::v3::Resource& resource) {
370 22 : if (resource.has_ttl()) {
371 0 : ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())),
372 0 : resource.name());
373 22 : } else {
374 22 : ttl_.clear(resource.name());
375 22 : }
376 :
377 22 : if (auto maybe_resource = getRequestedResourceState(resource.name());
378 22 : maybe_resource.has_value()) {
379 : // It is a resource that we requested.
380 12 : maybe_resource->setVersion(resource.version());
381 12 : ASSERT(requested_resource_state_.contains(resource.name()));
382 12 : ASSERT(!wildcard_resource_state_.contains(resource.name()));
383 12 : ASSERT(!ambiguous_resource_state_.contains(resource.name()));
384 12 : } else {
385 : // It is a resource that is a part of our wildcard request.
386 10 : wildcard_resource_state_.insert_or_assign(resource.name(), resource.version());
387 : // The resource could be ambiguous before, but now the ambiguity
388 : // is resolved.
389 10 : ambiguous_resource_state_.erase(resource.name());
390 10 : ASSERT(!requested_resource_state_.contains(resource.name()));
391 10 : ASSERT(wildcard_resource_state_.contains(resource.name()));
392 10 : ASSERT(!ambiguous_resource_state_.contains(resource.name()));
393 10 : }
394 22 : }
395 :
396 : OptRef<DeltaSubscriptionState::ResourceState>
397 23 : DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) {
398 23 : auto itr = requested_resource_state_.find(resource_name);
399 23 : if (itr == requested_resource_state_.end()) {
400 11 : return {};
401 11 : }
402 12 : return {itr->second};
403 23 : }
404 :
405 : OptRef<const DeltaSubscriptionState::ResourceState>
406 0 : DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) const {
407 0 : auto itr = requested_resource_state_.find(resource_name);
408 0 : if (itr == requested_resource_state_.end()) {
409 0 : return {};
410 0 : }
411 0 : return {itr->second};
412 0 : }
413 :
414 : } // namespace Config
415 : } // namespace Envoy
|