1
#include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
2

            
3
#include "envoy/service/discovery/v3/discovery.pb.h"
4

            
5
#include "source/common/common/assert.h"
6
#include "source/common/common/backoff_strategy.h"
7
#include "source/common/common/token_bucket_impl.h"
8
#include "source/common/config/utility.h"
9
#include "source/common/config/xds_context_params.h"
10
#include "source/common/config/xds_resource.h"
11
#include "source/common/memory/utils.h"
12
#include "source/common/protobuf/protobuf.h"
13
#include "source/common/protobuf/utility.h"
14
#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
15

            
16
namespace Envoy {
17
namespace Config {
18

            
19
namespace {
20
class AllMuxesState {
21
public:
22
530
  void insert(NewGrpcMuxImpl* mux) { muxes_.insert(mux); }
23

            
24
530
  void erase(NewGrpcMuxImpl* mux) { muxes_.erase(mux); }
25

            
26
10636
  void shutdownAll() {
27
10669
    for (auto& mux : muxes_) {
28
481
      mux->shutdown();
29
481
    }
30
10636
  }
31

            
32
private:
33
  absl::flat_hash_set<NewGrpcMuxImpl*> muxes_;
34
};
35
using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
36
} // namespace
37

            
38
// Builds the mux out of the pieces carried by `grpc_mux_context`. Owned pieces (async clients,
// backoff strategy, config validators, EDS resources cache) are moved out of the context.
// The initializers are listed in the same order as before; createGrpcStreamObject() reads
// dispatcher_, which is initialized ahead of grpc_stream_.
NewGrpcMuxImpl::NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context)
    : dispatcher_(grpc_mux_context.dispatcher_),
      grpc_stream_(createGrpcStreamObject(
          std::move(grpc_mux_context.async_client_),
          std::move(grpc_mux_context.failover_async_client_), grpc_mux_context.service_method_,
          grpc_mux_context.scope_, std::move(grpc_mux_context.backoff_strategy_),
          grpc_mux_context.rate_limit_settings_)),
      local_info_(grpc_mux_context.local_info_),
      config_validators_(std::move(grpc_mux_context.config_validators_)),
      // Forward dynamic context updates to onDynamicContextUpdate(); the callback always
      // reports success.
      dynamic_update_callback_handle_(
          grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
              [this](absl::string_view updated_type_url) {
                onDynamicContextUpdate(updated_type_url);
                return absl::OkStatus();
              })),
      xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
      // The node is only skipped on subsequent requests when both the context asks for it and
      // the runtime feature is enabled.
      skip_subsequent_node_(
          grpc_mux_context.skip_subsequent_node_ &&
          Runtime::runtimeFeatureEnabled(
              "envoy.reloadable_features.xds_legacy_delta_skip_subsequent_node")),
      eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)) {
  // Register with the AllMuxes registry so the static shutdownAll() reaches this instance.
  AllMuxes::get().insert(this);
}
60

            
61
std::unique_ptr<GrpcStreamInterface<envoy::service::discovery::v3::DeltaDiscoveryRequest,
62
                                    envoy::service::discovery::v3::DeltaDiscoveryResponse>>
