1
#include "source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h"
2

            
3
#include "envoy/config/endpoint/v3/endpoint.pb.h"
4
#include "envoy/service/discovery/v3/discovery.pb.h"
5

            
6
#include "source/common/common/assert.h"
7
#include "source/common/common/backoff_strategy.h"
8
#include "source/common/config/decoded_resource_impl.h"
9
#include "source/common/config/utility.h"
10
#include "source/common/config/xds_context_params.h"
11
#include "source/common/config/xds_resource.h"
12
#include "source/common/memory/utils.h"
13
#include "source/common/protobuf/protobuf.h"
14
#include "source/common/protobuf/utility.h"
15
#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
16

            
17
namespace Envoy {
18
namespace Config {
19
namespace XdsMux {
20

            
21
namespace {
22
class AllMuxesState {
23
public:
24
255
  void insert(ShutdownableMux* mux) { muxes_.insert(mux); }
25

            
26
255
  void erase(ShutdownableMux* mux) { muxes_.erase(mux); }
27

            
28
21306
  void shutdownAll() {
29
21308
    for (auto& mux : muxes_) {
30
294
      mux->shutdown();
31
294
    }
32
21306
  }
33

            
34
private:
35
  absl::flat_hash_set<ShutdownableMux*> muxes_;
36
};
37
using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
38
} // namespace
39

            
40
template <class S, class F, class RQ, class RS>
41
GrpcMuxImpl<S, F, RQ, RS>::GrpcMuxImpl(std::unique_ptr<F> subscription_state_factory,
42
                                       GrpcMuxContext& grpc_mux_context)
43
259
    : dispatcher_(grpc_mux_context.dispatcher_),
44
259
      grpc_stream_(createGrpcStreamObject(std::move(grpc_mux_context.async_client_),
45
259
                                          std::move(grpc_mux_context.failover_async_client_),
46
259
                                          grpc_mux_context.service_method_, grpc_mux_context.scope_,
47
259
                                          std::move(grpc_mux_context.backoff_strategy_),
48
259
                                          grpc_mux_context.rate_limit_settings_)),
49
259
      subscription_state_factory_(std::move(subscription_state_factory)),
50
259
      skip_subsequent_node_(grpc_mux_context.skip_subsequent_node_),
51
259
      local_info_(grpc_mux_context.local_info_),
52
      dynamic_update_callback_handle_(
53
259
          grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
54
259
              [this](absl::string_view resource_type_url) {
55
12
                onDynamicContextUpdate(resource_type_url);
56
12
                return absl::OkStatus();
57
12
              })),
58
259
      config_validators_(std::move(grpc_mux_context.config_validators_)),
59
259
      xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
60
259
      xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_),
61
259
      eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)),
