1
#include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
2

            
3
#include "envoy/service/discovery/v3/discovery.pb.h"
4

            
5
#include "source/common/common/assert.h"
6
#include "source/common/common/backoff_strategy.h"
7
#include "source/common/common/token_bucket_impl.h"
8
#include "source/common/config/utility.h"
9
#include "source/common/config/xds_context_params.h"
10
#include "source/common/config/xds_resource.h"
11
#include "source/common/memory/utils.h"
12
#include "source/common/protobuf/protobuf.h"
13
#include "source/common/protobuf/utility.h"
14
#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
15

            
16
namespace Envoy {
17
namespace Config {
18

            
19
namespace {
20
class AllMuxesState {
21
public:
22
516
  void insert(NewGrpcMuxImpl* mux) { muxes_.insert(mux); }
23

            
24
516
  void erase(NewGrpcMuxImpl* mux) { muxes_.erase(mux); }
25

            
26
10652
  void shutdownAll() {
27
10685
    for (auto& mux : muxes_) {
28
467
      mux->shutdown();
29
467
    }
30
10652
  }
31

            
32
private:
33
  absl::flat_hash_set<NewGrpcMuxImpl*> muxes_;
34
};
35
using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
36
} // namespace
37

            
38
// Builds the mux from `grpc_mux_context`: creates the gRPC stream object, takes over the config
// validators and EDS resources cache, registers a callback for dynamic context updates, and
// finally adds this instance to the AllMuxes registry.
NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context)
    : dispatcher_(grpc_mux_context.dispatcher_),
      grpc_stream_(createGrpcStreamObject(std::move(grpc_mux_context.async_client_),
                                          std::move(grpc_mux_context.failover_async_client_),
                                          grpc_mux_context.service_method_, grpc_mux_context.scope_,
                                          std::move(grpc_mux_context.backoff_strategy_),
                                          grpc_mux_context.rate_limit_settings_)),
      local_info_(grpc_mux_context.local_info_),
      config_validators_(std::move(grpc_mux_context.config_validators_)),
      // Forward dynamic context updates to onDynamicContextUpdate(); the callback always reports
      // success.
      dynamic_update_callback_handle_(
          grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
              [this](absl::string_view updated_type_url) {
                onDynamicContextUpdate(updated_type_url);
                return absl::OkStatus();
              })),
      xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
      // The context's request to skip the node on subsequent messages is only honored while the
      // runtime feature below is enabled.
      skip_subsequent_node_(grpc_mux_context.skip_subsequent_node_ &&
                            Runtime::runtimeFeatureEnabled(
                                "envoy.reloadable_features.xds_legacy_delta_skip_subsequent_node")),
      eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)) {
  AllMuxes::get().insert(this);
}
60

            
61
std::unique_ptr<GrpcStreamInterface<envoy::service::discovery::v3::DeltaDiscoveryRequest,
62
                                    envoy::service::discovery::v3::DeltaDiscoveryResponse>>
