1
#include "source/extensions/config_subscription/grpc/grpc_collection_subscription_factory.h"
2

            
3
#include "source/common/config/custom_config_validators_impl.h"
4
#include "source/common/config/type_to_endpoint.h"
5
#include "source/common/upstream/load_stats_reporter_impl.h"
6
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
7
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
8
#include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
9
#include "source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h"
10

            
11
namespace Envoy {
12
namespace Config {
13

            
14
// Creates a collection subscription that talks delta gRPC through its own, dedicated
// NewGrpcMuxImpl (i.e. not the cluster manager's shared ADS mux).
//
// The function, in order:
//   1. builds the custom config validators from the ApiConfigSource,
//   2. prepares the jittered exponential back-off strategy,
//   3. obtains the gRPC async client factory and parses the rate limit settings,
//   4. creates an uncached raw async client,
//   5. packs everything into a GrpcMuxContext and wraps the resulting mux in a
//      GrpcCollectionSubscriptionImpl with is_aggregated=false.
//
// Every fallible step returns a status that is checked with THROW_IF_NOT_OK_REF (macro defined
// elsewhere; presumably throws when the status is not OK), so the checks must stay directly
// after the call they guard.
//
// @param data subscription inputs (config source, dispatcher, cluster manager, callbacks, ...).
// @return the newly created subscription.
SubscriptionPtr DeltaGrpcCollectionConfigSubscriptionFactory::create(
    ConfigSubscriptionFactory::SubscriptionData& data) {
  const envoy::config::core::v3::ApiConfigSource& api_config_source =
      data.config_.api_config_source();
  CustomConfigValidatorsPtr custom_config_validators = std::make_unique<CustomConfigValidatorsImpl>(
      data.validation_visitor_, data.server_, api_config_source.config_validators());

  auto strategy_or_error = Utility::prepareJitteredExponentialBackOffStrategy(
      api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs,
      SubscriptionFactory::RetryMaxDelayMs);
  THROW_IF_NOT_OK_REF(strategy_or_error.status());
  JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());

  // NOTE(review): the trailing `true, 0, false` arguments are positional; confirm their meaning
  // against the declaration of Utility::factoryForGrpcApiConfigSource.
  auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
      data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0, false);
  THROW_IF_NOT_OK_REF(factory_primary_or_error.status());
  absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
      Utility::parseRateLimitSettings(api_config_source);
  THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());

  absl::StatusOr<Grpc::RawAsyncClientSharedPtr> primary_client_or_error =
      factory_primary_or_error.value()->createUncachedRawAsyncClient();
  THROW_IF_NOT_OK_REF(primary_client_or_error.status());
  Grpc::RawAsyncClientSharedPtr primary_client = std::move(*primary_client_or_error);

  // This callable is stored in the mux context and therefore outlives this function. It must not
  // capture any local by reference: `data` and `primary_client` are captured by copy, and there is
  // deliberately no default capture so that an accidental reference to a local fails to compile
  // instead of dangling.
  std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> lrs_factory =
      [data, primary_client]() -> std::unique_ptr<Upstream::LoadStatsReporter> {
    return std::make_unique<Upstream::LoadStatsReporterImpl>(
        data.local_info_, data.cm_, data.scope_, primary_client, data.dispatcher_);
  };

  // `primary_client` is moved below; the lambda above already holds its own copy.
  GrpcMuxContext grpc_mux_context{
      /*primary_async_client_=*/std::move(primary_client),
      /*failover_async_client_=*/nullptr,
      /*dispatcher_=*/data.dispatcher_,
      /*service_method_=*/deltaGrpcMethod(data.type_url_),
      /*local_info_=*/data.local_info_,
      /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
      /*scope_=*/data.scope_,
      /*config_validators_=*/std::move(custom_config_validators),
      /*xds_resources_delegate_=*/{},
      /*xds_config_tracker_=*/data.xds_config_tracker_,
      /*backoff_strategy_=*/std::move(backoff_strategy),
      /*target_xds_authority_=*/"",
      /*eds_resources_cache_=*/nullptr, // No EDS resources cache needed from collections.
      /*skip_subsequent_node_=*/api_config_source.set_node_on_first_message_only(),
      /*load_stats_reporter_factory_=*/std::move(lrs_factory)};
  return std::make_unique<GrpcCollectionSubscriptionImpl>(
      data.collection_locator_.value(), std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context),
      data.callbacks_, data.resource_decoder_, data.stats_, data.dispatcher_,
      Utility::configSourceInitialFetchTimeout(data.config_), /*is_aggregated=*/false,
      data.options_);
}
68

            
69
// Creates a collection subscription that reuses the cluster manager's ADS mux instead of
// building a dedicated one; the subscription is flagged as aggregated.
//
// @param data subscription inputs (collection locator, cluster manager, callbacks, ...).
// @return the newly created subscription.
SubscriptionPtr AggregatedGrpcCollectionConfigSubscriptionFactory::create(
    ConfigSubscriptionFactory::SubscriptionData& data) {
  const auto initial_fetch_timeout = Utility::configSourceInitialFetchTimeout(data.config_);
  auto shared_ads_mux = data.cm_.adsMux();
  return std::make_unique<GrpcCollectionSubscriptionImpl>(
      data.collection_locator_.value(), std::move(shared_ads_mux), data.callbacks_,
      data.resource_decoder_, data.stats_, data.dispatcher_, initial_fetch_timeout,
      /*is_aggregated=*/true, data.options_);
}
76

            
77
// Creates a collection subscription served by the cluster manager's ADS mux.
//
// @param data subscription inputs (collection locator, cluster manager, callbacks, ...).
// @return the newly created subscription.
SubscriptionPtr
AdsCollectionConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionData& data) {
  // Every Envoy collection is currently an xDS resource graph root, so node context parameters
  // are required.
  const auto initial_fetch_timeout = Utility::configSourceInitialFetchTimeout(data.config_);
  auto ads_mux = data.cm_.adsMux();
  return std::make_unique<GrpcCollectionSubscriptionImpl>(
      data.collection_locator_.value(), std::move(ads_mux), data.callbacks_,
      data.resource_decoder_, data.stats_, data.dispatcher_, initial_fetch_timeout,
      /*is_aggregated=*/true, data.options_);
}
86

            
87
// Registers the delta-gRPC collection factory under the ConfigSubscriptionFactory category
// (REGISTER_FACTORY is a macro defined elsewhere in the project).
REGISTER_FACTORY(DeltaGrpcCollectionConfigSubscriptionFactory, ConfigSubscriptionFactory);
88
// Registers the aggregated-gRPC collection factory under the ConfigSubscriptionFactory category
// (REGISTER_FACTORY is a macro defined elsewhere in the project).
REGISTER_FACTORY(AggregatedGrpcCollectionConfigSubscriptionFactory, ConfigSubscriptionFactory);
89
// Registers the ADS collection factory under the ConfigSubscriptionFactory category
// (REGISTER_FACTORY is a macro defined elsewhere in the project).
REGISTER_FACTORY(AdsCollectionConfigSubscriptionFactory, ConfigSubscriptionFactory);
90

            
91
} // namespace Config
92
} // namespace Envoy