62
259
      target_xds_authority_(grpc_mux_context.target_xds_authority_) {
63
259
  THROW_IF_NOT_OK(Config::Utility::checkLocalInfo("ads", grpc_mux_context.local_info_));
64
255
  AllMuxes::get().insert(this);
65
255
}
66

            
67
template <class S, class F, class RQ, class RS>
68
std::unique_ptr<GrpcStreamInterface<RQ, RS>> GrpcMuxImpl<S, F, RQ, RS>::createGrpcStreamObject(
69
    Grpc::RawAsyncClientSharedPtr&& async_client,
70
    Grpc::RawAsyncClientSharedPtr&& failover_async_client,
71
    const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope,
72
269
    BackOffStrategyPtr&& backoff_strategy, const RateLimitSettings& rate_limit_settings) {
73
269
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
74
88
    return std::make_unique<GrpcMuxFailover<RQ, RS>>(
75
        /*primary_stream_creator=*/
76
88
        [&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy,
77
88
         &rate_limit_settings](
78
88
            GrpcStreamCallbacks<RS>* callbacks) -> GrpcStreamInterfacePtr<RQ, RS> {
79
88
          return std::make_unique<GrpcStream<RQ, RS>>(
80
88
              callbacks, std::move(async_client), service_method, dispatcher, scope,
81
88
              std::move(backoff_strategy), rate_limit_settings,
82
88
              GrpcStream<RQ, RS>::ConnectedStateValue::FirstEntry);
83
88
        },
84
        /*failover_stream_creator=*/
85
88
        failover_async_client
86
88
            ? absl::make_optional(
87
28
                  [&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope,
88
28
                   &rate_limit_settings](
89
28
                      GrpcStreamCallbacks<RS>* callbacks) -> GrpcStreamInterfacePtr<RQ, RS> {
90
28
                    return std::make_unique<GrpcStream<RQ, RS>>(
91
28
                        callbacks, std::move(failover_async_client), service_method, dispatcher,
92
28
                        scope,
93
                        // TODO(adisuissa): the backoff strategy for the failover should
94
                        // be the same as the primary source.
95
28
                        std::make_unique<FixedBackOffStrategy>(
96
28
                            GrpcMuxFailover<RQ, RS>::DefaultFailoverBackoffMilliseconds),
97
28
                        rate_limit_settings, GrpcStream<RQ, RS>::ConnectedStateValue::SecondEntry);
98
28
                  })
99
88
            : absl::nullopt,
100
88
        /*grpc_mux_callbacks=*/*this,
101
88
        /*dispatch=*/dispatcher_);
102
88
  }
103
181
  return std::make_unique<GrpcStream<RQ, RS>>(this, std::move(async_client), service_method,
104
181
                                              dispatcher_, scope, std::move(backoff_strategy),
105
181
                                              rate_limit_settings,
106
181
                                              GrpcStream<RQ, RS>::ConnectedStateValue::FirstEntry);