63
NewGrpcMuxImpl::createGrpcStreamObject(Grpc::RawAsyncClientSharedPtr&& async_client,
64
                                       Grpc::RawAsyncClientSharedPtr&& failover_async_client,
65
                                       const Protobuf::MethodDescriptor& service_method,
66
                                       Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy,
67
524
                                       const RateLimitSettings& rate_limit_settings) {
68
524
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
69
39
    return std::make_unique<GrpcMuxFailover<envoy::service::discovery::v3::DeltaDiscoveryRequest,
70
39
                                            envoy::service::discovery::v3::DeltaDiscoveryResponse>>(
71
        /*primary_stream_creator=*/
72
39
        [&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy,
73
39
         &rate_limit_settings](
74
39
            GrpcStreamCallbacks<envoy::service::discovery::v3::DeltaDiscoveryResponse>* callbacks)
75
39
            -> GrpcStreamInterfacePtr<envoy::service::discovery::v3::DeltaDiscoveryRequest,
76
39
                                      envoy::service::discovery::v3::DeltaDiscoveryResponse> {
77
39
          return std::make_unique<
78
39
              GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
79
39
                         envoy::service::discovery::v3::DeltaDiscoveryResponse>>(
80
39
              callbacks, std::move(async_client), service_method, dispatcher, scope,
81
39
              std::move(backoff_strategy), rate_limit_settings,
82
39
              GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
83
39
                         envoy::service::discovery::v3::DeltaDiscoveryResponse>::
84
39
                  ConnectedStateValue::FirstEntry);
85
39
        },
86
        /*failover_stream_creator=*/
87
39
        failover_async_client
88
39
            ? absl::make_optional(
89
16
                  [&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope,
90
16
                   &rate_limit_settings](
91
16
                      GrpcStreamCallbacks<envoy::service::discovery::v3::DeltaDiscoveryResponse>*
92
16
                          callbacks)
93
16
                      -> GrpcStreamInterfacePtr<
94
16
                          envoy::service::discovery::v3::DeltaDiscoveryRequest,
95
16
                          envoy::service::discovery::v3::DeltaDiscoveryResponse> {
96
16
                    return std::make_unique<
97
16
                        GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
98
16
                                   envoy::service::discovery::v3::DeltaDiscoveryResponse>>(
99
16
                        callbacks, std::move(failover_async_client), service_method, dispatcher,
100
16
                        scope,
101
                        // TODO(adisuissa): the backoff strategy for the failover should
102
                        // be the same as the primary source.
103
16
                        std::make_unique<FixedBackOffStrategy>(
104
16
                            GrpcMuxFailover<envoy::service::discovery::v3::DeltaDiscoveryRequest,
105
16
                                            envoy::service::discovery::v3::DeltaDiscoveryResponse>::
106
16
                                DefaultFailoverBackoffMilliseconds),
107
16
                        rate_limit_settings,
108
16
                        GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
109
16
                                   envoy::service::discovery::v3::DeltaDiscoveryResponse>::
110
16
                            ConnectedStateValue::SecondEntry);
111
16
                  })
112
39
            : absl::nullopt,
113
39
        /*grpc_mux_callbacks=*/*this,
114
39
        /*dispatch=*/dispatcher_);
115
39
  }
116
485
  return std::make_unique<GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
117
485
                                     envoy::service::discovery::v3::DeltaDiscoveryResponse>>(
118
485
      this, std::move(async_client), service_method, dispatcher_, scope,
119
485
      std::move(backoff_strategy), rate_limit_settings,
120
485
      GrpcStream<
121
485
          envoy::service::discovery::v3::DeltaDiscoveryRequest,
122
485
          envoy::service::discovery::v3::DeltaDiscoveryResponse>::ConnectedStateValue::FirstEntry);
