1
#include "source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h"
2

            
3
#include "envoy/event/dispatcher.h"
4
#include "envoy/service/discovery/v3/discovery.pb.h"
5

            
6
#include "source/common/common/hash.h"
7
#include "source/common/config/utility.h"
8
#include "source/common/runtime/runtime_features.h"
9

            
10
namespace Envoy {
11
namespace Config {
12
namespace XdsMux {
13

            
14
DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
15
                                               UntypedConfigUpdateCallbacks& watch_map,
16
                                               Event::Dispatcher& dispatcher,
17
                                               XdsConfigTrackerOptRef xds_config_tracker)
18
309
    : BaseSubscriptionState(std::move(type_url), watch_map, dispatcher, xds_config_tracker),
19
      // TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
20
      // empty resources as updates.
21
309
      supports_heartbeats_(type_url_ != "envoy.config.route.v3.VirtualHost") {}
22

            
23
309
DeltaSubscriptionState::~DeltaSubscriptionState() = default;
24

            
25
void DeltaSubscriptionState::updateSubscriptionInterest(
26
    const absl::flat_hash_set<std::string>& cur_added,
27
798
    const absl::flat_hash_set<std::string>& cur_removed) {
28
871
  for (const auto& a : cur_added) {
29
462
    if (in_initial_legacy_wildcard_ && a != Wildcard) {
30
210
      in_initial_legacy_wildcard_ = false;
31
210
    }
32
    // If the requested resource existed as a wildcard resource,
33
    // transition it to requested. Otherwise mark it as a resource
34
    // waiting for the server to receive the version.
35
462
    if (auto it = wildcard_resource_state_.find(a); it != wildcard_resource_state_.end()) {
36
4
      requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
37
4
      wildcard_resource_state_.erase(it);
38
458
    } else if (it = ambiguous_resource_state_.find(a); it != ambiguous_resource_state_.end()) {
39
2
      requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
40
2
      ambiguous_resource_state_.erase(it);
41
456
    } else {
42
456
      requested_resource_state_.insert_or_assign(a, ResourceState::waitingForServer());
43
456
    }
44
462
    ASSERT(requested_resource_state_.contains(a));
45
462
    ASSERT(!wildcard_resource_state_.contains(a));
46
462
    ASSERT(!ambiguous_resource_state_.contains(a));
47
    // If interest in a resource is removed-then-added (all before a discovery request
48
    // can be sent), we must treat it as a "new" addition: our user may have forgotten its
49
    // copy of the resource after instructing us to remove it, and need to be reminded of it.
50
462
    names_removed_.erase(a);
51
462
    names_added_.insert(a);
52
462
  }
53
799
  for (const auto& r : cur_removed) {
54
277
    auto actually_erased = false;
55
    // The resource we have lost the interest in could also come from our wildcard subscription. We
56
    // just don't know it at this point. Instead of removing it outright, mark the resource as not
57
    // interesting to us any more and the server will send us an update. If we don't have a wildcard
58
    // subscription then there is no ambiguity and just drop the resource.
59
277
    if (requested_resource_state_.contains(Wildcard)) {
60
90
      if (auto it = requested_resource_state_.find(r); it != requested_resource_state_.end()) {
61
        // Wildcard resources always have a version. If our requested resource has no version, it
62
        // won't be a wildcard resource then. If r is Wildcard itself, then it never has a version
63
        // attached to it, so it will not be moved to ambiguous category.
64
74
        if (!it->second.isWaitingForServer()) {
65
54
          ambiguous_resource_state_.insert_or_assign(it->first, it->second.version());
66
54
        }
67
74
        requested_resource_state_.erase(it);
68
74
        actually_erased = true;
69
74
      }
70
225
    } else {
71
187
      actually_erased = (requested_resource_state_.erase(r) > 0);
72
187
    }
73
277
    ASSERT(!requested_resource_state_.contains(r));
74
    // Ideally, when interest in a resource is added-then-removed in between requests,
75
    // we would avoid putting a superfluous "unsubscribe [resource that was never subscribed]"
76
    // in the request. However, the removed-then-added case *does* need to go in the request,
77
    // and due to how we accomplish that, it's difficult to distinguish remove-add-remove from
78
    // add-remove (because "remove-add" has to be treated as equivalent to just "add").
79
277
    names_added_.erase(r);
80
277
    if (actually_erased) {
81
261
      names_removed_.insert(r);
82
261
      in_initial_legacy_wildcard_ = false;
83
261
    }
84
277
  }
85
  // If we unsubscribe from wildcard resource, drop all the resources that came from wildcard from
86
  // cache.
87
798
  if (cur_removed.contains(Wildcard)) {
88
6
    wildcard_resource_state_.clear();
89
6
    ambiguous_resource_state_.clear();
90
6
  }
91
798
}
92

            
93
// Not having sent any requests yet counts as an "update pending" since you're supposed to resend
94
// the entirety of your interest at the start of a stream, even if nothing has changed.
95
4451
bool DeltaSubscriptionState::subscriptionUpdatePending() const {
96
4451
  if (!names_added_.empty() || !names_removed_.empty()) {
97
645
    return true;
98
645
  }
99
  // At this point, we have no new resources to subscribe to or any
100
  // resources to unsubscribe from.
101
3806
  if (!any_request_sent_yet_in_current_stream_) {
102
    // If we haven't sent anything on the current stream, but we are actually interested in some
103
    // resource then we obviously need to let the server know about those.
104
449
    if (!requested_resource_state_.empty()) {
105
137
      return true;
106
137
    }
107
    // So there are no new names and we are interested in nothing. This may either mean that we want
108
    // the legacy wildcard subscription to kick in or we actually unsubscribed from everything. If
109
    // the latter is true, then we should not be sending any requests. In such case the initial
110
    // wildcard mode will be false. Otherwise it means that the legacy wildcard request should be
111
    // sent.
112
312
    return in_initial_legacy_wildcard_;
113
449
  }
114

            
115
  // At this point, we have no changes in subscription resources and this isn't a first request in
116
  // the stream, so even if there are no resources we are interested in, we can send the request,
117
  // because even if it's empty, it won't be interpreted as legacy wildcard subscription, which can
118
  // only for the first request in the stream. So sending an empty request at this point should be
119
  // harmless.
120
3357
  return dynamicContextChanged();
121
3806
}
122

            
123
263
void DeltaSubscriptionState::markStreamFresh(bool should_send_initial_resource_versions) {
124
263
  any_request_sent_yet_in_current_stream_ = false;
125
263
  should_send_initial_resource_versions_ = should_send_initial_resource_versions;
126
263
}
127

            
128
bool DeltaSubscriptionState::isHeartbeatResource(
129
973
    const envoy::service::discovery::v3::Resource& resource) const {
130
973
  if (!supports_heartbeats_) {
131
8
    return false;
132
8
  }
133
965
  if (resource.has_resource()) {
134
598
    return false;
135
598
  }
136

            
137
367
  if (const auto maybe_resource = getRequestedResourceState(resource.name());
138
367
      maybe_resource.has_value()) {
139
241
    return !maybe_resource->isWaitingForServer() && resource.version() == maybe_resource->version();
140
241
  }
141

            
142
126
  if (const auto itr = wildcard_resource_state_.find(resource.name());
143
126
      itr != wildcard_resource_state_.end()) {
144
12
    return resource.version() == itr->second;
145
12
  }
146

            
147
114
  if (const auto itr = ambiguous_resource_state_.find(resource.name());
148
114
      itr != ambiguous_resource_state_.end()) {
149
    // In theory we should move the ambiguous resource to wildcard, because probably we shouldn't be
150
    // getting heartbeat responses about resources that we are not interested in, but the server
151
    // could have sent this heartbeat before it learned about our lack of interest in the resource.
152
4
    return resource.version() == itr->second;
153
4
  }
154

            
155
110
  return false;
156
114
}
157

            
158
void DeltaSubscriptionState::handleGoodResponse(
159
398
    envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
160
398
  absl::flat_hash_set<std::string> names_added_removed;
161

            
162
525
  for (const auto& resource : message.resources()) {
163
497
    if (!names_added_removed.insert(resource.name()).second) {
164
4
      throwEnvoyExceptionOrPanic(
165
4
          fmt::format("duplicate name {} found among added/updated resources", resource.name()));
166
4
    }
167
493
    if (isHeartbeatResource(resource)) {
168
14
      continue;
169
14
    }
170
    // DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource
171
479
    if (!resource.has_resource() && resource.aliases_size() > 0) {
172
2
      continue;
173
2
    }
174
477
    if (message.type_url() != resource.resource().type_url()) {
175
7
      throwEnvoyExceptionOrPanic(
176
7
          fmt::format("type URL {} embedded in an individual Any does not match "
177
7
                      "the message-wide type URL {} in DeltaDiscoveryResponse {}",
178
7
                      resource.resource().type_url(), message.type_url(), message.DebugString()));
179
7
    }
180
477
  }
181
387
  for (const auto& name : message.removed_resources()) {
182
32
    if (!names_added_removed.insert(name).second) {
183
4
      throwEnvoyExceptionOrPanic(
184
4
          fmt::format("duplicate name {} found in the union of added+removed resources", name));
185
4
    }
186
32
  }
187

            
188
  // Reorder the resources in the response, having all the non-heartbeat
189
  // resources at the front of the list. Note that although there's no
190
  // requirement to keep stable ordering, we do so to process the resources in
191
  // the order they were sent.
192
383
  auto last_non_heartbeat = std::stable_partition(
193
383
      message.mutable_resources()->begin(), message.mutable_resources()->end(),
194
508
      [&](const envoy::service::discovery::v3::Resource& resource) {
195
480
        return !isHeartbeatResource(resource);
196
480
      });
197

            
198
383
  auto non_heartbeat_resources_span = absl::MakeConstSpan(
199
383
      message.resources().data(), last_non_heartbeat - message.resources().begin());
200
383
  callbacks().onConfigUpdate(non_heartbeat_resources_span, message.removed_resources(),
201
383
                             message.system_version_info());
202

            
203
  // Processing point when resources are successfully ingested.
204
383
  if (xds_config_tracker_.has_value()) {
205
4
    xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources_span,
206
4
                                          message.removed_resources());
207
4
  }
208

            
209
383
  {
210
383
    const auto scoped_update = ttl_.scopedTtlUpdate();
211
383
    if (requested_resource_state_.contains(Wildcard)) {
212
284
      for (const auto& resource : message.resources()) {
213
254
        addResourceStateFromServer(resource);
214
254
      }
215
297
    } else {
216
      // We are not subscribed to wildcard, so we only take resources that we explicitly requested
217
      // and ignore the others.
218
      // NOTE: This is not gonna work for xdstp resources with glob resource matching.
219
205
      for (const auto& resource : message.resources()) {
220
204
        if (requested_resource_state_.contains(resource.name())) {
221
167
          addResourceStateFromServer(resource);
222
167
        }
223
204
      }
224
145
    }
225
383
  }
226

            
227
  // If a resource is gone, there is no longer a meaningful version for it that makes sense to
228
  // provide to the server upon stream reconnect: either it will continue to not exist, in which
229
  // case saying nothing is fine, or the server will bring back something new, which we should
230
  // receive regardless (which is the logic that not specifying a version will get you).
231
  //
232
  // So, leave the version map entry present but blank if we are still interested in the resource.
233
  // It will be left out of initial_resource_versions messages, but will remind us to explicitly
234
  // tell the server "I'm cancelling my subscription" when we lose interest. In case of resources
235
  // received as a part of the wildcard subscription or resources we already lost interest in, we
236
  // just drop them.
237
383
  for (const auto& resource_name : message.removed_resources()) {
238
26
    if (auto maybe_resource = getRequestedResourceState(resource_name);
239
26
        maybe_resource.has_value()) {
240
12
      maybe_resource->setAsWaitingForServer();
241
20
    } else if (const auto erased_count = ambiguous_resource_state_.erase(resource_name);
242
14
               erased_count == 0) {
243
10
      wildcard_resource_state_.erase(resource_name);
244
10
    }
245
26
  }
246
383
  ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", typeUrl(),
247
383
            message.resources().size(), message.removed_resources().size());
248
383
}
249

            
250
std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryRequest>
251
974
DeltaSubscriptionState::getNextRequestInternal() {
252
974
  auto request = std::make_unique<envoy::service::discovery::v3::DeltaDiscoveryRequest>();
253
974
  request->set_type_url(typeUrl());
254
974
  if (!any_request_sent_yet_in_current_stream_) {
255
505
    any_request_sent_yet_in_current_stream_ = true;
256
505
    const bool is_legacy_wildcard = isInitialRequestForLegacyWildcard();
257
    // initial_resource_versions "must be populated for first request in a stream".
258
    // Also, since this might be a new server, we must explicitly state *all* of our subscription
259
    // interest.
260
774
    for (auto const& [resource_name, resource_state] : requested_resource_state_) {
261
774
      if (should_send_initial_resource_versions_) {
262
        // Populate initial_resource_versions with the resource versions we currently have.
263
        // Resources we are interested in, but are still waiting to get any version of from the
264
        // server, do not belong in initial_resource_versions. (But do belong in new subscriptions!)
265
742
        if (!resource_state.isWaitingForServer()) {
266
98
          (*request->mutable_initial_resource_versions())[resource_name] = resource_state.version();
267
98
        }
268
742
      }
269
      // We are going over a list of resources that we are interested in, so add them to
270
      // resource_names_subscribe.
271
774
      names_added_.insert(resource_name);
272
774
    }
273
505
    if (should_send_initial_resource_versions_) {
274
477
      for (auto const& [resource_name, resource_version] : wildcard_resource_state_) {
275
83
        (*request->mutable_initial_resource_versions())[resource_name] = resource_version;
276
83
      }
277
477
      for (auto const& [resource_name, resource_version] : ambiguous_resource_state_) {
278
6
        (*request->mutable_initial_resource_versions())[resource_name] = resource_version;
279
6
      }
280
477
    }
281
    // If this is a legacy wildcard request, then make sure that the resource_names_subscribe is
282
    // empty.
283
505
    if (is_legacy_wildcard) {
284
256
      names_added_.clear();
285
256
    }
286
505
    names_removed_.clear();
287
505
  }
288

            
289
974
  std::copy(names_added_.begin(), names_added_.end(),
290
974
            Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names_subscribe()));