107
269
}
108

            
109
255
// Removes this mux from the AllMuxes singleton it was added to by the constructor.
template <class S, class F, class RQ, class RS>
GrpcMuxImpl<S, F, RQ, RS>::~GrpcMuxImpl() {
  AllMuxes::get().erase(this);
}
112

            
113
21306
// Shuts down every mux currently registered with the AllMuxes singleton.
template <class S, class F, class RQ, class RS>
void GrpcMuxImpl<S, F, RQ, RS>::shutdownAll() {
  AllMuxes::get().shutdownAll();
}
116

            
117
template <class S, class F, class RQ, class RS>
118
12
void GrpcMuxImpl<S, F, RQ, RS>::onDynamicContextUpdate(absl::string_view resource_type_url) {
119
12
  ENVOY_LOG(debug, "GrpcMuxImpl::onDynamicContextUpdate for {}", resource_type_url);
120
12
  auto sub = subscriptions_.find(resource_type_url);
121
12
  if (sub == subscriptions_.end()) {
122
4
    return;
123
4
  }
124
8
  sub->second->setDynamicContextChanged();
125
8
  trySendDiscoveryRequests();
126
8
}
127

            
128
template <class S, class F, class RQ, class RS>
129
Config::GrpcMuxWatchPtr GrpcMuxImpl<S, F, RQ, RS>::addWatch(
130
    const std::string& type_url, const absl::flat_hash_set<std::string>& resources,
131
    SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
132
449
    const SubscriptionOptions& options) {
133
449
  auto watch_map = watch_maps_.find(type_url);
134
449
  if (watch_map == watch_maps_.end()) {
135
    // Resource cache is only used for EDS resources.
136
362
    EdsResourcesCacheOptRef resources_cache{absl::nullopt};
137
362
    if (eds_resources_cache_ &&
138
362
        (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
139
49
      resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
140
49
    }
141

            
142
    // We don't yet have a subscription for type_url! Make one!
143
362
    watch_map = watch_maps_
144
362
                    .emplace(type_url,
145
362
                             std::make_unique<WatchMap>(options.use_namespace_matching_, type_url,
146
362
                                                        config_validators_.get(), resources_cache))
147
362
                    .first;
148
362
    subscriptions_.emplace(type_url, subscription_state_factory_->makeSubscriptionState(
149
362
                                         type_url, *watch_maps_[type_url], resource_decoder,
150
362
                                         xds_config_tracker_, xds_resources_delegate_,
151
362
                                         target_xds_authority_));
152
362
    subscription_ordering_.emplace_back(type_url);
153
362
  }
154

            
155
449
  Watch* watch = watch_map->second->addWatch(callbacks, *resource_decoder);
156
  // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
157
449
  updateWatch(type_url, watch, resources, options);
158
449
  return std::make_unique<WatchImpl>(type_url, watch, *this, options);
159
449
}
160

            
161
// Updates the list of resource names watched by the given watch. If an added name is new across
162
// the whole subscription, or if a removed name has no other watch interested in it, then the
163
// subscription will enqueue and attempt to send an appropriate discovery request.
164
template <class S, class F, class RQ, class RS>
165
void GrpcMuxImpl<S, F, RQ, RS>::updateWatch(const std::string& type_url, Watch* watch,
166
                                            const absl::flat_hash_set<std::string>& resources,
167
918
                                            const SubscriptionOptions& options) {
168
918
  ENVOY_LOG(debug, "GrpcMuxImpl::updateWatch for {}", type_url);
169
918
  ASSERT(watch != nullptr);
170
918
  auto& sub = subscriptionStateFor(type_url);
171
918
  WatchMap& watch_map = watchMapFor(type_url);
172

            
173
  // We need to prepare xdstp:// resources for the transport, by normalizing and adding any extra
174
  // context parameters.
175
918
  absl::flat_hash_set<std::string> effective_resources;
176
937
  for (const auto& resource : resources) {
177
362
    if (XdsResourceIdentifier::hasXdsTpScheme(resource)) {
178
54
      auto xdstp_resource_or_error = XdsResourceIdentifier::decodeUrn(resource);
179
54
      THROW_IF_NOT_OK_REF(xdstp_resource_or_error.status());
180
54
      auto xdstp_resource = xdstp_resource_or_error.value();
181
54
      if (options.add_xdstp_node_context_params_) {
182
23
        const auto context = XdsContextParams::encodeResource(
183
23
            local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
184
23
        xdstp_resource.mutable_context()->CopyFrom(context);
185
23
      }
186
54
      XdsResourceIdentifier::EncodeOptions encode_options;
187
54
      encode_options.sort_context_params_ = true;
188
54
      effective_resources.insert(XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options));
189
342
    } else {
190
308
      effective_resources.insert(resource);
191
308
    }
192
362
  }
193

            
194
918
  auto added_removed = watch_map.updateWatchInterest(watch, effective_resources);
195
918
  if (options.use_namespace_matching_) {
196
    // This is to prevent sending out of requests that contain prefixes instead of resource names
197
18
    sub.updateSubscriptionInterest({}, {});
198
900
  } else {
199
900
    sub.updateSubscriptionInterest(added_removed.added_, added_removed.removed_);
200
900
  }
201

            
202
  // Tell the server about our change in interest, if any.
203
918
  if (sub.subscriptionUpdatePending()) {
204
724
    trySendDiscoveryRequests();
205
724
  }
206
918
}
207

            
208
template <class S, class F, class RQ, class RS>
209
449
void GrpcMuxImpl<S, F, RQ, RS>::removeWatch(const std::string& type_url, Watch* watch) {
210
449
  updateWatch(type_url, watch, {}, {});
211
449
  watchMapFor(type_url).removeWatch(watch);
212
449
}
213

            
214
template <class S, class F, class RQ, class RS>
215
19
ScopedResume GrpcMuxImpl<S, F, RQ, RS>::pause(const std::string& type_url) {
216
19
  return pause(std::vector<std::string>{type_url});
217
19
}
218

            
219
template <class S, class F, class RQ, class RS>
220
596
ScopedResume GrpcMuxImpl<S, F, RQ, RS>::pause(const std::vector<std::string> type_urls) {
221
1290
  for (const auto& type_url : type_urls) {
222
1290
    pausable_ack_queue_.pause(type_url);
223
1290
  }
224

            
225
596
  return std::make_unique<Cleanup>([this, type_urls]() {
226
1288
    for (const auto& type_url : type_urls) {
227
1288
      pausable_ack_queue_.resume(type_url);
228
1288
      trySendDiscoveryRequests();
229
1288
    }
230
594
  });
231
596
}
232

            
233
template <class S, class F, class RQ, class RS>
234
absl::Status GrpcMuxImpl<S, F, RQ, RS>::updateMuxSource(
235
    Grpc::RawAsyncClientSharedPtr&& primary_async_client,
236
    Grpc::RawAsyncClientSharedPtr&& failover_async_client, Stats::Scope& scope,
237
    BackOffStrategyPtr&& backoff_strategy,
238
14
    const envoy::config::core::v3::ApiConfigSource& ads_config_source) {
239
  // Process the rate limit settings.
240
14
  absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
241
14
      Utility::parseRateLimitSettings(ads_config_source);
242
14
  RETURN_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
243

            
244
10
  const Protobuf::MethodDescriptor& service_method =
245
10
      *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(methodName());
246

            
247
  // Disconnect from current xDS servers.
248
10
  ENVOY_LOG_MISC(info, "Replacing the xDS gRPC mux source");
249
10
  grpc_stream_->closeStream();
250
10
  grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client),
251
10
                                        std::move(failover_async_client), service_method, scope,
252
10
                                        std::move(backoff_strategy), *rate_limit_settings_or_error);