123
524
}
124

            
125
516
// Removes this instance from the AllMuxes registry (the constructor inserted it there).
NewGrpcMuxImpl::~NewGrpcMuxImpl() {
  AllMuxes::get().erase(this);
}
126

            
127
10652
// Invokes shutdown() on every NewGrpcMuxImpl currently held by the AllMuxes registry.
void NewGrpcMuxImpl::shutdownAll() {
  AllMuxes::get().shutdownAll();
}
128

            
129
14
// Reacts to a dynamic context update for `resource_type_url`: if a subscription exists for that
// type, it is flagged as having a changed dynamic context and a send attempt is made. Updates for
// types without a subscription are ignored.
void NewGrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
  auto subscription_it = subscriptions_.find(resource_type_url);
  if (subscription_it != subscriptions_.end()) {
    subscription_it->second->sub_state_.setDynamicContextChanged();
    trySendDiscoveryRequests();
  }
}
137

            
138
7
// Pauses a single type_url by delegating to the multi-type overload with a one-element list.
ScopedResume NewGrpcMuxImpl::pause(const std::string& type_url) {
  return pause(std::vector<std::string>(1, type_url));
}
141

            
142
1227
// Pauses every type_url in `type_urls` and returns a Cleanup object whose callback undoes it.
// The callback resumes each type_url and, whenever a type_url is no longer reported as paused
// after its resume, attempts to send any pending discovery requests.
ScopedResume NewGrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
  for (const std::string& url : type_urls) {
    pausable_ack_queue_.pause(url);
  }

  // The callback holds its own copy of the list of type_urls.
  return std::make_unique<Cleanup>([this, type_urls]() {
    for (const std::string& url : type_urls) {
      pausable_ack_queue_.resume(url);
      if (pausable_ack_queue_.paused(url)) {
        // Still reported as paused after this resume; nothing to send for it yet.
        continue;
      }
      trySendDiscoveryRequests();
    }
  });
}
156

            
157
void NewGrpcMuxImpl::onDiscoveryResponse(
158
    std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryResponse>&& message,
159
1500
    ControlPlaneStats& control_plane_stats) {
160
1500
  ENVOY_LOG(debug, "Received DeltaDiscoveryResponse for {} at version {}", message->type_url(),
161
1500
            message->system_version_info());
162

            
163
1500
  auto sub = subscriptions_.find(message->type_url());
164
1500
  if (sub == subscriptions_.end()) {
165
    ENVOY_LOG(warn,
166
              "Dropping received DeltaDiscoveryResponse (with version {}) for non-existent "
167
              "subscription {}.",
168
              message->system_version_info(), message->type_url());
169
    return;
170
  }
171

            
172
1500
  if (message->has_control_plane()) {
173
2
    control_plane_stats.identifier_.set(message->control_plane().identifier());
174

            
175
2
    if (message->control_plane().identifier() != sub->second->control_plane_identifier_) {
176
2
      sub->second->control_plane_identifier_ = message->control_plane().identifier();
177
2
      ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", message->type_url(),
178
2
                sub->second->control_plane_identifier_);
179
2
    }
180
2
  }
181

            
182
1500
  auto ack = sub->second->sub_state_.handleResponse(*message);
183

            
184
  // Processing point to record error if there is any failure after the response is processed.
185
1500
  if (xds_config_tracker_.has_value() &&
186
1500
      ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) {
187
4
    xds_config_tracker_->onConfigRejected(*message, ack.error_detail_.message());
188
4
  }
189
1500
  kickOffAck(ack);
190
1500
  Memory::Utils::tryShrinkHeap();