291
974
  std::copy(names_removed_.begin(), names_removed_.end(),
292
974
            Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names_unsubscribe()));
293
974
  names_added_.clear();
294
974
  names_removed_.clear();
295

            
296
974
  return request;
297
974
}
298

            
299
505
bool DeltaSubscriptionState::isInitialRequestForLegacyWildcard() {
300
505
  if (in_initial_legacy_wildcard_) {
301
250
    requested_resource_state_.insert_or_assign(Wildcard, ResourceState::waitingForServer());
302
250
    ASSERT(requested_resource_state_.contains(Wildcard));
303
250
    ASSERT(!wildcard_resource_state_.contains(Wildcard));
304
250
    ASSERT(!ambiguous_resource_state_.contains(Wildcard));
305
250
    return true;
306
250
  }
307

            
308
  // If we are here, this means that we lost our initial wildcard mode, because we subscribed to
309
  // something in the past. We could still be in the situation now that all we are subscribed to now
310
  // is wildcard resource, so in such case try to send a legacy wildcard subscription request
311
  // anyway. For this to happen, two conditions need to apply:
312
  //
313
  // 1. No change in interest.
314
  // 2. The only requested resource is Wildcard resource.
315
  //
316
  // The invariant of the code here is that this code is executed only when
317
  // subscriptionUpdatePending actually returns true, which in our case can only happen if the
318
  // requested resources state_ isn't empty.
319
255
  ASSERT(!requested_resource_state_.empty());
320

            
321
  // If our subscription interest didn't change then the first condition for using legacy wildcard
322
  // subscription is met.
323
255
  if (!names_added_.empty() || !names_removed_.empty()) {
324
155
    return false;
325
155
  }
326
  // If we requested only a wildcard resource then the second condition for using legacy wildcard
327
  // condition is met.
328
100
  return requested_resource_state_.size() == 1 &&
329
100
         requested_resource_state_.begin()->first == Wildcard;
330
255
}
331

            
332
void DeltaSubscriptionState::addResourceStateFromServer(
333
421
    const envoy::service::discovery::v3::Resource& resource) {
334
421
  setResourceTtl(resource);
335

            
336
421
  if (auto maybe_resource = getRequestedResourceState(resource.name());
337
421
      maybe_resource.has_value()) {
338
    // It is a resource that we requested.
339
258
    maybe_resource->setVersion(resource.version());
340
258
    ASSERT(requested_resource_state_.contains(resource.name()));
341
258
    ASSERT(!wildcard_resource_state_.contains(resource.name()));
342
258
    ASSERT(!ambiguous_resource_state_.contains(resource.name()));
343
303
  } else {
344
    // It is a resource that is a part of our wildcard request.
345
163
    wildcard_resource_state_.insert_or_assign(resource.name(), resource.version());
346
    // The resource could be ambiguous before, but now the ambiguity
347
    // is resolved.
348
163
    ambiguous_resource_state_.erase(resource.name());
349
163
    ASSERT(!requested_resource_state_.contains(resource.name()));
350
163
    ASSERT(wildcard_resource_state_.contains(resource.name()));
351
163
    ASSERT(!ambiguous_resource_state_.contains(resource.name()));
352
163
  }
353
421
}
354

            
355
void DeltaSubscriptionState::setResourceTtl(
356
421
    const envoy::service::discovery::v3::Resource& resource) {
357
421
  if (resource.has_ttl()) {
358
40
    ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())),