253
  // No need to update the config_validators_ as they may contain some state
254
  // that needs to be kept across different GrpcMux objects.
255

            
256
  // Update the watch map's config validators.
257
16
  for (auto& [type_url, watch_map] : watch_maps_) {
258
16
    watch_map->setConfigValidators(config_validators_.get());
259
16
  }
260

            
261
  // Start the subscriptions over the grpc_stream.
262
10
  grpc_stream_->establishNewStream();
263

            
264
10
  return absl::OkStatus();
265
14
}
266

            
267
template <class S, class F, class RQ, class RS>
268
2132
void GrpcMuxImpl<S, F, RQ, RS>::sendGrpcMessage(RQ& msg_proto, S& sub_state) {
269
2132
  if (sub_state.dynamicContextChanged() || !anyRequestSentYetInCurrentStream() ||
270
2132
      !skipSubsequentNode()) {
271
483
    msg_proto.mutable_node()->CopyFrom(localInfo().node());
272
483
  }
273
2132
  sendMessage(msg_proto);
274
2132
  setAnyRequestSentYetInCurrentStream(true);
275
2132
  sub_state.clearDynamicContextChanged();
276
2132
}
277

            
278
template <class S, class F, class RQ, class RS>
279
void GrpcMuxImpl<S, F, RQ, RS>::genericHandleResponse(const std::string& type_url,
280
                                                      RS& response_proto,
281
1363
                                                      ControlPlaneStats& control_plane_stats) {
282
1363
  auto sub = subscriptions_.find(type_url);
283
1363
  if (sub == subscriptions_.end()) {
284
2
    ENVOY_LOG(warn,
285
2
              "The server sent an xDS response proto with type_url {}, which we have "
286
2
              "not subscribed to. Ignoring.",
287
2
              type_url);
288
2
    return;
289
2
  }
290

            
291
1361
  if (response_proto.has_control_plane()) {
292
13
    control_plane_stats.identifier_.set(response_proto.control_plane().identifier());
293

            
294
13
    if (response_proto.control_plane().identifier() != sub->second->controlPlaneIdentifier()) {
295
9
      sub->second->setControlPlaneIdentifier(response_proto.control_plane().identifier());
296
9
      ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", response_proto.type_url(),
297
9
                sub->second->controlPlaneIdentifier());
298
9
    }
299
13
  }