191
1500
}
192

            
193
647
void NewGrpcMuxImpl::onStreamEstablished() {
194
675
  for (auto& [type_url, subscription] : subscriptions_) {
195
544
    UNREFERENCED_PARAMETER(type_url);
196
544
    subscription->sub_state_.markStreamFresh(should_send_initial_resource_versions_);
197
544
  }
198
647
  first_request_on_stream_ = true;
199
647
  pausable_ack_queue_.clear();
200
647
  trySendDiscoveryRequests();
201
647
}
202

            
203
608
void NewGrpcMuxImpl::onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) {
204
  // If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately
205
  // call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a
206
  // crash, the iteration needs to dance around a little: collect pointers to all
207
  // SubscriptionStates, call on all those pointers we haven't yet called on, repeat if there are
208
  // now more SubscriptionStates.
209
608
  absl::flat_hash_map<std::string, DeltaSubscriptionState*> all_subscribed;
210
608
  absl::flat_hash_map<std::string, DeltaSubscriptionState*> already_called;
211
608
  do {
212
1021
    for (auto& [type_url, subscription] : subscriptions_) {
213
1010
      all_subscribed[type_url] = &subscription->sub_state_;
214
1010
    }
215
1021
    for (auto& sub : all_subscribed) {
216
1010
      if (already_called.insert(sub).second) { // insert succeeded ==> not already called
217
1010
        sub.second->handleEstablishmentFailure();
218
1010
      }
219
1010
    }
220
608
  } while (all_subscribed.size() != subscriptions_.size());
221
608
  should_send_initial_resource_versions_ = next_attempt_may_send_initial_resource_version;
222
608
}
223

            
224
// Writeable notification: use the opportunity to attempt sending pending discovery requests.
void NewGrpcMuxImpl::onWriteable() {
  trySendDiscoveryRequests();
}
225

            
226
1500
// Queues `ack` on the pausable ACK queue and then attempts to send pending discovery requests.
void NewGrpcMuxImpl::kickOffAck(UpdateAck ack) {
  // The by-value parameter is moved into the queue.
  pausable_ack_queue_.push(std::move(ack));

  trySendDiscoveryRequests();
}
230

            
231
// TODO(fredlas) to be removed from the GrpcMux interface very soon.
232
494
void NewGrpcMuxImpl::start() {
233
494
  ASSERT(!started_);
234
494
  if (started_) {
235
    return;
236
  }
237
494
  started_ = true;
238
494
  grpc_stream_->establishNewStream();
239
494
}
240

            
241
// Registers a new watch on `resources` of type `type_url`, creating the subscription for that
// type first if none exists yet, and returns a WatchImpl handle for the new watch.
GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
                                         const absl::flat_hash_set<std::string>& resources,
                                         SubscriptionCallbacks& callbacks,
                                         OpaqueResourceDecoderSharedPtr resource_decoder,
                                         const SubscriptionOptions& options) {
  auto subscription_it = subscriptions_.find(type_url);
  const bool needs_subscription = (subscription_it == subscriptions_.end());
  if (needs_subscription) {
    // No subscription for type_url so far; create it.
    subscription_it = addSubscription(type_url, options.use_namespace_matching_);
  }

  Watch* new_watch = subscription_it->second->watch_map_.addWatch(callbacks, *resource_decoder);
  // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
  updateWatch(type_url, new_watch, resources, options);
  return std::make_unique<WatchImpl>(type_url, new_watch, *this, options);
}
257

            
258
// Replaces the xDS source of this mux: closes the current stream, creates a new stream object
// from the given clients and settings, and establishes a new stream on it.
//
// Returns the error from parsing the rate limit settings of `ads_config_source` if that fails
// (in which case the current stream is left untouched), and OkStatus otherwise.
absl::Status
NewGrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientSharedPtr&& primary_async_client,
                                Grpc::RawAsyncClientSharedPtr&& failover_async_client,
                                Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy,
                                const envoy::config::core::v3::ApiConfigSource& ads_config_source) {
  // Parse the rate limit settings before touching the existing stream.
  absl::StatusOr<RateLimitSettings> parsed_rate_limits =
      Utility::parseRateLimitSettings(ads_config_source);
  RETURN_IF_NOT_OK_REF(parsed_rate_limits.status());

  const Protobuf::MethodDescriptor& delta_ads_method =
      *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources");

  // Disconnect from current xDS servers.
  ENVOY_LOG_MISC(info, "Replacing the xDS gRPC mux source");
  grpc_stream_->closeStream();
  grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client),
                                        std::move(failover_async_client), delta_ads_method, scope,
                                        std::move(backoff_strategy), *parsed_rate_limits);
  // No need to update the config_validators_ as they may contain some state
  // that needs to be kept across different GrpcMux objects.

  // Update the watch map's config validators.
  for (auto& entry : subscriptions_) {
    entry.second->watch_map_.setConfigValidators(config_validators_.get());
  }

  // Start the subscriptions over the grpc_stream.
  grpc_stream_->establishNewStream();

  return absl::OkStatus();
}
291

            
292
// Updates the list of resource names watched by the given watch. If an added name is new across
293
// the whole subscription, or if a removed name has no other watch interested in it, then the
294
// subscription will enqueue and attempt to send an appropriate discovery request.
295
void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
296
                                 const absl::flat_hash_set<std::string>& resources,
