1
#include "source/extensions/config_subscription/grpc/delta_subscription_state.h"
2

            
3
#include "envoy/event/dispatcher.h"
4
#include "envoy/service/discovery/v3/discovery.pb.h"
5

            
6
#include "source/common/common/assert.h"
7
#include "source/common/common/hash.h"
8
#include "source/common/config/utility.h"
9
#include "source/common/runtime/runtime_features.h"
10

            
11
namespace Envoy {
12
namespace Config {
13

            
14
DeltaSubscriptionState::DeltaSubscriptionState(std::string type_url,
15
                                               UntypedConfigUpdateCallbacks& watch_map,
16
                                               Event::Dispatcher& dispatcher,
17
                                               XdsConfigTrackerOptRef xds_config_tracker)
18
    // TODO(snowp): Hard coding VHDS here is temporary until we can move it away from relying on
19
    // empty resources as updates.
20
878
    : supports_heartbeats_(type_url != "envoy.config.route.v3.VirtualHost"),
21
878
      ttl_(
22
878
          [this](const auto& expired) {
23
4
            Protobuf::RepeatedPtrField<std::string> removed_resources;
24
4
            for (const auto& resource : expired) {
25
4
              if (auto maybe_resource = getRequestedResourceState(resource);
26
4
                  maybe_resource.has_value()) {
27
2
                maybe_resource->setAsWaitingForServer();
28
2
                removed_resources.Add(std::string(resource));
29
2
              } else if (const auto erased_count = wildcard_resource_state_.erase(resource) +
30
2
                                                   ambiguous_resource_state_.erase(resource);
31
2
                         erased_count > 0) {
32
2
                removed_resources.Add(std::string(resource));
33
2
              }
34
4
            }
35

            
36
4
            watch_map_.onConfigUpdate({}, removed_resources, "");
37
4
          },
38
878
          dispatcher, dispatcher.timeSource()),
39
878
      type_url_(std::move(type_url)), watch_map_(watch_map),
40
878
      xds_config_tracker_(xds_config_tracker) {}
41

            
42
void DeltaSubscriptionState::updateSubscriptionInterest(
43
    const absl::flat_hash_set<std::string>& cur_added,
44
2106
    const absl::flat_hash_set<std::string>& cur_removed) {
45
2179
  for (const auto& a : cur_added) {
46
819
    if (in_initial_legacy_wildcard_ && a != Wildcard) {
47
503
      in_initial_legacy_wildcard_ = false;
48
503
    }
49
    // If the requested resource existed as a wildcard resource,
50
    // transition it to requested. Otherwise mark it as a resource
51
    // waiting for the server to receive the version.
52
819
    if (auto it = wildcard_resource_state_.find(a); it != wildcard_resource_state_.end()) {
53
4
      requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
54
4
      wildcard_resource_state_.erase(it);
55
815
    } else if (it = ambiguous_resource_state_.find(a); it != ambiguous_resource_state_.end()) {
56
2
      requested_resource_state_.insert_or_assign(a, ResourceState::withVersion(it->second));
57
2
      ambiguous_resource_state_.erase(it);
58
813
    } else {
59
813
      requested_resource_state_.insert_or_assign(a, ResourceState::waitingForServer());
60
813
    }
61
819
    ASSERT(requested_resource_state_.contains(a));
62
819
    ASSERT(!wildcard_resource_state_.contains(a));
63
819
    ASSERT(!ambiguous_resource_state_.contains(a));
64
    // If interest in a resource is removed-then-added (all before a discovery request
65
    // can be sent), we must treat it as a "new" addition: our user may have forgotten its
66
    // copy of the resource after instructing us to remove it, and need to be reminded of it.
67
819
    names_removed_.erase(a);
68
819
    names_added_.insert(a);
69
819
  }
70
2107
  for (const auto& r : cur_removed) {
71
642
    auto actually_erased = false;
72
    // The resource we have lost the interest in could also come from our wildcard subscription. We
73
    // just don't know it at this point. Instead of removing it outright, mark the resource as not
74
    // interesting to us any more and the server will send us an update. If we don't have a wildcard
75
    // subscription then there is no ambiguity and just drop the resource.
76
642
    if (requested_resource_state_.contains(Wildcard)) {
77
106
      if (auto it = requested_resource_state_.find(r); it != requested_resource_state_.end()) {
78
        // Wildcard resources always have a version. If our requested resource has no version, it
79
        // won't be a wildcard resource then. If r is Wildcard itself, then it never has a version
80
        // attached to it, so it will not be moved to ambiguous category.
81
76
        if (!it->second.isWaitingForServer()) {
82
54
          ambiguous_resource_state_.insert_or_assign(it->first, it->second.version());
83
54
        }
84
76
        requested_resource_state_.erase(it);
85
76
        actually_erased = true;
86
76
      }
87
556
    } else {
88
536
      actually_erased = (requested_resource_state_.erase(r) > 0);
89
536
    }
90
642
    ASSERT(!requested_resource_state_.contains(r));
91
    // Ideally, when interest in a resource is added-then-removed in between requests,
92
    // we would avoid putting a superfluous "unsubscribe [resource that was never subscribed]"
93
    // in the request. However, the removed-then-added case *does* need to go in the request,
94
    // and due to how we accomplish that, it's difficult to distinguish remove-add-remove from
95
    // add-remove (because "remove-add" has to be treated as equivalent to just "add").
96
642
    names_added_.erase(r);
97
642
    if (actually_erased) {
98
612
      names_removed_.insert(r);
99
612
      in_initial_legacy_wildcard_ = false;
100
612
    }
101
642
  }
102
  // If we unsubscribe from wildcard resource, drop all the resources that came from wildcard from
103
  // cache. Also drop the ambiguous resources - we aren't interested in those, but we didn't know if
104
  // those came from wildcard subscription or not, but now it's not important any more.
105
2106
  if (cur_removed.contains(Wildcard)) {
106
6
    wildcard_resource_state_.clear();
107
6
    ambiguous_resource_state_.clear();
108
6
  }
109
2106
}
110

            
111
// Not having sent any requests yet counts as an "update pending" since you're supposed to resend
112
// the entirety of your interest at the start of a stream, even if nothing has changed.
113
14350
bool DeltaSubscriptionState::subscriptionUpdatePending() const {
114
14350
  if (!names_added_.empty() || !names_removed_.empty()) {
115
2078
    return true;
116
2078
  }
117
  // At this point, we have no new resources to subscribe to or any
118
  // resources to unsubscribe from.
119
12272
  if (!any_request_sent_yet_in_current_stream_) {
120
    // If we haven't sent anything on the current stream, but we are actually interested in some
121
    // resource then we obviously need to let the server know about those.
122
1261
    if (!requested_resource_state_.empty()) {
123
288
      return true;
124
288
    }
125
    // So there are no new names and we are interested in nothing. This may either mean that we want
126
    // the legacy wildcard subscription to kick in or we actually unsubscribed from everything. If
127
    // the latter is true, then we should not be sending any requests. In such case the initial
128
    // wildcard mode will be false. Otherwise it means that the legacy wildcard request should be
129
    // sent.
130
973
    return in_initial_legacy_wildcard_;
131
1261
  }
132

            
133
  // At this point, we have no changes in subscription resources and this isn't a first request in
134
  // the stream, so even if there are no resources we are interested in, we can send the request,
135
  // because even if it's empty, it won't be interpreted as legacy wildcard subscription, which can
136
  // only for the first request in the stream. So sending an empty request at this point should be
137
  // harmless.
138
11011
  return dynamic_context_changed_;
139
12272
}
140

            
141
618
void DeltaSubscriptionState::markStreamFresh(bool should_send_initial_resource_versions) {
142
618
  any_request_sent_yet_in_current_stream_ = false;
143
618
  should_send_initial_resource_versions_ = should_send_initial_resource_versions;
144
618
}
145

            
146
// Processes a DeltaDiscoveryResponse and returns the ACK to send back. If the response is
// rejected (an EnvoyException escapes handleGoodResponse), the returned ACK carries error
// details, turning it into a NACK.
UpdateAck DeltaSubscriptionState::handleResponse(
    envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
  // The response nonce is echoed back unconditionally, NACK or not.
  UpdateAck reply(message.nonce(), type_url_);
  TRY_ASSERT_MAIN_THREAD { handleGoodResponse(message); }
  END_TRY
  CATCH(const EnvoyException& e, { handleBadResponse(e, reply); });
  return reply;
}
156

            
157
bool DeltaSubscriptionState::isHeartbeatResponse(
158
3015
    const envoy::service::discovery::v3::Resource& resource) const {
159
3015
  if (!supports_heartbeats_) {
160
8
    return false;
161
8
  }
162
3007
  if (resource.has_resource()) {
163
2640
    return false;
164
2640
  }
165

            
166
367
  if (const auto maybe_resource = getRequestedResourceState(resource.name());
167
367
      maybe_resource.has_value()) {
168
241
    return !maybe_resource->isWaitingForServer() && resource.version() == maybe_resource->version();
169
241
  }
170

            
171
126
  if (const auto itr = wildcard_resource_state_.find(resource.name());
172
126
      itr != wildcard_resource_state_.end()) {
173
12
    return resource.version() == itr->second;
174
12
  }
175

            
176
114
  if (const auto itr = ambiguous_resource_state_.find(resource.name());
177
114
      itr != ambiguous_resource_state_.end()) {
178
    // In theory we should move the ambiguous resource to wildcard, because probably we shouldn't be
179
    // getting heartbeat responses about resources that we are not interested in, but the server
180
    // could have sent this heartbeat before it learned about our lack of interest in the resource.
181
4
    return resource.version() == itr->second;
182
4
  }
183

            
184
110
  return false;
185
114
}
186

            
187
void DeltaSubscriptionState::handleGoodResponse(
188
1610
    envoy::service::discovery::v3::DeltaDiscoveryResponse& message) {
189
1610
  absl::flat_hash_set<std::string> names_added_removed;
190

            
191
1807
  for (const auto& resource : message.resources()) {
192
1531
    if (!names_added_removed.insert(resource.name()).second) {
193
10
      throwEnvoyExceptionOrPanic(
194
10
          fmt::format("duplicate name {} found among added/updated resources", resource.name()));
195
10
    }
196
1521
    if (isHeartbeatResponse(resource)) {
197
14
      continue;
198
14
    }
199
    // DeltaDiscoveryResponses for unresolved aliases don't contain an actual resource
200
1507
    if (!resource.has_resource() && resource.aliases_size() > 0) {
201
2
      continue;
202
2
    }
203
1505
    if (message.type_url() != resource.resource().type_url()) {
204
15
      throwEnvoyExceptionOrPanic(
205
15
          fmt::format("type URL {} embedded in an individual Any does not match "
206
15
                      "the message-wide type URL {} in DeltaDiscoveryResponse {}",
207
15
                      resource.resource().type_url(), message.type_url(), message.DebugString()));
208
15
    }
209
1505
  }
210
1585
  for (const auto& name : message.removed_resources()) {
211
366
    if (!names_added_removed.insert(name).second) {
212
4
      throwEnvoyExceptionOrPanic(
213
4
          fmt::format("duplicate name {} found in the union of added+removed resources", name));
214
4
    }
215
366
  }
216

            
217
  // Reorder the resources in the response, having all the non-heartbeat
218
  // resources at the front of the list. Note that although there's no
219
  // requirement to keep stable ordering, we do so to process the resources in
220
  // the order they were sent.
221
1581
  auto last_non_heartbeat = std::stable_partition(
222
1581
      message.mutable_resources()->begin(), message.mutable_resources()->end(),
223
1770
      [&](const envoy::service::discovery::v3::Resource& resource) {
224
1494
        return !isHeartbeatResponse(resource);
225
1494
      });
226

            
227
1581
  auto non_heartbeat_resources_span = absl::MakeConstSpan(
228
1581
      message.resources().data(), last_non_heartbeat - message.resources().begin());
229
1581
  watch_map_.onConfigUpdate(non_heartbeat_resources_span, message.removed_resources(),
230
1581
                            message.system_version_info());
231

            
232
  // Processing point when resources are successfully ingested.
233
1581
  if (xds_config_tracker_.has_value()) {
234
4
    xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources_span,
235
4
                                          message.removed_resources());
236
4
  }
237

            
238
1581
  {
239
1581
    const auto scoped_update = ttl_.scopedTtlUpdate();
240
1581
    if (requested_resource_state_.contains(Wildcard)) {
241
1150
      for (const auto& resource : message.resources()) {
242
878
        addResourceStateFromServer(resource);
243
878
      }
244
1243
    } else {
245
      // We are not subscribed to wildcard, so we only take resources that we explicitly requested
246
      // and ignore the others.
247
601
      for (const auto& resource : message.resources()) {
248
588
        if (requested_resource_state_.contains(resource.name())) {
249
489
          addResourceStateFromServer(resource);
250
489
        }
251
588
      }
252
515
    }
253
1581
  }
254

            
255
  // If a resource is gone, there is no longer a meaningful version for it that makes sense to
256
  // provide to the server upon stream reconnect: either it will continue to not exist, in which
257
  // case saying nothing is fine, or the server will bring back something new, which we should
258
  // receive regardless (which is the logic that not specifying a version will get you).
259
  //
260
  // So, leave the version map entry present but blank if we are still interested in the resource.
261
  // It will be left out of initial_resource_versions messages, but will remind us to explicitly
262
  // tell the server "I'm cancelling my subscription" when we lose interest. In case of resources
263
  // received as a part of the wildcard subscription or resources we already lost interest in, we
264
  // just drop them.
265
1581
  for (const auto& resource_name : message.removed_resources()) {
266
356
    if (auto maybe_resource = getRequestedResourceState(resource_name);
267
356
        maybe_resource.has_value()) {
268
26
      maybe_resource->setAsWaitingForServer();
269
344
    } else if (const auto erased_count = ambiguous_resource_state_.erase(resource_name);
270
330
               erased_count == 0) {
271
326
      wildcard_resource_state_.erase(resource_name);
272
326
    }
273
356
  }
274
1581
  ENVOY_LOG(debug, "Delta config for {} accepted with {} resources added, {} removed", type_url_,
275
1581
            message.resources().size(), message.removed_resources().size());
276
1581
}
277

            
278
46
void DeltaSubscriptionState::handleBadResponse(const EnvoyException& e, UpdateAck& ack) {
279
  // Note that error_detail being set is what indicates that a DeltaDiscoveryRequest is a NACK.
280
46
  ack.error_detail_.set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
281
46
  ack.error_detail_.set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
282
46
  ENVOY_LOG(warn, "delta config for {} rejected: {}", type_url_, e.what());
283
46
  watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
284
46
}
285

            
286
1010
void DeltaSubscriptionState::handleEstablishmentFailure() {
287
1010
  watch_map_.onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
288
1010
                                  nullptr);
289
1010
}
290

            
291
// Builds the next DeltaDiscoveryRequest (without nonce/error detail) and clears the pending
// subscribe/unsubscribe sets.
//
// The first request in a stream:
// - restates all of our interest;
// - optionally reports the versions we hold in initial_resource_versions;
// - drops pending unsubscriptions;
// - sends an empty subscribe list when it is a legacy wildcard request.
envoy::service::discovery::v3::DeltaDiscoveryRequest
DeltaSubscriptionState::getNextRequestAckless() {
  envoy::service::discovery::v3::DeltaDiscoveryRequest request;
  const bool first_request_in_stream = !any_request_sent_yet_in_current_stream_;
  if (first_request_in_stream) {
    any_request_sent_yet_in_current_stream_ = true;
    // Decide on legacy wildcard mode before names_added_ is refilled below. The decision
    // inspects the pending interest change and may insert Wildcard into
    // requested_resource_state_.
    const bool is_legacy_wildcard = isInitialRequestForLegacyWildcard();

    // The server on this stream may be a new one, so subscribe to everything we want again.
    // initial_resource_versions "must be populated for first request in a stream". Resources
    // still waiting for their first version are subscribed to but carry no initial version.
    for (const auto& [name, state] : requested_resource_state_) {
      names_added_.insert(name);
      if (should_send_initial_resource_versions_ && !state.isWaitingForServer()) {
        (*request.mutable_initial_resource_versions())[name] = state.version();
      }
    }
    if (should_send_initial_resource_versions_) {
      // Also report what we hold through the wildcard subscription or with ambiguous origin.
      for (const auto& [name, version] : wildcard_resource_state_) {
        (*request.mutable_initial_resource_versions())[name] = version;
      }
      for (const auto& [name, version] : ambiguous_resource_state_) {
        (*request.mutable_initial_resource_versions())[name] = version;
      }
    }

    // The first request carries no unsubscriptions, and a legacy wildcard request must leave
    // resource_names_subscribe empty.
    names_removed_.clear();
    if (is_legacy_wildcard) {
      names_added_.clear();
    }
  }

  for (const auto& name : names_added_) {
    request.add_resource_names_subscribe(name);
  }
  for (const auto& name : names_removed_) {
    request.add_resource_names_unsubscribe(name);
  }
  names_added_.clear();
  names_removed_.clear();

  request.set_type_url(type_url_);
  return request;
}
338

            
339
1215
bool DeltaSubscriptionState::isInitialRequestForLegacyWildcard() {
340
1215
  if (in_initial_legacy_wildcard_) {
341
605
    requested_resource_state_.insert_or_assign(Wildcard, ResourceState::waitingForServer());
342
605
    ASSERT(requested_resource_state_.contains(Wildcard));
343
605
    ASSERT(!wildcard_resource_state_.contains(Wildcard));
344
605
    ASSERT(!ambiguous_resource_state_.contains(Wildcard));
345
605
    return true;
346
605
  }
347

            
348
  // If we are here, this means that we lost our initial wildcard mode, because we subscribed to
349
  // something in the past. We could still be in the situation now that all we are subscribed to now
350
  // is wildcard resource, so in such case try to send a legacy wildcard subscription request
351
  // anyway. For this to happen, two conditions need to apply:
352
  //
353
  // 1. No change in interest.
354
  // 2. The only requested resource is Wildcard resource.
355
  //
356
  // The invariant of the code here is that this code is executed only when
357
  // subscriptionUpdatePending actually returns true, which in our case can only happen if the
358
  // requested resources state_ isn't empty.
359
610
  ASSERT(!requested_resource_state_.empty());
360

            
361
  // If our subscription interest didn't change then the first condition for using legacy wildcard
362
  // subscription is met.
363
610
  if (!names_added_.empty() || !names_removed_.empty()) {
364
440
    return false;
365
440
  }
366
  // If we requested only a wildcard resource then the second condition for using legacy wildcard
367
  // condition is met.
368
170
  return requested_resource_state_.size() == 1 &&
369
170
         requested_resource_state_.begin()->first == Wildcard;
370
610
}
371

            
372
// Builds the next request and stamps it with `ack`: the response nonce is always copied, and the
// error detail only when `ack` is a NACK (non-OK status).
envoy::service::discovery::v3::DeltaDiscoveryRequest
DeltaSubscriptionState::getNextRequestWithAck(const UpdateAck& ack) {
  auto request = getNextRequestAckless();
  request.set_response_nonce(ack.nonce_);
  // Leave error_detail unset (rather than present-but-empty) for a plain ACK.
  if (ack.error_detail_.code() == Grpc::Status::WellKnownGrpcStatus::Ok) {
    return request;
  }
  request.mutable_error_detail()->CopyFrom(ack.error_detail_);
  return request;
}
382

            
383
void DeltaSubscriptionState::addResourceStateFromServer(
384
1367
    const envoy::service::discovery::v3::Resource& resource) {
385
1367
  if (resource.has_ttl()) {
386
40
    ttl_.add(std::chrono::milliseconds(DurationUtil::durationToMilliseconds(resource.ttl())),
387
40
             resource.name());
388
1327
  } else {
389
1327
    ttl_.clear(resource.name());
390
1327
  }
391

            
392
1367
  if (auto maybe_resource = getRequestedResourceState(resource.name());
393
1367
      maybe_resource.has_value()) {
394
    // It is a resource that we requested.
395
580
    maybe_resource->setVersion(resource.version());
396
580
    ASSERT(requested_resource_state_.contains(resource.name()));
397
580
    ASSERT(!wildcard_resource_state_.contains(resource.name()));
398
580
    ASSERT(!ambiguous_resource_state_.contains(resource.name()));
399
1010
  } else {
400
    // It is a resource that is a part of our wildcard request.
401
787
    wildcard_resource_state_.insert_or_assign(resource.name(), resource.version());
402
    // The resource could be ambiguous before, but now the ambiguity
403
    // is resolved.
404
787
    ambiguous_resource_state_.erase(resource.name());
405
787
    ASSERT(!requested_resource_state_.contains(resource.name()));
406
787
    ASSERT(wildcard_resource_state_.contains(resource.name()));
407
787
    ASSERT(!ambiguous_resource_state_.contains(resource.name()));
408
787
  }
409
1367
}
410

            
411
// Returns a mutable reference to the state of an explicitly requested resource, or an empty
// OptRef if `resource_name` is not in the requested set.
OptRef<DeltaSubscriptionState::ResourceState>
DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) {
  if (auto found = requested_resource_state_.find(resource_name);
      found != requested_resource_state_.end()) {
    return {found->second};
  }
  return {};
}
419

            
420
// Read-only counterpart: returns the state of an explicitly requested resource, or an empty
// OptRef if `resource_name` is not in the requested set.
OptRef<const DeltaSubscriptionState::ResourceState>
DeltaSubscriptionState::getRequestedResourceState(absl::string_view resource_name) const {
  if (auto found = requested_resource_state_.find(resource_name);
      found != requested_resource_state_.end()) {
    return {found->second};
  }
  return {};
}
428

            
429
} // namespace Config
430
} // namespace Envoy