300

            
301
1361
  pausable_ack_queue_.push(sub->second->handleResponse(response_proto));
302
1361
  trySendDiscoveryRequests();
303
1361
  Memory::Utils::tryShrinkHeap();
304
1361
}
305

            
306
236
// Establishes the gRPC stream the first time it is called. A repeated call trips the ASSERT and
// is otherwise a no-op.
template <class S, class F, class RQ, class RS>
void GrpcMuxImpl<S, F, RQ, RS>::start() {
  ASSERT(!started_);
  if (!started_) {
    started_ = true;
    ENVOY_LOG(debug, "GrpcMuxImpl now trying to establish a stream");
    grpc_stream_->establishNewStream();
  }
}
315

            
316
template <class S, class F, class RQ, class RS>
317
362
void GrpcMuxImpl<S, F, RQ, RS>::handleEstablishedStream() {
318
362
  ENVOY_LOG(debug, "GrpcMuxImpl stream successfully established");
319
420
  for (auto& [type_url, subscription_state] : subscriptions_) {
320
363
    subscription_state->markStreamFresh(should_send_initial_resource_versions_);
321
363
  }
322
362
  setAnyRequestSentYetInCurrentStream(false);
323
362
  maybeUpdateQueueSizeStat(0);
324
362
  pausable_ack_queue_.clear();
325
362
  trySendDiscoveryRequests();
326
362
}
327

            
328
template <class S, class F, class RQ, class RS>
329
void GrpcMuxImpl<S, F, RQ, RS>::handleStreamEstablishmentFailure(
330
266
    bool next_attempt_may_send_initial_resource_version) {
331
266
  ENVOY_LOG(debug, "GrpcMuxImpl stream failed to establish");
332
  // If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately
333
  // call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a
334
  // crash, the iteration needs to dance around a little: collect pointers to all
335
  // SubscriptionStates, call on all those pointers we haven't yet called on, repeat if there are
336
  // now more SubscriptionStates.
337
266
  absl::flat_hash_map<std::string, S*> all_subscribed;
338
266
  absl::flat_hash_map<std::string, S*> already_called;
339
266
  do {
340
464
    for (auto& [type_url, subscription_state] : subscriptions_) {
341
464
      all_subscribed[type_url] = subscription_state.get();
342
464
    }
343
464
    for (auto& sub : all_subscribed) {
344
464
      if (already_called.insert(sub).second) { // insert succeeded ==> not already called
345
464
        sub.second->handleEstablishmentFailure();
346
464
      }
347
464
    }
348
266
  } while (all_subscribed.size() != subscriptions_.size());
349
266
  should_send_initial_resource_versions_ = next_attempt_may_send_initial_resource_version;
350
266
}
351

            
352
template <class S, class F, class RQ, class RS>
353
9550
S& GrpcMuxImpl<S, F, RQ, RS>::subscriptionStateFor(const std::string& type_url) {
354
9550
  auto sub = subscriptions_.find(type_url);
355
9550
  RELEASE_ASSERT(sub != subscriptions_.end(),
356
9550
                 fmt::format("Tried to look up SubscriptionState for non-existent subscription {}.",
357
9550
                             type_url));
358
9550
  return *sub->second;
359
9550
}
360

            
361
template <class S, class F, class RQ, class RS>
362
1367
WatchMap& GrpcMuxImpl<S, F, RQ, RS>::watchMapFor(const std::string& type_url) {
363
1367
  auto watch_map = watch_maps_.find(type_url);
364
1367
  RELEASE_ASSERT(
365
1367
      watch_map != watch_maps_.end(),
366
1367
      fmt::format("Tried to look up WatchMap for non-existent subscription {}.", type_url));
367
1367
  return *watch_map->second;
368
1367
}
369

            
370
template <class S, class F, class RQ, class RS>
371
3770
void GrpcMuxImpl<S, F, RQ, RS>::trySendDiscoveryRequests() {
372
3770
  if (shutdown_) {
373
158
    return;
374
158
  }
375

            
376
5744
  while (true) {
377
    // Do any of our subscriptions even want to send a request?
378
5744
    absl::optional<std::string> request_type_if_any = whoWantsToSendDiscoveryRequest();
379
5744
    if (!request_type_if_any.has_value()) {
380
3419
      break;
381
3419
    }
382
    // If so, which one (by type_url)?
383
2325
    std::string next_request_type_url = request_type_if_any.value();
384
2325
    auto& sub = subscriptionStateFor(next_request_type_url);
385
2325
    ENVOY_LOG(debug, "GrpcMuxImpl wants to send discovery request for {}", next_request_type_url);
386
    // Try again later if paused/rate limited/stream down.
387
2325
    if (!canSendDiscoveryRequest(next_request_type_url)) {
388
193
      break;
389
193
    }
390
2132
    std::unique_ptr<RQ> request;
391
    // Get our subscription state to generate the appropriate discovery request, and send.
392
2132
    if (!pausable_ack_queue_.empty()) {
393
      // Because ACKs take precedence over plain requests, if there is anything in the queue, it's
394
      // safe to assume it's of the type_url that we're wanting to send.
395
      //
396
      // getNextRequestWithAck() returns a raw unowned pointer, which sendGrpcMessage deletes.
397
1331
      request = sub.getNextRequestWithAck(pausable_ack_queue_.popFront());
398
1331
      ENVOY_LOG(debug, "GrpcMuxImpl sent ACK discovery request for {}", next_request_type_url);
399
1627
    } else {
400
      // Returns a raw unowned pointer, which sendGrpcMessage deletes.
401
801
      request = sub.getNextRequestAckless();
402
801
      ENVOY_LOG(debug, "GrpcMuxImpl sent non-ACK discovery request for {}", next_request_type_url);
403
801
    }
404
2132
    ENVOY_LOG(debug, "GrpcMuxImpl skip_subsequent_node: {}", skipSubsequentNode());
405
2132
    sendGrpcMessage(*request, sub);
406
2132
  }
407
3612
  maybeUpdateQueueSizeStat(pausable_ack_queue_.size());
408
3612
}
409

            
410
// Checks whether external conditions allow sending a discovery request. (Does not check
411
// whether we *want* to send a discovery request).
412
template <class S, class F, class RQ, class RS>
413
2325
bool GrpcMuxImpl<S, F, RQ, RS>::canSendDiscoveryRequest(const std::string& type_url) {
414
2325
  RELEASE_ASSERT(
415
2325
      !pausable_ack_queue_.paused(type_url),
416
2325
      fmt::format("canSendDiscoveryRequest() called on paused type_url {}. Pausedness is "
417
2325
                  "supposed to be filtered out by whoWantsToSendDiscoveryRequest(). ",
418
2325
                  type_url));
419

            
420
2325
  if (!grpcStreamAvailable()) {
421
149
    ENVOY_LOG(trace, "No stream available to send a discovery request for {}.", type_url);
422
149
    return false;
423
2178
  } else if (!rateLimitAllowsDrain()) {
424
44
    ENVOY_LOG(trace, "{} discovery request hit rate limit; will try later.", type_url);
425
44
    return false;
426
44
  }
427
2132
  return true;
428
2325
}
429

            
430
// Checks whether we have something to say in a discovery request, which can be an ACK and/or
431
// a subscription update. (Does not check whether we *can* send that discovery request).
432
// Returns the type_url we should send the discovery request for (if any).
433
// First, prioritizes ACKs over non-ACK subscription interest updates.
434
// Then, prioritizes non-ACK updates in the order the various types
435
// of subscriptions were activated.
436
template <class S, class F, class RQ, class RS>
437
5744
absl::optional<std::string> GrpcMuxImpl<S, F, RQ, RS>::whoWantsToSendDiscoveryRequest() {
438
  // All ACKs are sent before plain updates. trySendDiscoveryRequests() relies on this. So, choose
439
  // type_url from pausable_ack_queue_ if possible, before looking at pending updates.
440
5744
  if (!pausable_ack_queue_.empty()) {
441
1375
    return pausable_ack_queue_.front().type_url_;
442
1375
  }
443
  // If we're looking to send multiple non-ACK requests, send them in the order that their
444
  // subscriptions were initiated.
445
6282
  for (const auto& sub_type : subscription_ordering_) {
446
6282
    auto& sub = subscriptionStateFor(sub_type);
447
6282
    if (sub.subscriptionUpdatePending() && !pausable_ack_queue_.paused(sub_type)) {
448
950
      return sub_type;
449
950
    }
450
6282
  }
451
3419
  return absl::nullopt;
452
4369
}
453

            
454
template class GrpcMuxImpl<DeltaSubscriptionState, DeltaSubscriptionStateFactory,
455
                           envoy::service::discovery::v3::DeltaDiscoveryRequest,