297
1899
                                 const SubscriptionOptions& options) {
298
1899
  ASSERT(watch != nullptr);
299
1899
  auto sub = subscriptions_.find(type_url);
300
1899
  RELEASE_ASSERT(sub != subscriptions_.end(),
301
1899
                 fmt::format("Watch of {} has no subscription to update.", type_url));
302
  // We need to prepare xdstp:// resources for the transport, by normalizing and adding any extra
303
  // context parameters.
304
1899
  absl::flat_hash_set<std::string> effective_resources;
305
1918
  for (const auto& resource : resources) {
306
604
    if (XdsResourceIdentifier::hasXdsTpScheme(resource)) {
307
97
      auto xdstp_resource_or_error = XdsResourceIdentifier::decodeUrn(resource);
308
97
      THROW_IF_NOT_OK_REF(xdstp_resource_or_error.status());
309
97
      auto xdstp_resource = xdstp_resource_or_error.value();
310
97
      if (options.add_xdstp_node_context_params_) {
311
49
        const auto context = XdsContextParams::encodeResource(
312
49
            local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
313
49
        xdstp_resource.mutable_context()->CopyFrom(context);
314
49
      }
315
97
      XdsResourceIdentifier::EncodeOptions encode_options;
316
97
      encode_options.sort_context_params_ = true;
317
97
      effective_resources.insert(XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options));
318
529
    } else {
319
507
      effective_resources.insert(resource);
320
507
    }
321
604
  }
322
1899
  auto added_removed = sub->second->watch_map_.updateWatchInterest(watch, effective_resources);
323
1899
  if (options.use_namespace_matching_) {
324
    // This is to prevent sending out of requests that contain prefixes instead of resource names
325
32
    sub->second->sub_state_.updateSubscriptionInterest({}, {});
326
1867
  } else {
327
1867
    sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_,
328
1867
                                                       added_removed.removed_);
329
1867
  }
330
  // Tell the server about our change in interest, if any.
331
1899
  if (sub->second->sub_state_.subscriptionUpdatePending()) {
332
1442
    trySendDiscoveryRequests();
333
1442
  }
334
1899
}
335

            
336
void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url,
337
31
                                           const absl::flat_hash_set<std::string>& for_update) {
338
31
  auto sub = subscriptions_.find(type_url);
339
31
  RELEASE_ASSERT(sub != subscriptions_.end(),
340
31
                 fmt::format("Watch of {} has no subscription to update.", type_url));
341
31
  sub->second->sub_state_.updateSubscriptionInterest(for_update, {});
342
  // Tell the server about our change in interest, if any.
343
31
  if (sub->second->sub_state_.subscriptionUpdatePending()) {
344
31
    trySendDiscoveryRequests();
345
31
  }
346
31
}
347

            
348
942
void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
349
942
  updateWatch(type_url, watch, {}, {});
350
942
  auto entry = subscriptions_.find(type_url);
351
942
  ASSERT(entry != subscriptions_.end(),
352
942
         fmt::format("removeWatch() called for non-existent subscription {}.", type_url));
353
942
  entry->second->watch_map_.removeWatch(watch);
