Line data Source code
1 : #include "source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h"
2 :
3 : #include "envoy/config/endpoint/v3/endpoint.pb.h"
4 : #include "envoy/service/discovery/v3/discovery.pb.h"
5 :
6 : #include "source/common/common/assert.h"
7 : #include "source/common/common/backoff_strategy.h"
8 : #include "source/common/config/decoded_resource_impl.h"
9 : #include "source/common/config/utility.h"
10 : #include "source/common/config/xds_context_params.h"
11 : #include "source/common/config/xds_resource.h"
12 : #include "source/common/memory/utils.h"
13 : #include "source/common/protobuf/protobuf.h"
14 : #include "source/common/protobuf/utility.h"
15 : #include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
16 :
17 : namespace Envoy {
18 : namespace Config {
19 : namespace XdsMux {
20 :
21 : namespace {
22 : class AllMuxesState {
23 : public:
24 14 : void insert(ShutdownableMux* mux) { muxes_.insert(mux); }
25 :
26 14 : void erase(ShutdownableMux* mux) { muxes_.erase(mux); }
27 :
28 270 : void shutdownAll() {
29 270 : for (auto& mux : muxes_) {
30 28 : mux->shutdown();
31 28 : }
32 270 : }
33 :
34 : private:
35 : absl::flat_hash_set<ShutdownableMux*> muxes_;
36 : };
37 : using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
38 : } // namespace
39 :
40 : template <class S, class F, class RQ, class RS>
41 : GrpcMuxImpl<S, F, RQ, RS>::GrpcMuxImpl(std::unique_ptr<F> subscription_state_factory,
42 : GrpcMuxContext& grpc_mux_content, bool skip_subsequent_node)
43 : : grpc_stream_(this, std::move(grpc_mux_content.async_client_),
44 : grpc_mux_content.service_method_, grpc_mux_content.dispatcher_,
45 : grpc_mux_content.scope_, std::move(grpc_mux_content.backoff_strategy_),
46 : grpc_mux_content.rate_limit_settings_),
47 : subscription_state_factory_(std::move(subscription_state_factory)),
48 : skip_subsequent_node_(skip_subsequent_node), local_info_(grpc_mux_content.local_info_),
49 : dynamic_update_callback_handle_(
50 : grpc_mux_content.local_info_.contextProvider().addDynamicContextUpdateCallback(
51 0 : [this](absl::string_view resource_type_url) {
52 0 : onDynamicContextUpdate(resource_type_url);
53 0 : })),
54 : config_validators_(std::move(grpc_mux_content.config_validators_)),
55 : xds_config_tracker_(grpc_mux_content.xds_config_tracker_),
56 : xds_resources_delegate_(grpc_mux_content.xds_resources_delegate_),
57 : eds_resources_cache_(std::move(grpc_mux_content.eds_resources_cache_)),
58 14 : target_xds_authority_(grpc_mux_content.target_xds_authority_) {
59 14 : Config::Utility::checkLocalInfo("ads", grpc_mux_content.local_info_);
60 14 : AllMuxes::get().insert(this);
61 14 : }
62 :
63 14 : template <class S, class F, class RQ, class RS> GrpcMuxImpl<S, F, RQ, RS>::~GrpcMuxImpl() {
64 14 : AllMuxes::get().erase(this);
65 14 : }
66 :
67 270 : template <class S, class F, class RQ, class RS> void GrpcMuxImpl<S, F, RQ, RS>::shutdownAll() {
68 270 : AllMuxes::get().shutdownAll();
69 270 : }
70 :
71 : template <class S, class F, class RQ, class RS>
72 0 : void GrpcMuxImpl<S, F, RQ, RS>::onDynamicContextUpdate(absl::string_view resource_type_url) {
73 0 : ENVOY_LOG(debug, "GrpcMuxImpl::onDynamicContextUpdate for {}", resource_type_url);
74 0 : auto sub = subscriptions_.find(resource_type_url);
75 0 : if (sub == subscriptions_.end()) {
76 0 : return;
77 0 : }
78 0 : sub->second->setDynamicContextChanged();
79 0 : trySendDiscoveryRequests();
80 0 : }
81 :
82 : template <class S, class F, class RQ, class RS>
83 : Config::GrpcMuxWatchPtr GrpcMuxImpl<S, F, RQ, RS>::addWatch(
84 : const std::string& type_url, const absl::flat_hash_set<std::string>& resources,
85 : SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
86 67 : const SubscriptionOptions& options) {
87 67 : auto watch_map = watch_maps_.find(type_url);
88 67 : if (watch_map == watch_maps_.end()) {
89 : // Resource cache is only used for EDS resources.
90 54 : EdsResourcesCacheOptRef resources_cache{absl::nullopt};
91 54 : if (eds_resources_cache_ &&
92 54 : (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
93 0 : resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
94 0 : }
95 :
96 : // We don't yet have a subscription for type_url! Make one!
97 54 : watch_map = watch_maps_
98 54 : .emplace(type_url,
99 54 : std::make_unique<WatchMap>(options.use_namespace_matching_, type_url,
100 54 : *config_validators_.get(), resources_cache))
101 54 : .first;
102 54 : subscriptions_.emplace(type_url, subscription_state_factory_->makeSubscriptionState(
103 54 : type_url, *watch_maps_[type_url], resource_decoder,
104 54 : xds_config_tracker_, xds_resources_delegate_,
105 54 : target_xds_authority_));
106 54 : subscription_ordering_.emplace_back(type_url);
107 54 : }
108 :
109 67 : Watch* watch = watch_map->second->addWatch(callbacks, *resource_decoder);
110 : // updateWatch() queues a discovery request if any of 'resources' are not yet subscribed.
111 67 : updateWatch(type_url, watch, resources, options);
112 67 : return std::make_unique<WatchImpl>(type_url, watch, *this, options);
113 67 : }
114 :
115 : // Updates the list of resource names watched by the given watch. If an added name is new across
116 : // the whole subscription, or if a removed name has no other watch interested in it, then the
117 : // subscription will enqueue and attempt to send an appropriate discovery request.
118 : template <class S, class F, class RQ, class RS>
119 : void GrpcMuxImpl<S, F, RQ, RS>::updateWatch(const std::string& type_url, Watch* watch,
120 : const absl::flat_hash_set<std::string>& resources,
121 134 : const SubscriptionOptions& options) {
122 134 : ENVOY_LOG(debug, "GrpcMuxImpl::updateWatch for {}", type_url);
123 134 : ASSERT(watch != nullptr);
124 134 : auto& sub = subscriptionStateFor(type_url);
125 134 : WatchMap& watch_map = watchMapFor(type_url);
126 :
127 : // We need to prepare xdstp:// resources for the transport, by normalizing and adding any extra
128 : // context parameters.
129 134 : absl::flat_hash_set<std::string> effective_resources;
130 134 : for (const auto& resource : resources) {
131 39 : if (XdsResourceIdentifier::hasXdsTpScheme(resource)) {
132 0 : auto xdstp_resource_or_error = XdsResourceIdentifier::decodeUrn(resource);
133 0 : THROW_IF_STATUS_NOT_OK(xdstp_resource_or_error, throw);
134 0 : auto xdstp_resource = xdstp_resource_or_error.value();
135 0 : if (options.add_xdstp_node_context_params_) {
136 0 : const auto context = XdsContextParams::encodeResource(
137 0 : local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
138 0 : xdstp_resource.mutable_context()->CopyFrom(context);
139 0 : }
140 0 : XdsResourceIdentifier::EncodeOptions encode_options;
141 0 : encode_options.sort_context_params_ = true;
142 0 : effective_resources.insert(XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options));
143 39 : } else {
144 39 : effective_resources.insert(resource);
145 39 : }
146 39 : }
147 :
148 134 : auto added_removed = watch_map.updateWatchInterest(watch, effective_resources);
149 134 : if (options.use_namespace_matching_) {
150 : // This is to prevent sending out of requests that contain prefixes instead of resource names
151 0 : sub.updateSubscriptionInterest({}, {});
152 134 : } else {
153 134 : sub.updateSubscriptionInterest(added_removed.added_, added_removed.removed_);
154 134 : }
155 :
156 : // Tell the server about our change in interest, if any.
157 134 : if (sub.subscriptionUpdatePending()) {
158 106 : trySendDiscoveryRequests();
159 106 : }
160 134 : }
161 :
162 : template <class S, class F, class RQ, class RS>
163 67 : void GrpcMuxImpl<S, F, RQ, RS>::removeWatch(const std::string& type_url, Watch* watch) {
164 67 : updateWatch(type_url, watch, {}, {});
165 67 : watchMapFor(type_url).removeWatch(watch);
166 67 : }
167 :
168 : template <class S, class F, class RQ, class RS>
169 28 : ScopedResume GrpcMuxImpl<S, F, RQ, RS>::pause(const std::string& type_url) {
170 28 : return pause(std::vector<std::string>{type_url});
171 28 : }
172 :
173 : template <class S, class F, class RQ, class RS>
174 100 : ScopedResume GrpcMuxImpl<S, F, RQ, RS>::pause(const std::vector<std::string> type_urls) {
175 244 : for (const auto& type_url : type_urls) {
176 244 : pausable_ack_queue_.pause(type_url);
177 244 : }
178 :
179 100 : return std::make_unique<Cleanup>([this, type_urls]() {
180 244 : for (const auto& type_url : type_urls) {
181 244 : pausable_ack_queue_.resume(type_url);
182 244 : trySendDiscoveryRequests();
183 244 : }
184 100 : });
185 100 : }
186 :
187 : template <class S, class F, class RQ, class RS>
188 180 : void GrpcMuxImpl<S, F, RQ, RS>::sendGrpcMessage(RQ& msg_proto, S& sub_state) {
189 180 : if (sub_state.dynamicContextChanged() || !anyRequestSentYetInCurrentStream() ||
190 180 : !skipSubsequentNode()) {
191 14 : msg_proto.mutable_node()->CopyFrom(localInfo().node());
192 14 : }
193 180 : sendMessage(msg_proto);
194 180 : setAnyRequestSentYetInCurrentStream(true);
195 180 : sub_state.clearDynamicContextChanged();
196 180 : }
197 :
198 : template <class S, class F, class RQ, class RS>
199 : void GrpcMuxImpl<S, F, RQ, RS>::genericHandleResponse(const std::string& type_url,
200 : const RS& response_proto,
201 111 : ControlPlaneStats& control_plane_stats) {
202 111 : auto sub = subscriptions_.find(type_url);
203 111 : if (sub == subscriptions_.end()) {
204 0 : ENVOY_LOG(warn,
205 0 : "The server sent an xDS response proto with type_url {}, which we have "
206 0 : "not subscribed to. Ignoring.",
207 0 : type_url);
208 0 : return;
209 0 : }
210 :
211 111 : if (response_proto.has_control_plane()) {
212 0 : control_plane_stats.identifier_.set(response_proto.control_plane().identifier());
213 :
214 0 : if (response_proto.control_plane().identifier() != sub->second->controlPlaneIdentifier()) {
215 0 : sub->second->setControlPlaneIdentifier(response_proto.control_plane().identifier());
216 0 : ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", response_proto.type_url(),
217 0 : sub->second->controlPlaneIdentifier());
218 0 : }
219 0 : }
220 :
221 111 : pausable_ack_queue_.push(sub->second->handleResponse(response_proto));
222 111 : trySendDiscoveryRequests();
223 111 : Memory::Utils::tryShrinkHeap();
224 111 : }
225 :
226 14 : template <class S, class F, class RQ, class RS> void GrpcMuxImpl<S, F, RQ, RS>::start() {
227 14 : ASSERT(!started_);
228 14 : if (started_) {
229 0 : return;
230 0 : }
231 14 : started_ = true;
232 14 : ENVOY_LOG(debug, "GrpcMuxImpl now trying to establish a stream");
233 14 : grpc_stream_.establishNewStream();
234 14 : }
235 :
236 : template <class S, class F, class RQ, class RS>
237 14 : void GrpcMuxImpl<S, F, RQ, RS>::handleEstablishedStream() {
238 14 : ENVOY_LOG(debug, "GrpcMuxImpl stream successfully established");
239 14 : for (auto& [type_url, subscription_state] : subscriptions_) {
240 0 : subscription_state->markStreamFresh();
241 0 : }
242 14 : setAnyRequestSentYetInCurrentStream(false);
243 14 : maybeUpdateQueueSizeStat(0);
244 14 : pausable_ack_queue_.clear();
245 14 : trySendDiscoveryRequests();
246 14 : }
247 :
248 : template <class S, class F, class RQ, class RS>
249 14 : void GrpcMuxImpl<S, F, RQ, RS>::handleStreamEstablishmentFailure() {
250 14 : ENVOY_LOG(debug, "GrpcMuxImpl stream failed to establish");
251 : // If this happens while Envoy is still initializing, the onConfigUpdateFailed() we ultimately
252 : // call on CDS will cause LDS to start up, which adds to subscriptions_ here. So, to avoid a
253 : // crash, the iteration needs to dance around a little: collect pointers to all
254 : // SubscriptionStates, call on all those pointers we haven't yet called on, repeat if there are
255 : // now more SubscriptionStates.
256 14 : absl::flat_hash_map<std::string, S*> all_subscribed;
257 14 : absl::flat_hash_map<std::string, S*> already_called;
258 14 : do {
259 54 : for (auto& [type_url, subscription_state] : subscriptions_) {
260 54 : all_subscribed[type_url] = subscription_state.get();
261 54 : }
262 54 : for (auto& sub : all_subscribed) {
263 54 : if (already_called.insert(sub).second) { // insert succeeded ==> not already called
264 54 : sub.second->handleEstablishmentFailure();
265 54 : }
266 54 : }
267 14 : } while (all_subscribed.size() != subscriptions_.size());
268 14 : }
269 :
270 : template <class S, class F, class RQ, class RS>
271 1838 : S& GrpcMuxImpl<S, F, RQ, RS>::subscriptionStateFor(const std::string& type_url) {
272 1838 : auto sub = subscriptions_.find(type_url);
273 1838 : RELEASE_ASSERT(sub != subscriptions_.end(),
274 1838 : fmt::format("Tried to look up SubscriptionState for non-existent subscription {}.",
275 1838 : type_url));
276 1838 : return *sub->second;
277 1838 : }
278 :
279 : template <class S, class F, class RQ, class RS>
280 201 : WatchMap& GrpcMuxImpl<S, F, RQ, RS>::watchMapFor(const std::string& type_url) {
281 201 : auto watch_map = watch_maps_.find(type_url);
282 201 : RELEASE_ASSERT(
283 201 : watch_map != watch_maps_.end(),
284 201 : fmt::format("Tried to look up WatchMap for non-existent subscription {}.", type_url));
285 201 : return *watch_map->second;
286 201 : }
287 :
288 : template <class S, class F, class RQ, class RS>
289 475 : void GrpcMuxImpl<S, F, RQ, RS>::trySendDiscoveryRequests() {
290 475 : if (shutdown_) {
291 37 : return;
292 37 : }
293 :
294 618 : while (true) {
295 : // Do any of our subscriptions even want to send a request?
296 618 : absl::optional<std::string> request_type_if_any = whoWantsToSendDiscoveryRequest();
297 618 : if (!request_type_if_any.has_value()) {
298 438 : break;
299 438 : }
300 : // If so, which one (by type_url)?
301 180 : std::string next_request_type_url = request_type_if_any.value();
302 180 : auto& sub = subscriptionStateFor(next_request_type_url);
303 180 : ENVOY_LOG(debug, "GrpcMuxImpl wants to send discovery request for {}", next_request_type_url);
304 : // Try again later if paused/rate limited/stream down.
305 180 : if (!canSendDiscoveryRequest(next_request_type_url)) {
306 0 : break;
307 0 : }
308 180 : std::unique_ptr<RQ> request;
309 : // Get our subscription state to generate the appropriate discovery request, and send.
310 180 : if (!pausable_ack_queue_.empty()) {
311 : // Because ACKs take precedence over plain requests, if there is anything in the queue, it's
312 : // safe to assume it's of the type_url that we're wanting to send.
313 : //
314 : // getNextRequestWithAck() returns a raw unowned pointer, which sendGrpcMessage deletes.
315 111 : request = sub.getNextRequestWithAck(pausable_ack_queue_.popFront());
316 111 : ENVOY_LOG(debug, "GrpcMuxImpl sent ACK discovery request for {}", next_request_type_url);
317 111 : } else {
318 : // Returns a raw unowned pointer, which sendGrpcMessage deletes.
319 69 : request = sub.getNextRequestAckless();
320 69 : ENVOY_LOG(debug, "GrpcMuxImpl sent non-ACK discovery request for {}", next_request_type_url);
321 69 : }
322 180 : ENVOY_LOG(debug, "GrpcMuxImpl skip_subsequent_node: {}", skipSubsequentNode());
323 180 : sendGrpcMessage(*request, sub);
324 180 : }
325 438 : maybeUpdateQueueSizeStat(pausable_ack_queue_.size());
326 438 : }
327 :
328 : // Checks whether external conditions allow sending a discovery request. (Does not check
329 : // whether we *want* to send a discovery request).
330 : template <class S, class F, class RQ, class RS>
331 180 : bool GrpcMuxImpl<S, F, RQ, RS>::canSendDiscoveryRequest(const std::string& type_url) {
332 180 : RELEASE_ASSERT(
333 180 : !pausable_ack_queue_.paused(type_url),
334 180 : fmt::format("canSendDiscoveryRequest() called on paused type_url {}. Pausedness is "
335 180 : "supposed to be filtered out by whoWantsToSendDiscoveryRequest(). ",
336 180 : type_url));
337 :
338 180 : if (!grpcStreamAvailable()) {
339 0 : ENVOY_LOG(trace, "No stream available to send a discovery request for {}.", type_url);
340 0 : return false;
341 180 : } else if (!rateLimitAllowsDrain()) {
342 0 : ENVOY_LOG(trace, "{} discovery request hit rate limit; will try later.", type_url);
343 0 : return false;
344 0 : }
345 180 : return true;
346 180 : }
347 :
348 : // Checks whether we have something to say in a discovery request, which can be an ACK and/or
349 : // a subscription update. (Does not check whether we *can* send that discovery request).
350 : // Returns the type_url we should send the discovery request for (if any).
351 : // First, prioritizes ACKs over non-ACK subscription interest updates.
352 : // Then, prioritizes non-ACK updates in the order the various types
353 : // of subscriptions were activated.
354 : template <class S, class F, class RQ, class RS>
355 618 : absl::optional<std::string> GrpcMuxImpl<S, F, RQ, RS>::whoWantsToSendDiscoveryRequest() {
356 : // All ACKs are sent before plain updates. trySendDiscoveryRequests() relies on this. So, choose
357 : // type_url from pausable_ack_queue_ if possible, before looking at pending updates.
358 618 : if (!pausable_ack_queue_.empty()) {
359 111 : return pausable_ack_queue_.front().type_url_;
360 111 : }
361 : // If we're looking to send multiple non-ACK requests, send them in the order that their
362 : // subscriptions were initiated.
363 1524 : for (const auto& sub_type : subscription_ordering_) {
364 1524 : auto& sub = subscriptionStateFor(sub_type);
365 1524 : if (sub.subscriptionUpdatePending() && !pausable_ack_queue_.paused(sub_type)) {
366 69 : return sub_type;
367 69 : }
368 1524 : }
369 438 : return absl::nullopt;
370 507 : }
371 :
372 : template class GrpcMuxImpl<DeltaSubscriptionState, DeltaSubscriptionStateFactory,
373 : envoy::service::discovery::v3::DeltaDiscoveryRequest,
374 : envoy::service::discovery::v3::DeltaDiscoveryResponse>;
375 : template class GrpcMuxImpl<SotwSubscriptionState, SotwSubscriptionStateFactory,
376 : envoy::service::discovery::v3::DiscoveryRequest,
377 : envoy::service::discovery::v3::DiscoveryResponse>;
378 :
379 : // Delta- and SotW-specific concrete subclasses:
380 : GrpcMuxDelta::GrpcMuxDelta(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
381 : : GrpcMuxImpl(std::make_unique<DeltaSubscriptionStateFactory>(grpc_mux_context.dispatcher_),
382 4 : grpc_mux_context, skip_subsequent_node) {}
383 :
384 : // GrpcStreamCallbacks for GrpcMuxDelta
385 : void GrpcMuxDelta::requestOnDemandUpdate(const std::string& type_url,
386 0 : const absl::flat_hash_set<std::string>& for_update) {
387 0 : auto& sub = subscriptionStateFor(type_url);
388 0 : sub.updateSubscriptionInterest(for_update, {});
389 : // Tell the server about our change in interest, if any.
390 0 : if (sub.subscriptionUpdatePending()) {
391 0 : trySendDiscoveryRequests();
392 0 : }
393 0 : }
394 :
395 : GrpcMuxSotw::GrpcMuxSotw(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
396 : : GrpcMuxImpl(std::make_unique<SotwSubscriptionStateFactory>(grpc_mux_context.dispatcher_),
397 10 : grpc_mux_context, skip_subsequent_node) {}
398 :
399 : Config::GrpcMuxWatchPtr NullGrpcMuxImpl::addWatch(const std::string&,
400 : const absl::flat_hash_set<std::string>&,
401 : SubscriptionCallbacks&,
402 : OpaqueResourceDecoderSharedPtr,
403 0 : const SubscriptionOptions&) {
404 0 : throw EnvoyException("ADS must be configured to support an ADS config source");
405 0 : }
406 :
407 : class DeltaGrpcMuxFactory : public MuxFactory {
408 : public:
409 13 : std::string name() const override { return "envoy.config_mux.delta_grpc_mux_factory"; }
410 135 : void shutdownAll() override { return GrpcMuxDelta::shutdownAll(); }
411 : std::shared_ptr<GrpcMux>
412 : create(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher,
413 : Random::RandomGenerator&, Stats::Scope& scope,
414 : const envoy::config::core::v3::ApiConfigSource& ads_config,
415 : const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
416 : BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
417 4 : XdsResourcesDelegateOptRef, bool use_eds_resources_cache) override {
418 4 : GrpcMuxContext grpc_mux_context{
419 4 : /*async_client_=*/std::move(async_client),
420 4 : /*dispatcher_=*/dispatcher,
421 : /*service_method_=*/
422 4 : *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
423 4 : "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources"),
424 4 : /*local_info_=*/local_info,
425 4 : /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config),
426 4 : /*scope_=*/scope,
427 4 : /*config_validators_=*/std::move(config_validators),
428 4 : /*xds_resources_delegate_=*/absl::nullopt,
429 4 : /*xds_config_tracker_=*/xds_config_tracker,
430 4 : /*backoff_strategy_=*/std::move(backoff_strategy),
431 4 : /*target_xds_authority_=*/"",
432 : /*eds_resources_cache_=*/
433 4 : (use_eds_resources_cache &&
434 4 : Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
435 4 : ? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
436 4 : : nullptr};
437 4 : return std::make_shared<GrpcMuxDelta>(grpc_mux_context,
438 4 : ads_config.set_node_on_first_message_only());
439 4 : }
440 : };
441 :
442 : class SotwGrpcMuxFactory : public MuxFactory {
443 : public:
444 13 : std::string name() const override { return "envoy.config_mux.sotw_grpc_mux_factory"; }
445 135 : void shutdownAll() override { return GrpcMuxSotw::shutdownAll(); }
446 : std::shared_ptr<GrpcMux>
447 : create(Grpc::RawAsyncClientPtr&& async_client, Event::Dispatcher& dispatcher,
448 : Random::RandomGenerator&, Stats::Scope& scope,
449 : const envoy::config::core::v3::ApiConfigSource& ads_config,
450 : const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
451 : BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
452 10 : XdsResourcesDelegateOptRef, bool use_eds_resources_cache) override {
453 10 : GrpcMuxContext grpc_mux_context{
454 10 : /*async_client_=*/std::move(async_client),
455 10 : /*dispatcher_=*/dispatcher,
456 : /*service_method_=*/
457 10 : *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
458 10 : "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"),
459 10 : /*local_info_=*/local_info,
460 10 : /*rate_limit_settings_=*/Utility::parseRateLimitSettings(ads_config),
461 10 : /*scope_=*/scope,
462 10 : /*config_validators_=*/std::move(config_validators),
463 10 : /*xds_resources_delegate_=*/absl::nullopt,
464 10 : /*xds_config_tracker_=*/xds_config_tracker,
465 10 : /*backoff_strategy_=*/std::move(backoff_strategy),
466 10 : /*target_xds_authority_=*/"",
467 : /*eds_resources_cache_=*/
468 10 : (use_eds_resources_cache &&
469 10 : Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads"))
470 10 : ? std::make_unique<EdsResourcesCacheImpl>(dispatcher)
471 10 : : nullptr};
472 10 : return std::make_shared<GrpcMuxSotw>(grpc_mux_context,
473 10 : ads_config.set_node_on_first_message_only());
474 10 : }
475 : };
476 :
477 : REGISTER_FACTORY(DeltaGrpcMuxFactory, MuxFactory);
478 : REGISTER_FACTORY(SotwGrpcMuxFactory, MuxFactory);
479 :
480 : } // namespace XdsMux
481 : } // namespace Config
482 : } // namespace Envoy
|