456
                           envoy::service::discovery::v3::DeltaDiscoveryResponse>;
457
template class GrpcMuxImpl<SotwSubscriptionState, SotwSubscriptionStateFactory,
458
                           envoy::service::discovery::v3::DiscoveryRequest,
459
                           envoy::service::discovery::v3::DiscoveryResponse>;
460

            
461
// Delta- and SotW-specific concrete subclasses:
462
// Delta mux: a GrpcMuxImpl driven by a DeltaSubscriptionStateFactory built on the context's
// dispatcher.
GrpcMuxDelta::GrpcMuxDelta(GrpcMuxContext& grpc_mux_context)
    : GrpcMuxImpl(std::make_unique<DeltaSubscriptionStateFactory>(grpc_mux_context.dispatcher_),
                  grpc_mux_context) {}
465

            
466
// GrpcStreamCallbacks for GrpcMuxDelta
467
void GrpcMuxDelta::requestOnDemandUpdate(const std::string& type_url,
468
25
                                         const absl::flat_hash_set<std::string>& for_update) {
469
25
  auto& sub = subscriptionStateFor(type_url);
470
25
  sub.updateSubscriptionInterest(for_update, {});
471
  // Tell the server about our change in interest, if any.
472
25
  if (sub.subscriptionUpdatePending()) {
473
25
    trySendDiscoveryRequests();
474
25
  }
475
25
}
476

            
477
// State-of-the-world mux: a GrpcMuxImpl driven by a SotwSubscriptionStateFactory built on the
// context's dispatcher.
GrpcMuxSotw::GrpcMuxSotw(GrpcMuxContext& grpc_mux_context)
    : GrpcMuxImpl(std::make_unique<SotwSubscriptionStateFactory>(grpc_mux_context.dispatcher_),
                  grpc_mux_context) {}