354
942
}
355

            
356
// Creates the subscription for `type_url`, appends the type to the subscription ordering, and
// returns an iterator to the new map entry. Callers only invoke this when no subscription for
// `type_url` exists yet.
NewGrpcMuxImpl::SubscriptionsMap::iterator
NewGrpcMuxImpl::addSubscription(const std::string& type_url, const bool use_namespace_matching) {
  // Resource cache is only used for EDS resources.
  EdsResourcesCacheOptRef resources_cache{absl::nullopt};
  if (eds_resources_cache_) {
    if (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>()) {
      resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
    }
  }

  auto emplace_result = subscriptions_.emplace(
      type_url, std::make_unique<SubscriptionStuff>(type_url, use_namespace_matching, dispatcher_,
                                                    config_validators_.get(), xds_config_tracker_,
                                                    resources_cache));
  // Insertion must succeed, as the addSubscription method is only called if
  // the map doesn't have the type_url.
  ASSERT(emplace_result.second);
  subscription_ordering_.emplace_back(type_url);
  return emplace_result.first;
}
374

            
375
6180
void NewGrpcMuxImpl::trySendDiscoveryRequests() {
376
6180
  if (shutdown_) {
377
442
    return;
378
442
  }
379

            
380
8487
  while (true) {
381
    // Do any of our subscriptions even want to send a request?
382
8487
    absl::optional<std::string> maybe_request_type = whoWantsToSendDiscoveryRequest();
383
8487
    if (!maybe_request_type.has_value()) {
384
5459
      break;
385
5459
    }
386
    // If so, which one (by type_url)?
387
3028
    std::string next_request_type_url = maybe_request_type.value();
388
    // If we don't have a subscription object for this request's type_url, drop the request.
389
3028
    auto sub = subscriptions_.find(next_request_type_url);
390
3028
    RELEASE_ASSERT(sub != subscriptions_.end(),
391
3028
                   fmt::format("Tried to send discovery request for non-existent subscription {}.",
392
3028
                               next_request_type_url));
393

            
394
    // Try again later if paused/rate limited/stream down.
395
3028
    if (!canSendDiscoveryRequest(next_request_type_url)) {
396
279
      break;
397
279
    }
398
2749
    envoy::service::discovery::v3::DeltaDiscoveryRequest request;
399
    // Get our subscription state to generate the appropriate DeltaDiscoveryRequest, and send.
400
2749
    if (!pausable_ack_queue_.empty()) {
401
      // Because ACKs take precedence over plain requests, if there is anything in the queue, it's
402
      // safe to assume it's of the type_url that we're wanting to send.
403
1488
      request = sub->second->sub_state_.getNextRequestWithAck(pausable_ack_queue_.popFront());
404
1786
    } else {
405
1261
      request = sub->second->sub_state_.getNextRequestAckless();
406
1261
    }
407
2749
    const bool set_node = sub->second->sub_state_.dynamicContextChanged() ||
408
2749
                          !skip_subsequent_node_ || first_request_on_stream_;
409
2749
    if (set_node) {
410
966
      first_request_on_stream_ = false;
411
966
      *request.mutable_node() = local_info_.node();
412
966
    }
413
2749
    sub->second->sub_state_.clearDynamicContextChanged();
414
2749
    grpc_stream_->sendMessage(request);
415
2749
  }
416
5738
  grpc_stream_->maybeUpdateQueueSizeStat(pausable_ack_queue_.size());
417
5738
}
418

            
419
// Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check
420
// whether we *want* to send a DeltaDiscoveryRequest).
421
3028
bool NewGrpcMuxImpl::canSendDiscoveryRequest(const std::string& type_url) {
422
3028
  RELEASE_ASSERT(
423
3028
      !pausable_ack_queue_.paused(type_url),
424
3028
      fmt::format("canSendDiscoveryRequest() called on paused type_url {}. Pausedness is "
425
3028
                  "supposed to be filtered out by whoWantsToSendDiscoveryRequest(). ",
426
3028
                  type_url));
427

            
428
3028
  if (!grpc_stream_->grpcStreamAvailable()) {
429
279
    ENVOY_LOG(trace, "No stream available to send a discovery request for {}.", type_url);
430
279
    return false;
431
2753
  } else if (!grpc_stream_->checkRateLimitAllowsDrain()) {
432
    ENVOY_LOG(trace, "{} discovery request hit rate limit; will try later.", type_url);
433
    return false;
434
  }
435
2749
  return true;
436
3028
}
437

            
438
// Checks whether we have something to say in a DeltaDiscoveryRequest, which can be an ACK and/or
439
// a subscription update. (Does not check whether we *can* send that DeltaDiscoveryRequest).
440
// Returns the type_url we should send the DeltaDiscoveryRequest for (if any).
441
// First, prioritizes ACKs over non-ACK subscription interest updates.
442
// Then, prioritizes non-ACK updates in the order the various types
443
// of subscriptions were activated.
444
8487
absl::optional<std::string> NewGrpcMuxImpl::whoWantsToSendDiscoveryRequest() {
445
  // All ACKs are sent before plain updates. trySendDiscoveryRequests() relies on this. So, choose
446
  // type_url from pausable_ack_queue_ if possible, before looking at pending updates.
447
8487
  if (!pausable_ack_queue_.empty()) {
448
1488
    return pausable_ack_queue_.front().type_url_;
449
1488
  }
450
  // If we're looking to send multiple non-ACK requests, send them in the order that their
451
  // subscriptions were initiated.
452
12485
  for (const auto& sub_type : subscription_ordering_) {
453
12386
    auto sub = subscriptions_.find(sub_type);
454
12386
    if (sub != subscriptions_.end() && sub->second->sub_state_.subscriptionUpdatePending() &&
455
12386
        !pausable_ack_queue_.paused(sub_type)) {
456
1540
      return sub->first;
457
1540
    }
458
12386
  }
459
5459
  return absl::nullopt;
460
6999
}
461

            
462
// A factory class for creating NewGrpcMuxImpl so it does not have to be
463
// hard-compiled into cluster_manager_impl.cc
464
class NewGrpcMuxFactory : public MuxFactory {
465
public:
466
491
  std::string name() const override { return "envoy.config_mux.new_grpc_mux_factory"; }
467
10652
  void shutdownAll() override { return NewGrpcMuxImpl::shutdownAll(); }
468
  std::shared_ptr<GrpcMux>
469
  create(Grpc::RawAsyncClientSharedPtr&& async_client,
470
         Grpc::RawAsyncClientSharedPtr&& failover_async_client, Event::Dispatcher& dispatcher,
471
         Random::RandomGenerator&, Stats::Scope& scope,
472
         const envoy::config::core::v3::ApiConfigSource& ads_config,
473
         const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
474
         BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
475
         OptRef<XdsResourcesDelegate>,
476
         std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> load_stats_reporter_factory)
477
225
      override {
478
225
    absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
479
225
        Utility::parseRateLimitSettings(ads_config);
480
225
    THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
481
224
    GrpcMuxContext grpc_mux_context{
482
224
        /*async_client_=*/std::move(async_client),
483
224
        /*failover_async_client_=*/std::move(failover_async_client),
484
224
        /*dispatcher_=*/dispatcher,
485
        /*service_method_=*/
486
224
        *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
487
224
            "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"),
488
224
        /*local_info_=*/local_info,
489
224
        /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
490
224
        /*scope_=*/scope,
491
224
        /*config_validators_=*/std::move(config_validators),
492
224
        /*xds_resources_delegate_=*/absl::nullopt,
493
224
        /*xds_config_tracker_=*/xds_config_tracker,
494
224
        /*backoff_strategy_=*/std::move(backoff_strategy),
495
224
        /*target_xds_authority_=*/"",
496
224
        /*eds_resources_cache_=*/std::make_unique<EdsResourcesCacheImpl>(dispatcher),
497
224
        /*skip_subsequent_node_=*/ads_config.set_node_on_first_message_only(),
498
224
        /*load_stats_reporter_factory_=*/load_stats_reporter_factory};
499
224
    return std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context);
500
225
  }
501
};
502

            
503
REGISTER_FACTORY(NewGrpcMuxFactory, MuxFactory);
504

            
505
} // namespace Config
506
} // namespace Envoy