1
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
2

            
3
#include "envoy/service/discovery/v3/discovery.pb.h"
4
#include "envoy/upstream/load_stats_reporter.h"
5

            
6
#include "source/common/config/decoded_resource_impl.h"
7
#include "source/common/config/utility.h"
8
#include "source/common/memory/utils.h"
9
#include "source/common/protobuf/protobuf.h"
10
#include "source/extensions/config_subscription/grpc/eds_resources_cache_impl.h"
11
#include "source/extensions/config_subscription/grpc/xds_source_id.h"
12

            
13
#include "absl/container/btree_map.h"
14
#include "absl/container/node_hash_set.h"
15
#include "absl/strings/match.h"
16
#include "absl/strings/str_cat.h"
17

            
18
namespace Envoy {
19
namespace Config {
20

            
21
namespace {
22
class AllMuxesState {
23
public:
24
1104
  void insert(GrpcMuxImpl* mux) { muxes_.insert(mux); }
25

            
26
1104
  void erase(GrpcMuxImpl* mux) { muxes_.erase(mux); }
27

            
28
10652
  void shutdownAll() {
29
10858
    for (auto& mux : muxes_) {
30
983
      mux->shutdown();
31
983
    }
32
10652
  }
33

            
34
private:
35
  absl::flat_hash_set<GrpcMuxImpl*> muxes_;
36
};
37
using AllMuxes = ThreadSafeSingleton<AllMuxesState>;
38

            
39
// Returns true if `resource_name` contains the wildcard XdsTp resource, for example:
40
// xdstp://test/envoy.config.cluster.v3.Cluster/foo-cluster/*
41
// xdstp://test/envoy.config.cluster.v3.Cluster/foo-cluster/*?node.name=my_node
42
974
bool isXdsTpWildcard(const std::string& resource_name) {
43
974
  return XdsResourceIdentifier::hasXdsTpScheme(resource_name) &&
44
974
         (absl::EndsWith(resource_name, "/*") || absl::StrContains(resource_name, "/*?"));
45
974
}
46

            
47
// Converts the XdsTp resource name to its wildcard equivalent.
48
// Must only be called on XdsTp resource names.
49
6
std::string convertToWildcard(const std::string& resource_name) {
50
6
  ASSERT(XdsResourceIdentifier::hasXdsTpScheme(resource_name));
51
6
  auto resource_or_error = XdsResourceIdentifier::decodeUrn(resource_name);
52
6
  THROW_IF_NOT_OK_REF(resource_or_error.status());
53
6
  xds::core::v3::ResourceName xdstp_resource = resource_or_error.value();
54
6
  const auto pos = xdstp_resource.id().find_last_of('/');
55
6
  xdstp_resource.set_id(
56
6
      pos == std::string::npos ? "*" : absl::StrCat(xdstp_resource.id().substr(0, pos), "/*"));
57
6
  XdsResourceIdentifier::EncodeOptions options;
58
6
  options.sort_context_params_ = true;
59
6
  return XdsResourceIdentifier::encodeUrn(xdstp_resource, options);
60
6
}
61
} // namespace
62

            
63
// Builds the mux from `grpc_mux_context`, moving the clients, backoff strategy, validators and EDS
// cache out of it. It creates the gRPC stream object and registers a dynamic context update
// callback. The instance is added to the AllMuxes registry only after the local info check
// passes; a failed check is raised through THROW_IF_NOT_OK_REF.
GrpcMuxImpl::GrpcMuxImpl(GrpcMuxContext& grpc_mux_context)
    : dispatcher_(grpc_mux_context.dispatcher_),
      grpc_stream_(createGrpcStreamObject(std::move(grpc_mux_context.async_client_),
                                          std::move(grpc_mux_context.failover_async_client_),
                                          grpc_mux_context.service_method_, grpc_mux_context.scope_,
                                          std::move(grpc_mux_context.backoff_strategy_),
                                          grpc_mux_context.rate_limit_settings_)),
      local_info_(grpc_mux_context.local_info_),
      skip_subsequent_node_(grpc_mux_context.skip_subsequent_node_),
      config_validators_(std::move(grpc_mux_context.config_validators_)),
      xds_config_tracker_(grpc_mux_context.xds_config_tracker_),
      xds_resources_delegate_(grpc_mux_context.xds_resources_delegate_),
      eds_resources_cache_(std::move(grpc_mux_context.eds_resources_cache_)),
      target_xds_authority_(grpc_mux_context.target_xds_authority_),
      load_stats_reporter_factory_(grpc_mux_context.load_stats_reporter_factory_),
      dynamic_update_callback_handle_(
          grpc_mux_context.local_info_.contextProvider().addDynamicContextUpdateCallback(
              [this](absl::string_view updated_type_url) {
                // Forces the node to be resent with the next request for this type URL.
                onDynamicContextUpdate(updated_type_url);
                return absl::OkStatus();
              })) {
  const absl::Status local_info_status = Config::Utility::checkLocalInfo("ads", local_info_);
  THROW_IF_NOT_OK_REF(local_info_status);
  AllMuxes::get().insert(this);
}
87

            
88
std::unique_ptr<GrpcStreamInterface<envoy::service::discovery::v3::DiscoveryRequest,
89
                                    envoy::service::discovery::v3::DiscoveryResponse>>
90
GrpcMuxImpl::createGrpcStreamObject(Grpc::RawAsyncClientSharedPtr&& async_client,
91
                                    Grpc::RawAsyncClientSharedPtr&& failover_async_client,
92
                                    const Protobuf::MethodDescriptor& service_method,
93
                                    Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy,
94
1114
                                    const RateLimitSettings& rate_limit_settings) {
95
1114
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
96
51
    return std::make_unique<GrpcMuxFailover<envoy::service::discovery::v3::DiscoveryRequest,
97
51
                                            envoy::service::discovery::v3::DiscoveryResponse>>(
98
        /*primary_stream_creator=*/
99
51
        [&async_client, &service_method, &dispatcher = dispatcher_, &scope, &backoff_strategy,
100
51
         &rate_limit_settings](
101
51
            GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse>* callbacks)
102
51
            -> GrpcStreamInterfacePtr<envoy::service::discovery::v3::DiscoveryRequest,
103
51
                                      envoy::service::discovery::v3::DiscoveryResponse> {
104
51
          return std::make_unique<GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
105
51
                                             envoy::service::discovery::v3::DiscoveryResponse>>(
106
51
              callbacks, std::move(async_client), service_method, dispatcher, scope,
107
51
              std::move(backoff_strategy), rate_limit_settings,
108
51
              GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
109
51
                         envoy::service::discovery::v3::DiscoveryResponse>::ConnectedStateValue::
110
51
                  FirstEntry);
111
51
        },
112
        /*failover_stream_creator=*/
113
51
        failover_async_client
114
51
            ? absl::make_optional(
115
12
                  [&failover_async_client, &service_method, &dispatcher = dispatcher_, &scope,
116
12
                   &rate_limit_settings](
117
12
                      GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse>*
118
12
                          callbacks)
119
12
                      -> GrpcStreamInterfacePtr<envoy::service::discovery::v3::DiscoveryRequest,
120
12
                                                envoy::service::discovery::v3::DiscoveryResponse> {
121
12
                    return std::make_unique<
122
12
                        GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
123
12
                                   envoy::service::discovery::v3::DiscoveryResponse>>(
124
12
                        callbacks, std::move(failover_async_client), service_method, dispatcher,
125
12
                        scope,
126
                        // TODO(adisuissa): the backoff strategy for the failover should
127
                        // be the same as the primary source.
128
12
                        std::make_unique<FixedBackOffStrategy>(
129
12
                            GrpcMuxFailover<envoy::service::discovery::v3::DiscoveryRequest,
130
12
                                            envoy::service::discovery::v3::DiscoveryResponse>::
131
12
                                DefaultFailoverBackoffMilliseconds),
132
12
                        rate_limit_settings,
133
12
                        GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
134
12
                                   envoy::service::discovery::v3::DiscoveryResponse>::
135
12
                            ConnectedStateValue::SecondEntry);
136
12
                  })
137
51
            : absl::nullopt,
138
51
        /*grpc_mux_callbacks=*/*this,
139
51
        /*dispatch=*/dispatcher_);
140
51
  }
141
1063
  return std::make_unique<GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
142
1063
                                     envoy::service::discovery::v3::DiscoveryResponse>>(
143
1063
      this, std::move(async_client), service_method, dispatcher_, scope,
144
1063
      std::move(backoff_strategy), rate_limit_settings,
145
1063
      GrpcStream<
146
1063
          envoy::service::discovery::v3::DiscoveryRequest,
147
1063
          envoy::service::discovery::v3::DiscoveryResponse>::ConnectedStateValue::FirstEntry);
148
1114
}
149

            
150
1104
GrpcMuxImpl::~GrpcMuxImpl() {
  // Deregister so that shutdownAll() never reaches a destroyed mux.
  AllMuxesState& registry = AllMuxes::get();
  registry.erase(this);
}
151

            
152
10652
void GrpcMuxImpl::shutdownAll() { AllMuxes::get().shutdownAll(); }
153

            
154
14
// Handles a dynamic context change for `resource_type_url`. If the mux tracks that type, the
// node is marked to be resent and a new discovery request is queued; unknown types are ignored.
void GrpcMuxImpl::onDynamicContextUpdate(absl::string_view resource_type_url) {
  const auto state_it = api_state_.find(resource_type_url);
  if (state_it != api_state_.end()) {
    state_it->second->must_send_node_ = true;
    queueDiscoveryRequest(resource_type_url);
  }
}
162

            
163
1091
void GrpcMuxImpl::start() {
164
1091
  ASSERT(!started_);
165
1091
  if (started_) {
166
    return;
167
  }
168
1091
  started_ = true;
169
1091
  grpc_stream_->establishNewStream();
170
1091
}
171

            
172
4899
// Rebuilds the DiscoveryRequest for `type_url` and sends it on the current stream. The resource
// names are the de-duplicated union of every watch's names, in first-seen order. Does nothing
// once the mux is shut down.
void GrpcMuxImpl::sendDiscoveryRequest(absl::string_view type_url) {
  if (shutdown_) {
    return;
  }

  ApiState& state = apiStateFor(type_url);
  auto& outgoing = state.request_;
  outgoing.mutable_resource_names()->Clear();

  absl::node_hash_set<std::string> already_added;
  for (const auto* watch : state.watches_) {
    for (const std::string& name : watch->resources_) {
      // insert().second is true only the first time a name is seen.
      if (already_added.insert(name).second) {
        outgoing.add_resource_names(name);
      }
    }
  }

  const bool include_node =
      state.must_send_node_ || !skip_subsequent_node_ || first_stream_request_;
  if (include_node) {
    // Node may have been cleared during a previous request.
    outgoing.mutable_node()->CopyFrom(local_info_.node());
    state.must_send_node_ = false;
  } else {
    outgoing.clear_node();
  }

  ENVOY_LOG(trace, "Sending DiscoveryRequest for {}: {}", type_url, outgoing.ShortDebugString());
  grpc_stream_->sendMessage(outgoing);
  first_stream_request_ = false;

  // An error_detail is attached to at most one sent request; drop it now that it has gone out.
  auto& sent_request = apiStateFor(type_url).request_;
  if (sent_request.has_error_detail()) {
    sent_request.clear_error_detail();
  }
}
208

            
209
1301
void GrpcMuxImpl::clearNonce() {
210
  // Iterate over all api_states (for each type_url), and clear its nonce.
211
1429
  for (auto& [type_url, api_state] : api_state_) {
212
1409
    if (api_state) {
213
1409
      api_state->request_.clear_response_nonce();
214
1409
    }
215
1409
  }
216
1301
}
217

            
218
void GrpcMuxImpl::loadConfigFromDelegate(const std::string& type_url,
219
599
                                         const absl::flat_hash_set<std::string>& resource_names) {
220
599
  if (!xds_resources_delegate_.has_value()) {
221
599
    return;
222
599
  }
223
  ApiState& api_state = apiStateFor(type_url);
224
  if (api_state.watches_.empty()) {
225
    // No watches, so exit without loading config from storage.
226
    return;
227
  }
228

            
229
  const XdsConfigSourceId source_id{target_xds_authority_, type_url};
230
  TRY_ASSERT_MAIN_THREAD {
231
    std::vector<envoy::service::discovery::v3::Resource> resources =
232
        xds_resources_delegate_->getResources(source_id, resource_names);
233
    if (resources.empty()) {
234
      // There are no persisted resources, so nothing to process.
235
      return;
236
    }
237

            
238
    std::vector<DecodedResourcePtr> decoded_resources;
239
    OpaqueResourceDecoder& resource_decoder = *api_state.watches_.front()->resource_decoder_;
240
    std::string version_info;
241
    for (const auto& resource : resources) {
242
      if (version_info.empty()) {
243
        version_info = resource.version();
244
      } else {
245
        ASSERT(resource.version() == version_info);
246
      }
247

            
248
      TRY_ASSERT_MAIN_THREAD {
249
        decoded_resources.emplace_back(
250
            std::make_unique<DecodedResourceImpl>(resource_decoder, resource));
251
      }
252
      END_TRY
253
      CATCH(const EnvoyException& e,
254
            { xds_resources_delegate_->onResourceLoadFailed(source_id, resource.name(), e); });
255
    }
256

            
257
    processDiscoveryResources(decoded_resources, api_state, type_url, version_info,
258
                              /*call_delegate=*/false);
259
  }
260
  END_TRY
261
  CATCH(const EnvoyException& e, {
262
    // TODO(abeyad): do something else here?
263
    ENVOY_LOG_MISC(warn, "Failed to load config from delegate for {}: {}", source_id.toKey(),
264
                   e.what());
265
  });
266
}
267

            
268
// Creates a watch on `resources` of `type_url` and queues an updated discovery request.
// - The first watch for a type URL marks it subscribed: it sets the request's type URL and node,
//   and records the type in subscriptions_ so it is re-requested on every new stream.
// - The EDS resources cache is only handed to ClusterLoadAssignment watches.
// Returns the watch; destroying it is how the caller stops watching.
GrpcMuxWatchPtr GrpcMuxImpl::addWatch(const std::string& type_url,
                                      const absl::flat_hash_set<std::string>& resources,
                                      SubscriptionCallbacks& callbacks,
                                      OpaqueResourceDecoderSharedPtr resource_decoder,
                                      const SubscriptionOptions& options) {
  // Resource cache is only used for EDS resources.
  EdsResourcesCacheOptRef resources_cache{absl::nullopt};
  if (eds_resources_cache_ &&
      (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
    resources_cache = makeOptRefFromPtr(eds_resources_cache_.get());
  }
  // resource_decoder is a by-value parameter that is not used again, so hand it over instead of
  // copying the shared pointer.
  auto watch = std::make_unique<GrpcMuxWatchImpl>(resources, callbacks, std::move(resource_decoder),
                                                  type_url, *this, options, local_info_,
                                                  resources_cache);
  // Pass type_url as an argument rather than concatenating it into the format string, matching
  // every other ENVOY_LOG call in this file.
  ENVOY_LOG(debug, "gRPC mux addWatch for {}", type_url);

  // Lazily kick off the requests based on first subscription. This has the
  // convenient side-effect that we order messages on the channel based on
  // Envoy's internal dependency ordering.
  // TODO(gsagula): move TokenBucketImpl params to a config.
  ApiState& api_state = apiStateFor(type_url);
  if (!api_state.subscribed_) {
    api_state.request_.set_type_url(type_url);
    api_state.request_.mutable_node()->MergeFrom(local_info_.node());
    api_state.subscribed_ = true;
    subscriptions_.emplace_back(type_url);
  }

  // This will send an updated request on each subscription.
  // TODO(htuch): For RDS/EDS, this will generate a new DiscoveryRequest on each resource we added.
  // Consider in the future adding some kind of collation/batching during CDS/LDS updates so that we
  // only send a single RDS/EDS update after the CDS/LDS update.
  queueDiscoveryRequest(type_url);

  return watch;
}
302

            
303
// Replaces the xDS server(s) this mux talks to.
// - Rate limit settings are parsed first; if they are invalid, the error is returned and the
//   current stream is left untouched.
// - Otherwise the current stream is closed, a new stream object is built from the given clients,
//   and a new stream is established.
// config_validators_ are intentionally kept, since they may carry state that must survive the
// switch.
absl::Status
GrpcMuxImpl::updateMuxSource(Grpc::RawAsyncClientSharedPtr&& primary_async_client,
                             Grpc::RawAsyncClientSharedPtr&& failover_async_client,
                             Stats::Scope& scope, BackOffStrategyPtr&& backoff_strategy,
                             const envoy::config::core::v3::ApiConfigSource& ads_config_source) {
  absl::StatusOr<RateLimitSettings> parsed_rate_limits =
      Utility::parseRateLimitSettings(ads_config_source);
  RETURN_IF_NOT_OK_REF(parsed_rate_limits.status());

  const Protobuf::MethodDescriptor& ads_method =
      *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources");

  ENVOY_LOG_MISC(info, "Replacing xDS gRPC mux source");
  grpc_stream_->closeStream();
  grpc_stream_ = createGrpcStreamObject(std::move(primary_async_client),
                                        std::move(failover_async_client), ads_method, scope,
                                        std::move(backoff_strategy), *parsed_rate_limits);

  // Restart the subscriptions over the new stream.
  grpc_stream_->establishNewStream();
  return absl::OkStatus();
}
331

            
332
3099
// Single-type convenience overload of pause(); see the vector overload.
ScopedResume GrpcMuxImpl::pause(const std::string& type_url) {
  const std::vector<std::string> single_type{type_url};
  return pause(single_type);
}
335

            
336
3875
// Increments the pause count of every type URL in `type_urls`. The returned ScopedResume
// decrements those counts again when it is destroyed. When a type's count reaches zero, and a
// request became pending while paused and the type is subscribed, a discovery request is queued
// for it.
ScopedResume GrpcMuxImpl::pause(const std::vector<std::string> type_urls) {
  for (const std::string& paused_type : type_urls) {
    ApiState& state = apiStateFor(paused_type);
    ENVOY_LOG(debug, "Pausing discovery requests for {} (previous count {})", paused_type,
              state.pauses_);
    state.pauses_++;
  }

  return std::make_unique<Cleanup>([this, type_urls]() {
    for (const std::string& resumed_type : type_urls) {
      ApiState& state = apiStateFor(resumed_type);
      ENVOY_LOG(debug, "Decreasing pause count on discovery requests for {} (previous count {})",
                resumed_type, state.pauses_);
      ASSERT(state.paused());

      state.pauses_--;
      const bool fully_resumed = state.pauses_ == 0;
      if (fully_resumed && state.pending_ && state.subscribed_) {
        ENVOY_LOG(debug, "Resuming discovery requests for {}", resumed_type);
        queueDiscoveryRequest(resumed_type);
        state.pending_ = false;
      }
    }
  });
}
358

            
359
void GrpcMuxImpl::onDiscoveryResponse(
360
    std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message,
361
3091
    ControlPlaneStats& control_plane_stats) {
362
3091
  const std::string type_url = message->type_url();
363
3091
  ENVOY_LOG(debug, "Received gRPC message for {} at version {}", type_url, message->version_info());
364

            
365
3091
  if (api_state_.count(type_url) == 0) {
366
    // TODO(yuval-k): This should never happen. consider dropping the stream as this is a
367
    // protocol violation
368
2
    ENVOY_LOG(warn, "Ignoring the message for type URL {} as it has no current subscribers.",
369
2
              type_url);
370
2
    return;
371
2
  }
372

            
373
3089
  ApiState& api_state = apiStateFor(type_url);
374

            
375
3089
  if (message->has_control_plane()) {
376
33
    control_plane_stats.identifier_.set(message->control_plane().identifier());
377

            
378
33
    if (message->control_plane().identifier() != api_state.control_plane_identifier_) {
379
23
      api_state.control_plane_identifier_ = message->control_plane().identifier();
380
23
      ENVOY_LOG(debug, "Receiving gRPC updates for {} from {}", type_url,
381
23
                api_state.control_plane_identifier_);
382
23
    }
383
33
  }
384

            
385
3089
  if (api_state.watches_.empty()) {
386
    // update the nonce as we are processing this response.
387
4
    api_state.request_.set_response_nonce(message->nonce());
388
4
    if (message->resources().empty()) {
389
      // No watches and no resources. This can happen when envoy unregisters from a
390
      // resource that's removed from the server as well. For example, a deleted cluster
391
      // triggers un-watching the ClusterLoadAssignment watch, and at the same time the
392
      // xDS server sends an empty list of ClusterLoadAssignment resources. we'll accept
393
      // this update. no need to send a discovery request, as we don't watch for anything.
394
2
      api_state.request_.set_version_info(message->version_info());
395
2
    } else {
396
      // No watches and we have resources - this should not happen. send a NACK (by not
397
      // updating the version).
398
2
      ENVOY_LOG(warn, "Ignoring unwatched type URL {}", type_url);
399
2
      queueDiscoveryRequest(type_url);
400
2
    }
401
4
    return;
402
4
  }
403
3085
  ScopedResume same_type_resume;
404
  // We pause updates of the same type. This is necessary for SotW and GrpcMuxImpl, since unlike
405
  // delta and NewGRpcMuxImpl, independent watch additions/removals trigger updates regardless of
406
  // the delta state. The proper fix for this is to converge these implementations,
407
  // see https://github.com/envoyproxy/envoy/issues/11477.
408
3085
  same_type_resume = pause(type_url);
409
3085
  TRY_ASSERT_MAIN_THREAD {
410
3085
    std::vector<DecodedResourcePtr> resources;
411
3085
    OpaqueResourceDecoder& resource_decoder = *api_state.watches_.front()->resource_decoder_;
412

            
413
3210
    for (const auto& resource : message->resources()) {
414
      // TODO(snowp): Check the underlying type when the resource is a Resource.
415
2024
      if (!resource.Is<envoy::service::discovery::v3::Resource>() &&
416
2024
          type_url != resource.type_url()) {
417
14
        throwEnvoyExceptionOrPanic(
418
14
            fmt::format("{} does not match the message-wide type URL {} in DiscoveryResponse {}",
419
14
                        resource.type_url(), type_url, message->DebugString()));
420
14
      }
421

            
422
2010
      auto decoded_resource = THROW_OR_RETURN_VALUE(
423
2010
          DecodedResourceImpl::fromResource(resource_decoder, resource, message->version_info()),
424
2010
          DecodedResourceImplPtr);
425

            
426
2010
      if (!isHeartbeatResource(type_url, *decoded_resource)) {
427
2006
        resources.emplace_back(std::move(decoded_resource));
428
2006
      }
429
2010
    }
430

            
431
3071
    processDiscoveryResources(resources, api_state, type_url, message->version_info(),
432
3071
                              /*call_delegate=*/true);
433

            
434
    // Processing point when resources are successfully ingested.
435
3071
    if (xds_config_tracker_.has_value()) {
436
4
      xds_config_tracker_->onConfigAccepted(type_url, resources);
437
4
    }
438
3071
  }
439
3071
  END_TRY
440
3085
  CATCH(const EnvoyException& e, {
441
3085
    for (auto watch : api_state.watches_) {
442
3085
      watch->callbacks_.onConfigUpdateFailed(
443
3085
          Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
444
3085
    }
445
3085
    ::google::rpc::Status* error_detail = api_state.request_.mutable_error_detail();
446
3085
    error_detail->set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
447
3085
    error_detail->set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
448

            
449
    // Processing point when there is any exception during the parse and ingestion process.
450
3085
    if (xds_config_tracker_.has_value()) {
451
3085
      xds_config_tracker_->onConfigRejected(*message, error_detail->message());
452
3085
    }
453
3085
  });
454
3085
  api_state.previously_fetched_data_ = true;
455
3085
  api_state.request_.set_response_nonce(message->nonce());
456
3085
  ASSERT(api_state.paused());
457
3085
  queueDiscoveryRequest(type_url);
458
3085
}
459

            
460
void GrpcMuxImpl::processDiscoveryResources(const std::vector<DecodedResourcePtr>& resources,
461
                                            ApiState& api_state, const std::string& type_url,
462
                                            const std::string& version_info,
463
3069
                                            const bool call_delegate) {
464
3069
  ASSERT_IS_MAIN_OR_TEST_THREAD();
465
  // To avoid O(n^2) explosion (e.g. when we have 1000s of EDS watches), we
466
  // build a map here from resource name to resource and then walk watches_.
467
  // We have to walk all watches (and need an efficient map as a result) to
468
  // ensure we deliver empty config updates when a resource is dropped. We make the map ordered
469
  // for test determinism.
470
3069
  absl::btree_map<std::string, DecodedResourceRef> resource_ref_map;
471
3069
  std::vector<DecodedResourceRef> all_resource_refs;
472

            
473
3069
  const auto scoped_ttl_update = api_state.ttl_.scopedTtlUpdate();
474

            
475
3194
  for (const auto& resource : resources) {
476
2006
    if (resource->ttl()) {
477
38
      api_state.ttl_.add(*resource->ttl(), resource->name());
478
1968
    } else {
479
1968
      api_state.ttl_.clear(resource->name());
480
1968
    }
481

            
482
2006
    all_resource_refs.emplace_back(*resource);
483
2006
    if (XdsResourceIdentifier::hasXdsTpScheme(resource->name())) {
484
      // Sort the context params of an xdstp resource, so we can compare them easily.
485
33
      auto resource_or_error = XdsResourceIdentifier::decodeUrn(resource->name());
486
33
      THROW_IF_NOT_OK_REF(resource_or_error.status());
487
33
      xds::core::v3::ResourceName xdstp_resource = resource_or_error.value();
488
33
      XdsResourceIdentifier::EncodeOptions options;
489
33
      options.sort_context_params_ = true;
490
33
      resource_ref_map.emplace(XdsResourceIdentifier::encodeUrn(xdstp_resource, options),
491
33
                               *resource);
492
1987
    } else {
493
1973
      resource_ref_map.emplace(resource->name(), *resource);
494
1973
    }
495
2006
  }
496

            
497
  // Execute external config validators if there are any watches.
498
3069
  if (!api_state.watches_.empty()) {
499
3069
    config_validators_->executeValidators(type_url, resources);
500
3069
  }
501

            
502
3114
  for (auto watch : api_state.watches_) {
503
    // onConfigUpdate should be called in all cases for single watch xDS (Cluster and
504
    // Listener) even if the message does not have resources so that update_empty stat
505
    // is properly incremented and state-of-the-world semantics are maintained.
506
3112
    if (watch->resources_.empty()) {
507
1235
      THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate(all_resource_refs, version_info));
508
1215
      continue;
509
1235
    }
510
1877
    std::vector<DecodedResourceRef> found_resources;
511
1918
    for (const auto& watched_resource_name : watch->resources_) {
512
      // Look for a singleton subscription.
513
1918
      auto it = resource_ref_map.find(watched_resource_name);
514
1918
      if (it != resource_ref_map.end()) {
515
944
        found_resources.emplace_back(it->second);
516
1860
      } else if (isXdsTpWildcard(watched_resource_name)) {
517
        // See if the resources match the xdstp wildcard subscription.
518
        // Note: although it is unlikely that Envoy will need to support a resource that is mapped
519
        // to both a singleton and collection watch, this code still supports this use case.
520
        // TODO(abeyad): This could be made more efficient, e.g. by pre-computing and having a map
521
        // entry for each wildcard watch.
522
6
        for (const auto& resource_ref_it : resource_ref_map) {
523
6
          if (XdsResourceIdentifier::hasXdsTpScheme(resource_ref_it.first) &&
524
6
              convertToWildcard(resource_ref_it.first) == watched_resource_name) {
525
4
            found_resources.emplace_back(resource_ref_it.second);
526
4
          }
527
6
        }
528
4
      }
529
1918
    }
530

            
531
    // onConfigUpdate should be called only on watches(clusters/listeners) that have
532
    // updates in the message for EDS/RDS.
533
1877
    if (!found_resources.empty()) {
534
920
      THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate(found_resources, version_info));
535
      // Resource cache is only used for EDS resources.
536
920
      if (eds_resources_cache_ &&
537
920
          (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())) {
538
178
        for (const auto& resource : found_resources) {
539
178
          const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment =
540
178
              dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
541
178
                  resource.get().resource());
542
178
          eds_resources_cache_->setResource(resource.get().name(), cluster_load_assignment);
543
178
        }
544
        // No need to remove resources from the cache, as currently only non-collection
545
        // subscriptions are supported, and these resources are removed in the call
546
        // to updateWatchInterest().
547
178
      }
548
920
    }
549
1877
  }
550

            
551
  // All config updates have been applied without throwing an exception, so we'll call the xDS
552
  // resources delegate, if any.
553
3049
  if (call_delegate && xds_resources_delegate_.has_value()) {
554
4
    xds_resources_delegate_->onConfigUpdated(XdsConfigSourceId{target_xds_authority_, type_url},
555
4
                                             all_resource_refs);
556
4
  }
557

            
558
  // TODO(mattklein123): In the future if we start tracking per-resource versions, we
559
  // would do that tracking here.
560
3049
  api_state.request_.set_version_info(version_info);
561
3049
  Memory::Utils::tryShrinkHeap();
562
3049
}
563

            
564
2
void GrpcMuxImpl::onWriteable() {
  // The stream reports it can accept more data; try to send the queued discovery requests.
  drainRequests();
}
565

            
566
1301
void GrpcMuxImpl::onStreamEstablished() {
567
1301
  first_stream_request_ = true;
568
1301
  grpc_stream_->maybeUpdateQueueSizeStat(0);
569
1301
  clearNonce();
570
1301
  request_queue_ = std::make_unique<std::queue<std::string>>();
571
1337
  for (const auto& type_url : subscriptions_) {
572
1256
    queueDiscoveryRequest(type_url);
573
1256
  }
574
1301
}
575

            
576
1305
void GrpcMuxImpl::onEstablishmentFailure(bool) {
577
2223
  for (const auto& api_state : api_state_) {
578
2225
    for (auto watch : api_state.second->watches_) {
579
1641
      watch->callbacks_.onConfigUpdateFailed(
580
1641
          Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr);
581
1641
    }
582
2223
    if (!api_state.second->previously_fetched_data_) {
583
      // On the initialization of the gRPC mux, if connection to the xDS server fails, load the
584
      // persisted config, if available. The locally persisted config will be used until
585
      // connectivity is established with the xDS server.
586
599
      loadConfigFromDelegate(
587
599
          /*type_url=*/api_state.first,
588
599
          absl::flat_hash_set<std::string>{api_state.second->request_.resource_names().begin(),
589
599
                                           api_state.second->request_.resource_names().end()});
590
599
      api_state.second->previously_fetched_data_ = true;
591
599
    }
592
2223
  }
593
1305
}
594

            
595
9922
// Queues a discovery request for `queue_item` (a type URL) and tries to drain the queue.
// - If no stream is available, the request is dropped; a reconnect enqueues a new one.
//   In that case the ApiState is not looked up or created.
// - If the type's ApiState is paused, the state is marked pending instead; unpausing
//   enqueues a new request.
void GrpcMuxImpl::queueDiscoveryRequest(absl::string_view queue_item) {
  if (!grpc_stream_->grpcStreamAvailable()) {
    // Drop this request; the reconnect will enqueue a new one.
    ENVOY_LOG(debug, "No stream available to queueDiscoveryRequest for {}", queue_item);
    return;
  }

  ApiState& state = apiStateFor(queue_item);
  if (state.paused()) {
    // Drop this request; the unpause will enqueue a new one.
    ENVOY_LOG(trace, "API {} paused during queueDiscoveryRequest(), setting pending.", queue_item);
    state.pending_ = true;
  } else {
    request_queue_->emplace(std::string(queue_item));
    drainRequests();
  }
}
609

            
610
void GrpcMuxImpl::expiryCallback(absl::string_view type_url,
611
32
                                 const std::vector<std::string>& expired) {
612
  // The TtlManager triggers a callback with a list of all the expired elements, which we need
613
  // to compare against the various watched resources to return the subset that each watch is
614
  // subscribed to.
615

            
616
  // We convert the incoming list into a set in order to more efficiently perform this
617
  // comparison when there are a lot of watches.
618
32
  absl::flat_hash_set<std::string> all_expired;
619
32
  all_expired.insert(expired.begin(), expired.end());
620

            
621
  // Note: We can blindly dereference the lookup here since the only time we call this is in a
622
  // callback that is created at the same time as we insert the ApiState for this type.
623
32
  for (auto watch : api_state_.find(type_url)->second->watches_) {
624
32
    Protobuf::RepeatedPtrField<std::string> found_resources_for_watch;
625

            
626
32
    for (const auto& resource : expired) {
627
32
      if (all_expired.find(resource) != all_expired.end()) {
628
32
        found_resources_for_watch.Add(std::string(resource));
629
32
      }
630
32
    }
631

            
632
32
    THROW_IF_NOT_OK(watch->callbacks_.onConfigUpdate({}, found_resources_for_watch, ""));
633
32
  }
634
32
}
635

            
636
37450
// Returns the ApiState for `type_url`, creating it on first use. A newly created ApiState
// receives a TTL-expiry callback that forwards expired resource names to expiryCallback()
// for this type URL.
GrpcMuxImpl::ApiState& GrpcMuxImpl::apiStateFor(absl::string_view type_url) {
  auto itr = api_state_.find(type_url);
  if (itr == api_state_.end()) {
    // The expiry callback is stored inside the ApiState and can fire long after this call
    // returns. `type_url` is a non-owning view into caller storage that is not guaranteed to
    // live that long, so the callback must own its own copy. The map key cannot be referenced
    // instead, because btree-based maps may relocate keys on later inserts.
    const std::string owned_type_url(type_url);
    // Reuse the iterator returned by emplace instead of performing a second lookup.
    itr = api_state_
              .emplace(owned_type_url,
                       std::make_unique<ApiState>(
                           dispatcher_, [this, owned_type_url](const auto& expired) {
                             expiryCallback(owned_type_url, expired);
                           }))
              .first;
  }

  return *itr->second;
}
647

            
648
4923
void GrpcMuxImpl::drainRequests() {
649
9822
  while (!request_queue_->empty() && grpc_stream_->checkRateLimitAllowsDrain()) {
650
    // Process the request, if rate limiting is not enabled at all or if it is under rate limit.
651
4899
    sendDiscoveryRequest(request_queue_->front());
652
4899
    request_queue_->pop();
653
4899
  }
654
4923
  grpc_stream_->maybeUpdateQueueSizeStat(request_queue_->size());
655
4923
}
656

            
657
6
// Lazily creates the self-hosted LRS reporter. Creation is attempted only when all of these
// hold:
// - no reporter exists yet;
// - a factory was supplied;
// - the runtime feature "envoy.reloadable_features.enable_lrs_server_self_ads" is enabled.
// Returns the current reporter, or nullptr if none exists or the factory produced none. A
// failed creation is not cached, so a later call will try the factory again.
Upstream::LoadStatsReporter* GrpcMuxImpl::maybeCreateLoadStatsReporter() {
  const bool should_create =
      !lrs_server_ && load_stats_reporter_factory_ &&
      Runtime::runtimeFeatureEnabled("envoy.reloadable_features.enable_lrs_server_self_ads");
  if (!should_create) {
    // Either a reporter already exists, or creation is not possible/enabled.
    return lrs_server_.get();
  }

  ENVOY_LOG(info, "Creating self-hosted LRS reporter for xDS-gRPC-Mux (target-authority: {})",
            target_xds_authority_);
  lrs_server_ = load_stats_reporter_factory_();
  if (lrs_server_ == nullptr) {
    ENVOY_LOG(warn,
              "Failed to create self-hosted LRS reporter, not using an xDS-based LRS server");
    return nullptr;
  }

  ENVOY_LOG(debug,
            "Successfully created self-hosted LRS reporter for xDS-gRPC-Mux (target-authority: {})",
            target_xds_authority_);
  return lrs_server_.get();
}
675

            
676
4
// Returns the self-hosted LRS reporter if one has been created, otherwise nullptr.
// Unlike maybeCreateLoadStatsReporter(), this never creates one.
Upstream::LoadStatsReporter* GrpcMuxImpl::loadStatsReporter() const {
  return lrs_server_ ? lrs_server_.get() : nullptr;
}
677

            
678
// A factory class for creating GrpcMuxImpl so it does not have to be
679
// hard-compiled into cluster_manager_impl.cc
680
class GrpcMuxFactory : public MuxFactory {
681
public:
682
491
  std::string name() const override { return "envoy.config_mux.grpc_mux_factory"; }
683
10652
  void shutdownAll() override { return GrpcMuxImpl::shutdownAll(); }
684
  std::shared_ptr<GrpcMux>
685
  create(Grpc::RawAsyncClientSharedPtr&& async_client,
686
         Grpc::RawAsyncClientSharedPtr&& failover_async_client, Event::Dispatcher& dispatcher,
687
         Random::RandomGenerator&, Stats::Scope& scope,
688
         const envoy::config::core::v3::ApiConfigSource& ads_config,
689
         const LocalInfo::LocalInfo& local_info, CustomConfigValidatorsPtr&& config_validators,
690
         BackOffStrategyPtr&& backoff_strategy, XdsConfigTrackerOptRef xds_config_tracker,
691
         XdsResourcesDelegateOptRef xds_resources_delegate,
692
         std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> load_stats_reporter_factory)
693
148
      override {
694
148
    absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
695
148
        Utility::parseRateLimitSettings(ads_config);
696
148
    THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
697
147
    GrpcMuxContext grpc_mux_context{
698
147
        /*async_client_=*/std::move(async_client),
699
147
        /*failover_async_client_=*/std::move(failover_async_client),
700
147
        /*dispatcher_=*/dispatcher,
701
        /*service_method_=*/
702
147
        *Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
703
147
            "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources"),
704
147
        /*local_info_=*/local_info,
705
147
        /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
706
147
        /*scope_=*/scope,
707
147
        /*config_validators_=*/std::move(config_validators),
708
147
        /*xds_resources_delegate_=*/xds_resources_delegate,
709
147
        /*xds_config_tracker_=*/xds_config_tracker,
710
147
        /*backoff_strategy_=*/std::move(backoff_strategy),
711
147
        /*target_xds_authority_=*/Config::Utility::getGrpcControlPlane(ads_config).value_or(""),
712
147
        /*eds_resources_cache_=*/std::make_unique<EdsResourcesCacheImpl>(dispatcher),
713
147
        /*skip_subsequent_node_=*/ads_config.set_node_on_first_message_only(),
714
147
        /*load_stats_reporter_factory_=*/load_stats_reporter_factory};
715
147
    return std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context);
716
148
  }
717
};
718

            
719
// Registers GrpcMuxFactory as a MuxFactory extension. Its name() is
// "envoy.config_mux.grpc_mux_factory"; presumably callers look the factory up by that name
// rather than linking GrpcMuxImpl directly. Confirm against the registry implementation.
REGISTER_FACTORY(GrpcMuxFactory, MuxFactory);
720

            
721
} // namespace Config
722
} // namespace Envoy