63
NewGrpcMuxImpl::createGrpcStreamObject(Grpc::RawAsyncClientSharedPtr&& async_client,
64
                                       Grpc::RawAsyncClientSharedPtr&& failover_async_client,
65
                                       const Protobuf::MethodDescriptor& service_method,
66
                                       Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy,
67
536
                                       const RateLimitSettings& rate_limit_settings) {
68
536
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
69
35
    return std::make_unique<GrpcMuxFailover<envoy::service::discovery::v3::DeltaDiscoveryRequest,
70
35
                                            envoy::service::discovery::v3::DeltaDiscoveryResponse>>(
71
        /*primary_stream_creator=*/
72
35
        [&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy,
73
35
         &rate_limit_settings](
74
35
            GrpcStreamCallbacks<envoy::service::discovery::v3::DeltaDiscoveryResponse>* callbacks)
75
35
            -> GrpcStreamInterfacePtr<envoy::service::discovery::v3::DeltaDiscoveryRequest,
76
35
                                      envoy::service::discovery::v3::DeltaDiscoveryResponse> {
77
35
          return std::make_unique<
78
35
              GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
79
35
                         envoy::service::discovery::v3::DeltaDiscoveryResponse>>(
80
35
              callbacks, std::move(async_client), service_method, dispatcher, scope,
81
35
              std::move(backoff_strategy), rate_limit_settings,
82
35
              GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
83
35
                         envoy::service::discovery::v3::DeltaDiscoveryResponse>::
84
35
                  ConnectedStateValue::FirstEntry);
85
35
        },
86
        /*failover_stream_creator=*/
87
35
        failover_async_client
88
35
            ? absl::make_optional(
89
12
                  [&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope,
90
12
                   &rate_limit_settings](
91
12
                      GrpcStreamCallbacks<envoy::service::discovery::v3::DeltaDiscoveryResponse>*
92
12
                          callbacks)
93
12
                      -> GrpcStreamInterfacePtr<
94
12
                          envoy::service::discovery::v3::DeltaDiscoveryRequest,
95
12
                          envoy::service::discovery::v3::DeltaDiscoveryResponse> {
96
12
                    return std::make_unique<
97
12
                        GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
98
12
                                   envoy::service::discovery::v3::DeltaDiscoveryResponse>>(
99
12
                        callbacks, std::move(failover_async_client), service_method, dispatcher,
100
12
                        scope,
101
                        // TODO(adisuissa): the backoff strategy for the failover should
102
                        // be the same as the primary source.
103
12
                        std::make_unique<FixedBackOffStrategy>(
104
12
                            GrpcMuxFailover<envoy::service::discovery::v3::DeltaDiscoveryRequest,
105
12
                                            envoy::service::discovery::v3::DeltaDiscoveryResponse>::
106
12
                                DefaultFailoverBackoffMilliseconds),
107
12
                        rate_limit_settings,
108
12
                        GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
109
12
                                   envoy::service::discovery::v3::DeltaDiscoveryResponse>::
110
12
                            ConnectedStateValue::SecondEntry);
111
12
                  })
112
35
            : absl::nullopt,
113
35
        /*grpc_mux_callbacks=*/*this,
114
35
        /*dispatch=*/dispatcher_);
115
35
  }
116
501
  return std::make_unique<GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
117
501
                                     envoy::service::discovery::v3::DeltaDiscoveryResponse>>(
118
501
      this, std::move(async_client), service_method, dispatcher_, scope,
119
501
      std::move(backoff_strategy), rate_limit_settings,
120
501
      GrpcStream<
121
501
          envoy::service::discovery::v3::DeltaDiscoveryRequest,
122
501
          envoy::service::discovery::v3::DeltaDiscoveryResponse>::ConnectedStateValue::FirstEntry);