359
40
             resource.name());
360
381
  } else {
361
381
    ttl_.clear(resource.name());
362
381
  }
363
421
}
364

            
365
4
void DeltaSubscriptionState::ttlExpiryCallback(const std::vector<std::string>& expired) {
366
4
  Protobuf::RepeatedPtrField<std::string> removed_resources;
367
4
  for (const auto& resource : expired) {
368
4
    if (auto maybe_resource = getRequestedResourceState(resource); maybe_resource.has_value()) {
369
2
      maybe_resource->setAsWaitingForServer();
370
2
      removed_resources.Add(std::string(resource));
371
2
    } else if (const auto erased_count = wildcard_resource_state_.erase(resource) +
372
2
                                         ambiguous_resource_state_.erase(resource);
373
2
               erased_count > 0) {
374
2
      removed_resources.Add(std::string(resource));
375
2
    }
376
4
  }
377
4
  callbacks().onConfigUpdate({}, removed_resources, "");
378
4
}
379

            
380
OptRef<DeltaSubscriptionState::ResourceState>
381
451
DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) {
382
451
  auto itr = requested_resource_state_.find(resource_name);
383
451
  if (itr == requested_resource_state_.end()) {
384
179
    return {};
385
179
  }
386
272
  return {itr->second};
387
451
}
388

            
389
OptRef<const DeltaSubscriptionState::ResourceState>
390
367
DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) const {
391
367
  auto itr = requested_resource_state_.find(resource_name);
392
367
  if (itr == requested_resource_state_.end()) {
393
126
    return {};
394
126
  }
395
241
  return {itr->second};
396
367
}
397

            
398
} // namespace XdsMux
399
} // namespace Config
400
} // namespace Envoy