480

            
481
// The null mux cannot serve watches: every call fails with an error explaining that ADS must be
// configured. All arguments are ignored.
Config::GrpcMuxWatchPtr
NullGrpcMuxImpl::addWatch(const std::string&, const absl::flat_hash_set<std::string>&,
                          SubscriptionCallbacks&, OpaqueResourceDecoderSharedPtr,
                          const SubscriptionOptions&) {
  throwEnvoyExceptionOrPanic("ADS must be configured to support an ADS config source");
}
488

            
489
class DeltaGrpcMuxFactory : public MuxFactory {
490
public:
491
492
  std::string name() const override { return "envoy.config_mux.delta_grpc_mux_factory"; }
492
10652
  void shutdownAll() override { return GrpcMuxDelta::shutdownAll(); }
493
  std::shared_ptr<GrpcMux>
494
  create(Grpc::RawAsyncClientSharedPtr&& async_client,
495
         Grpc::RawAsyncClientSharedPtr&& failover_async_client, Event::Dispatcher& dispatcher,
496
         Random::RandomGenerator&, Stats::Scope& scope,
497
         const envoy::config::core::v3::ApiConfigSource& ads_config,
498
         const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
499
         BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
500
         XdsResourcesDelegateOptRef,
501
         std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> load_stats_reporter_factory)
502
87
      override {
503
87
    absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
504
87
        Utility::parseRateLimitSettings(ads_config);
505
87
    THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
506
86
    GrpcMuxContext grpc_mux_context{
507
86
        /*async_client_=*/std::move(async_client),
508
86
        /*failover_async_client=*/std::move(failover_async_client),
509
86
        /*dispatcher_=*/dispatcher,
510
        /*service_method_=*/
511
86
        *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
512
86
            "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"),
513
86
        /*local_info_=*/local_info,
514
86
        /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
515
86
        /*scope_=*/scope,
516
86
        /*config_validators_=*/std::move(config_validators),
517
86
        /*xds_resources_delegate_=*/absl::nullopt,
518
86
        /*xds_config_tracker_=*/xds_config_tracker,
519
86
        /*backoff_strategy_=*/std::move(backoff_strategy),
520
86
        /*target_xds_authority_=*/"",
521
86
        /*eds_resources_cache_=*/std::make_unique<EdsResourcesCacheImpl>(dispatcher),
522
86
        /*skip_subsequent_node_=*/ads_config.set_node_on_first_message_only(),
523
86
        /*load_stats_reporter_factory_=*/load_stats_reporter_factory};
524
86
    return std::make_shared<GrpcMuxDelta>(grpc_mux_context);
525
87
  }