123
536
}
124

            
125
530
// Removes this instance from the AllMuxes registry that the constructor added it to.
NewGrpcMuxImpl::~NewGrpcMuxImpl() {
  AllMuxes::get().erase(this);
}
126

            
127
10636
// Calls shutdown() on every NewGrpcMuxImpl currently held in the AllMuxes registry.
void NewGrpcMuxImpl::shutdownAll() {
  AllMuxes::get().shutdownAll();
}
128

            
129
14
// Reacts to a dynamic context update for `resource_type_url`: if a subscription exists for that
// type, flags its state as having a changed dynamic context and tries to send pending requests.
// Does nothing when there is no such subscription.
void NewGrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
  const auto sub_it = subscriptions_.find(resource_type_url);
  if (sub_it != subscriptions_.end()) {
    sub_it->second->sub_state_.setDynamicContextChanged();
    trySendDiscoveryRequests();
  }
}
137

            
138
7
// Single-type convenience overload: wraps `type_url` in a one-element vector and defers to the
// vector overload of pause().
ScopedResume NewGrpcMuxImpl::pause(const std::string& type_url) {
  return pause(std::vector<std::string>(1, type_url));
}
141

            
142
1223
// Pauses every type URL in `type_urls` on the ACK queue and returns a Cleanup object that undoes
// it. When the Cleanup runs, each type URL is resumed in turn; every time a resume leaves that
// type no longer paused, pending discovery requests are attempted.
ScopedResume NewGrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
  for (const std::string& url : type_urls) {
    pausable_ack_queue_.pause(url);
  }

  // The callback keeps its own copy of the type URLs.
  return std::make_unique<Cleanup>([this, type_urls]() {
    for (const std::string& url : type_urls) {
      pausable_ack_queue_.resume(url);
      if (pausable_ack_queue_.paused(url)) {
        // Still paused after this resume; nothing to send yet.
        continue;
      }
      trySendDiscoveryRequests();
    }
  });
}
156

            
157
void NewGrpcMuxImpl::onDiscoveryResponse(
158
    std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryResponse>&& message,
159
1530
    ControlPlaneStats& control_plane_stats) {
160
1530
  ENVOY_LOG(debug, "Received DeltaDiscoveryResponse for {} at version {}", message->type_url(),
161
1530
            message->system_version_info());
162

            
163
1530
  auto sub = subscriptions_.find(message->type_url());
164
1530
  if (sub == subscriptions_.end()) {
165
    ENVOY_LOG(warn,
166
              "Dropping received DeltaDiscoveryResponse (with version {}) for non-existent "
167
              "subscription {}.",
168
              message->system_version_info(), message->type_url());
169
    return;
170
  }
171

            
172
1530
  if (message->has_control_plane()) {
173
2
    control_plane_stats.identifier_.set(message->control_plane().identifier());
174

            
175
2
    if (message->control_plane().identifier() != sub->second->control_plane_identifier_) {
176
2
      sub->second->control_plane_identifier_ = message->control_plane().identifier();
177
2
      ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", message->type_url(),
178
2
                sub->second->control_plane_identifier_);
179
2
    }
180
2
  }
181

            
182
1530
  auto ack = sub->second->sub_state_.handleResponse(*message);
183

            
184
  // Processing point to record error if there is any failure after the response is processed.
185
1530
  if (xds_config_tracker_.has_value() &&
186
1530
      ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) {
187
4
    xds_config_tracker_->onConfigRejected(*message, ack.error_detail_.message());
188
4
  }
189
1530
  kickOffAck(ack);
190
1530
  Memory::Utils::tryShrinkHeap();