526
};
527

            
528
class SotwGrpcMuxFactory : public MuxFactory {
529
public:
530
492
  std::string name() const override { return "envoy.config_mux.sotw_grpc_mux_factory"; }
531
10652
  void shutdownAll() override { return GrpcMuxSotw::shutdownAll(); }
532
  std::shared_ptr<GrpcMux>
533
  create(Grpc::RawAsyncClientSharedPtr&& async_client,
534
         Grpc::RawAsyncClientSharedPtr&& failover_async_client, Event::Dispatcher& dispatcher,
535
         Random::RandomGenerator&, Stats::Scope& scope,
536
         const envoy::config::core::v3::ApiConfigSource& ads_config,
537
         const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
538
         BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
539
         XdsResourcesDelegateOptRef,
540
         std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> load_stats_reporter_factory)
541
18
      override {
542
18
    absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
543
18
        Utility::parseRateLimitSettings(ads_config);
544
18
    THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
545
17
    GrpcMuxContext grpc_mux_context{
546
17
        /*async_client_=*/std::move(async_client),
547
17
        /*failover_async_client_=*/std::move(failover_async_client),
548
17
        /*dispatcher_=*/dispatcher,
549
        /*service_method_=*/
550
17
        *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
551
17
            "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"),
552
17
        /*local_info_=*/local_info,
553
17
        /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
554
17
        /*scope_=*/scope,
555
17
        /*config_validators_=*/std::move(config_validators),
556
17
        /*xds_resources_delegate_=*/absl::nullopt,
557
17
        /*xds_config_tracker_=*/xds_config_tracker,
558
17
        /*backoff_strategy_=*/std::move(backoff_strategy),
559
17
        /*target_xds_authority_=*/"",
560
17
        /*eds_resources_cache_=*/std::make_unique<EdsResourcesCacheImpl>(dispatcher),
561
17
        /*skip_subsequent_node_=*/ads_config.set_node_on_first_message_only(),
562
17
        /*load_stats_reporter_factory_=*/load_stats_reporter_factory};
563
17
    return std::make_shared<GrpcMuxSotw>(grpc_mux_context);
564
18
  }
565
};
566

            
567
// Register both mux factories under the MuxFactory category.
REGISTER_FACTORY(DeltaGrpcMuxFactory, MuxFactory);
REGISTER_FACTORY(SotwGrpcMuxFactory, MuxFactory);
569

            
570
} // namespace XdsMux
571
} // namespace Config
572
} // namespace Envoy