191
1530
}
192

            
193
665
void NewGrpcMuxImpl::onStreamEstablished() {
194
689
  for (auto& [type_url, subscription] : subscriptions_) {
195
537
    UNREFERENCED_PARAMETER(type_url);
196
537
    subscription->sub_state_.markStreamFresh(should_send_initial_resource_versions_);
197
537
  }
198
665
  first_request_on_stream_ = true;
199
665
  pausable_ack_queue_.clear();
200
665
  trySendDiscoveryRequests();
201
665
}
202

            
203
627
void NewGrpcMuxImpl::onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) {
204
  // If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately
205
  // call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a
206
  // crash, the iteration needs to dance around a little: collect pointers to all
207
  // SubscriptionStates, call on all those pointers we haven't yet called on, repeat if there are
208
  // now more SubscriptionStates.
209
627
  absl::flat_hash_map<std::string, DeltaSubscriptionState*> all_subscribed;
210
627
  absl::flat_hash_map<std::string, DeltaSubscriptionState*> already_called;
211
627
  do {
212
1014
    for (auto& [type_url, subscription] : subscriptions_) {
213
1002
      all_subscribed[type_url] = &subscription->sub_state_;
214
1002
    }
215
1014
    for (auto& sub : all_subscribed) {
216
1002
      if (already_called.insert(sub).second) { // insert succeeded ==> not already called
217
1002
        sub.second->handleEstablishmentFailure();
218
1002
      }
219
1002
    }
220
627
  } while (all_subscribed.size() != subscriptions_.size());
221
627
  should_send_initial_resource_versions_ = next_attempt_may_send_initial_resource_version;
222
627
}
223

            
224
// Writeability notification: retries sending any pending discovery requests.
void NewGrpcMuxImpl::onWriteable() {
  trySendDiscoveryRequests();
}
225

            
226
1530
// Queues `ack` on the pausable ACK queue and immediately tries to send pending requests.
void NewGrpcMuxImpl::kickOffAck(UpdateAck ack) {
  pausable_ack_queue_.push(std::move(ack));
  trySendDiscoveryRequests();
}
230

            
231
// TODO(fredlas) to be removed from the GrpcMux interface very soon.
232
508
void NewGrpcMuxImpl::start() {
233
508
  ASSERT(!started_);
234
508
  if (started_) {
235
    return;
236
  }
237
508
  started_ = true;
238
508
  grpc_stream_->establishNewStream();
239
508
}
240

            
241
// Registers a new watch on `resources` of type `type_url`, creating the subscription for that
// type first if there is none yet. Returns a WatchImpl handle for the new watch.
GrpcMuxWatchPtr NewGrpcMuxImpl::addWatch(const std::string& type_url,
                                         const absl::flat_hash_set<std::string>& resources,
                                         SubscriptionCallbacks& callbacks,
                                         OpaqueResourceDecoderSharedPtr resource_decoder,
                                         const SubscriptionOptions& options) {
  auto sub_it = subscriptions_.find(type_url);
  if (sub_it == subscriptions_.end()) {
    // First watch for this type_url: the subscription has to be created.
    sub_it = addSubscription(type_url, options.use_namespace_matching_);
  }

  Watch* const new_watch = sub_it->second->watch_map_.addWatch(callbacks, *resource_decoder);
  // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
  updateWatch(type_url, new_watch, resources, options);
  return std::make_unique<WatchImpl>(type_url, new_watch, *this, options);
}
257

            
258
// Replaces the xDS source of this mux: closes the current stream, builds a new stream object
// from the given clients and backoff strategy, and establishes a new stream on it.
// Returns the rate-limit parsing error if `ads_config_source` carries invalid rate limit
// settings (nothing is changed in that case), and OkStatus otherwise.
absl::Status
NewGrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientSharedPtr&& primary_async_client,
                                Grpc::RawAsyncClientSharedPtr&& failover_async_client,
                                Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy,
                                const envoy::config::core::v3::ApiConfigSource& ads_config_source) {
  // Process the rate limit settings.
  absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
      Utility::parseRateLimitSettings(ads_config_source);
  RETURN_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
  const RateLimitSettings& rate_limit_settings = *rate_limit_settings_or_error;

  const Protobuf::MethodDescriptor& service_method =
      *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources");

  // Disconnect from current xDS servers.
  ENVOY_LOG_MISC(info, "Replacing the xDS gRPC mux source");
  grpc_stream_->closeStream();
  grpc_stream_ =
      createGrpcStreamObject(std::move(primary_async_client), std::move(failover_async_client),
                             service_method, scope, std::move(backoff_strategy),
                             rate_limit_settings);
  // No need to update the config_validators_ as they may contain some state
  // that needs to be kept across different GrpcMux objects.

  // Update the watch map's config validators.
  for (auto& entry : subscriptions_) {
    entry.second->watch_map_.setConfigValidators(config_validators_.get());
  }

  // Start the subscriptions over the grpc_stream.
  grpc_stream_->establishNewStream();

  return absl::OkStatus();
}
291

            
292
// Updates the list of resource names watched by the given watch. If an added name is new across
293
// the whole subscription, or if a removed name has no other watch interested in it, then the
294
// subscription will enqueue and attempt to send an appropriate discovery request.
295
void NewGrpcMuxImpl::updateWatch(const std::string& type_url, Watch* watch,
296
                                 const absl::flat_hash_set<std::string>& resources,
297
1923
                                 const SubscriptionOptions& options) {
298
1923
  ASSERT(watch != nullptr);
299
1923
  auto sub = subscriptions_.find(type_url);
300
1923
  RELEASE_ASSERT(sub != subscriptions_.end(),
301
1923
                 fmt::format("Watch of {} has no subscription to update.", type_url));
302
  // We need to prepare xdstp:// resources for the transport, by normalizing and adding any extra
303
  // context parameters.
304
1923
  absl::flat_hash_set<std::string> effective_resources;
305
1942
  for (const auto& resource : resources) {
306
620
    if (XdsResourceIdentifier::hasXdsTpScheme(resource)) {
307
97
      auto xdstp_resource_or_error = XdsResourceIdentifier::decodeUrn(resource);
308
97
      THROW_IF_NOT_OK_REF(xdstp_resource_or_error.status());
309
97
      auto xdstp_resource = xdstp_resource_or_error.value();
310
97
      if (options.add_xdstp_node_context_params_) {
311
49
        const auto context = XdsContextParams::encodeResource(
312
49
            local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
313
49
        xdstp_resource.mutable_context()->CopyFrom(context);
314
49
      }
315
97
      XdsResourceIdentifier::EncodeOptions encode_options;
316
97
      encode_options.sort_context_params_ = true;
317
97
      effective_resources.insert(XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options));
318
545
    } else {
319
523
      effective_resources.insert(resource);
320
523
    }
321
620
  }
322
1923
  auto added_removed = sub->second->watch_map_.updateWatchInterest(watch, effective_resources);
323
1923
  if (options.use_namespace_matching_) {
324
    // This is to prevent sending out of requests that contain prefixes instead of resource names
325
48
    sub->second->sub_state_.updateSubscriptionInterest({}, {});
326
1875
  } else {
327
1875
    sub->second->sub_state_.updateSubscriptionInterest(added_removed.added_,
328
1875
                                                       added_removed.removed_);
329
1875
  }
330
  // Tell the server about our change in interest, if any.
331
1923
  if (sub->second->sub_state_.subscriptionUpdatePending()) {
332
1455
    trySendDiscoveryRequests();
333
1455
  }
334
1923
}
335

            
336
void NewGrpcMuxImpl::requestOnDemandUpdate(const std::string& type_url,
337
46
                                           const absl::flat_hash_set<std::string>& for_update) {
338
46
  auto sub = subscriptions_.find(type_url);
339
46
  RELEASE_ASSERT(sub != subscriptions_.end(),
340
46
                 fmt::format("Watch of {} has no subscription to update.", type_url));
341
46
  sub->second->sub_state_.updateSubscriptionInterest(for_update, {});
342
  // Tell the server about our change in interest, if any.
343
46
  if (sub->second->sub_state_.subscriptionUpdatePending()) {
344
46
    trySendDiscoveryRequests();
345
46
  }
346
46
}
347

            
348
954
void NewGrpcMuxImpl::removeWatch(const std::string& type_url, Watch* watch) {
349
954
  updateWatch(type_url, watch, {}, {});
350
954
  auto entry = subscriptions_.find(type_url);
351
954
  ASSERT(entry != subscriptions_.end(),
352
954
         fmt::format("removeWatch() called for non-existent subscription {}.", type_url));
353
954
  entry->second->watch_map_.removeWatch(watch);
354
954
}
355

            
356
// Creates the subscription bookkeeping for `type_url`, appends the type to the subscription
// ordering, and returns an iterator to the new subscriptions_ entry.
NewGrpcMuxImpl::SubscriptionsMap::iterator
NewGrpcMuxImpl::addSubscription(const std::string& type_url, const bool use_namespace_matching) {
  // Resource cache is only used for EDS resources.
  EdsResourcesCacheOptRef resources_cache{absl::nullopt};
  if (eds_resources_cache_ &&
      (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
    resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
  }

  auto inserted = subscriptions_.emplace(
      type_url, std::make_unique<SubscriptionStuff>(type_url, use_namespace_matching, dispatcher_,
                                                    config_validators_.get(), xds_config_tracker_,
                                                    resources_cache));
  // Insertion must succeed, as the addSubscription method is only called if
  // the map doesn't have the type_url.
  ASSERT(inserted.second);
  subscription_ordering_.emplace_back(type_url);
  return inserted.first;
}
374

            
375
6248
void NewGrpcMuxImpl::trySendDiscoveryRequests() {
376
6248
  if (shutdown_) {
377
443
    return;
378
443
  }
379

            
380
8585
  while (true) {
381
    // Do any of our subscriptions even want to send a request?
382
8585
    absl::optional<std::string> maybe_request_type = whoWantsToSendDiscoveryRequest();
383
8585
    if (!maybe_request_type.has_value()) {
384
5510
      break;
385
5510
    }
386
    // If so, which one (by type_url)?
387
3075
    std::string next_request_type_url = maybe_request_type.value();
388
    // If we don't have a subscription object for this request's type_url, drop the request.
389
3075
    auto sub = subscriptions_.find(next_request_type_url);
390
3075
    RELEASE_ASSERT(sub != subscriptions_.end(),
391
3075
                   fmt::format("Tried to send discovery request for non-existent subscription {}.",
392
3075
                               next_request_type_url));
393

            
394
    // Try again later if paused/rate limited/stream down.
395
3075
    if (!canSendDiscoveryRequest(next_request_type_url)) {
396
295
      break;
397
295
    }
398
2780
    envoy::service::discovery::v3::DeltaDiscoveryRequest request;
399
    // Get our subscription state to generate the appropriate DeltaDiscoveryRequest, and send.
400
2780
    if (!pausable_ack_queue_.empty()) {
401
      // Because ACKs take precedence over plain requests, if there is anything in the queue, it's
402
      // safe to assume it's of the type_url that we're wanting to send.
403
1518
      request = sub->second->sub_state_.getNextRequestWithAck(pausable_ack_queue_.popFront());
404
1777
    } else {
405
1262
      request = sub->second->sub_state_.getNextRequestAckless();
406
1262
    }
407
2780
    const bool set_node = sub->second->sub_state_.dynamicContextChanged() ||
408
2780
                          !skip_subsequent_node_ || first_request_on_stream_;
409
2780
    if (set_node) {
410
1028
      first_request_on_stream_ = false;
411
1028
      *request.mutable_node() = local_info_.node();
412
1028
    }
413
2780
    sub->second->sub_state_.clearDynamicContextChanged();
414
2780
    grpc_stream_->sendMessage(request);
415
2780
  }
416
5805
  grpc_stream_->maybeUpdateQueueSizeStat(pausable_ack_queue_.size());
417
5805
}
418

            
419
// Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check
420
// whether we *want* to send a DeltaDiscoveryRequest).
421
3075
bool NewGrpcMuxImpl::canSendDiscoveryRequest(const std::string& type_url) {
422
3075
  RELEASE_ASSERT(
423
3075
      !pausable_ack_queue_.paused(type_url),
424
3075
      fmt::format("canSendDiscoveryRequest() called on paused type_url {}. Pausedness is "
425
3075
                  "supposed to be filtered out by whoWantsToSendDiscoveryRequest(). ",
426
3075
                  type_url));
427

            
428
3075
  if (!grpc_stream_->grpcStreamAvailable()) {
429
295
    ENVOY_LOG(trace, "No stream available to send a discovery request for {}.", type_url);
430
295
    return false;
431
2784
  } else if (!grpc_stream_->checkRateLimitAllowsDrain()) {
432
    ENVOY_LOG(trace, "{} discovery request hit rate limit; will try later.", type_url);
433
    return false;
434
  }
435
2780
  return true;
436
3075
}
437

            
438
// Checks whether we have something to say in a DeltaDiscoveryRequest, which can be an ACK and/or
439
// a subscription update. (Does not check whether we *can* send that DeltaDiscoveryRequest).
440
// Returns the type_url we should send the DeltaDiscoveryRequest for (if any).
441
// First, prioritizes ACKs over non-ACK subscription interest updates.
442
// Then, prioritizes non-ACK updates in the order the various types
443
// of subscriptions were activated.
444
8585
absl::optional<std::string> NewGrpcMuxImpl::whoWantsToSendDiscoveryRequest() {
445
  // All ACKs are sent before plain updates. trySendDiscoveryRequests() relies on this. So, choose
446
  // type_url from pausable_ack_queue_ if possible, before looking at pending updates.
447
8585
  if (!pausable_ack_queue_.empty()) {
448
1518
    return pausable_ack_queue_.front().type_url_;
449
1518
  }
450
  // If we're looking to send multiple non-ACK requests, send them in the order that their
451
  // subscriptions were initiated.
452
12472
  for (const auto& sub_type : subscription_ordering_) {
453
12372
    auto sub = subscriptions_.find(sub_type);
454
12372
    if (sub != subscriptions_.end() && sub->second->sub_state_.subscriptionUpdatePending() &&
455
12372
        !pausable_ack_queue_.paused(sub_type)) {
456
1557
      return sub->first;
457
1557
    }
458
12372
  }
459
5510
  return absl::nullopt;
460
7067
}
461

            
462
// A factory class for creating NewGrpcMuxImpl so it does not have to be
463
// hard-compiled into cluster_manager_impl.cc
464
class NewGrpcMuxFactory : public MuxFactory {
465
public:
466
494
  std::string name() const override { return "envoy.config_mux.new_grpc_mux_factory"; }
467
10636
  void shutdownAll() override { return NewGrpcMuxImpl::shutdownAll(); }
468
  std::shared_ptr<GrpcMux>
469
  create(Grpc::RawAsyncClientSharedPtr&& async_client,
470
         Grpc::RawAsyncClientSharedPtr&& failover_async_client, Event::Dispatcher& dispatcher,
471
         Random::RandomGenerator&, Stats::Scope& scope,
472
         const envoy::config::core::v3::ApiConfigSource& ads_config,
473
         const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
474
         BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
475
         OptRef<XdsResourcesDelegate>,
476
         std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> load_stats_reporter_factory)
477
223
      override {
478
223
    absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
479
223
        Utility::parseRateLimitSettings(ads_config);
480
223
    THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
481
222
    GrpcMuxContext grpc_mux_context{
482
222
        /*async_client_=*/std::move(async_client),
483
222
        /*failover_async_client_=*/std::move(failover_async_client),
484
222
        /*dispatcher_=*/dispatcher,
485
        /*service_method_=*/
486
222
        *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
487
222
            "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"),
488
222
        /*local_info_=*/local_info,
489
222
        /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
490
222
        /*scope_=*/scope,
491
222
        /*config_validators_=*/std::move(config_validators),
492
222
        /*xds_resources_delegate_=*/absl::nullopt,
493
222
        /*xds_config_tracker_=*/xds_config_tracker,
494
222
        /*backoff_strategy_=*/std::move(backoff_strategy),
495
222
        /*target_xds_authority_=*/"",
496
222
        /*eds_resources_cache_=*/std::make_unique<EdsResourcesCacheImpl>(dispatcher),
497
222
        /*skip_subsequent_node_=*/ads_config.set_node_on_first_message_only(),
498
222
        /*load_stats_reporter_factory_=*/load_stats_reporter_factory};
499
222
    return std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context);
500
223
  }
501
};
502

            
503
REGISTER_FACTORY(NewGrpcMuxFactory, MuxFactory);
504

            
505
} // namespace Config
506
} // namespace Envoy