1
#include "source/extensions/config_subscription/grpc/grpc_subscription_factory.h"
2

            
3
#include "source/common/config/custom_config_validators_impl.h"
4
#include "source/common/config/type_to_endpoint.h"
5
#include "source/common/upstream/load_stats_reporter_impl.h"
6
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
7
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
8
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
9
#include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
10
#include "source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h"
11

            
12
namespace Envoy {
13
namespace Config {
14

            
15
SubscriptionPtr
16
899
GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionData& data) {
17
899
  GrpcMuxSharedPtr mux;
18
899
  const envoy::config::core::v3::ApiConfigSource& api_config_source =
19
899
      data.config_.api_config_source();
20
899
  CustomConfigValidatorsPtr custom_config_validators = std::make_unique<CustomConfigValidatorsImpl>(
21
899
      data.validation_visitor_, data.server_, api_config_source.config_validators());
22
899
  const std::string control_plane_id = Utility::getGrpcControlPlane(api_config_source).value_or("");
23

            
24
899
  auto strategy_or_error = Utility::prepareJitteredExponentialBackOffStrategy(
25
899
      api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs,
26
899
      SubscriptionFactory::RetryMaxDelayMs);
27
899
  THROW_IF_NOT_OK_REF(strategy_or_error.status());
28
898
  JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());
29

            
30
898
  auto factory_primary_or_error = Utility::factoryForGrpcApiConfigSource(
31
898
      data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0, false);
32
898
  THROW_IF_NOT_OK_REF(factory_primary_or_error.status());
33
898
  absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
34
898
      Utility::parseRateLimitSettings(api_config_source);
35
898
  THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
36

            
37
898
  absl::StatusOr<Grpc::RawAsyncClientSharedPtr> primary_client_or_error =
38
898
      factory_primary_or_error.value()->createUncachedRawAsyncClient();
39
898
  THROW_IF_NOT_OK_REF(primary_client_or_error.status());
40
898
  Grpc::RawAsyncClientSharedPtr primary_client = std::move(*primary_client_or_error);
41

            
42
898
  std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> lrs_factory =
43
898
      [&, data, primary_client]() -> std::unique_ptr<Upstream::LoadStatsReporter> {
44
    auto reporter = std::make_unique<Upstream::LoadStatsReporterImpl>(
45
        data.local_info_, data.cm_, data.scope_, primary_client, data.dispatcher_);
46
    return reporter;
47
  };
48

            
49
898
  GrpcMuxContext grpc_mux_context{
50
898
      /*async_client_=*/std::move(primary_client),
51
898
      /*failover_async_client_=*/nullptr, // Failover is only supported for ADS.
52
898
      /*dispatcher_=*/data.dispatcher_,
53
898
      /*service_method_=*/sotwGrpcMethod(data.type_url_),
54
898
      /*local_info_=*/data.local_info_,
55
898
      /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
56
898
      /*scope_=*/data.scope_,
57
898
      /*config_validators_=*/std::move(custom_config_validators),
58
898
      /*xds_resources_delegate_=*/data.xds_resources_delegate_,
59
898
      /*xds_config_tracker_=*/data.xds_config_tracker_,
60
898
      /*backoff_strategy_=*/std::move(backoff_strategy),
61
898
      /*target_xds_authority_=*/control_plane_id,
62
898
      /*eds_resources_cache_=*/nullptr, // EDS cache is only used for ADS.
63
898
      /*skip_subsequent_node_=*/api_config_source.set_node_on_first_message_only(),
64
898
      /*load_stats_reporter_factory_=*/lrs_factory};
65

            
66
898
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
67
26
    mux = std::make_shared<Config::XdsMux::GrpcMuxSotw>(grpc_mux_context);
68
872
  } else {
69
872
    mux = std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context);
70
872
  }
71
898
  return std::make_unique<GrpcSubscriptionImpl>(
72
898
      std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_,
73
898
      data.dispatcher_, Utility::configSourceInitialFetchTimeout(data.config_),
74
898
      /*is_aggregated*/ false, data.options_);
75
898
}
76

            
77
SubscriptionPtr
78
241
DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionData& data) {
79
241
  GrpcMuxSharedPtr mux;
80
241
  const envoy::config::core::v3::ApiConfigSource& api_config_source =
81
241
      data.config_.api_config_source();
82
241
  CustomConfigValidatorsPtr custom_config_validators = std::make_unique<CustomConfigValidatorsImpl>(
83
241
      data.validation_visitor_, data.server_, api_config_source.config_validators());
84

            
85
241
  auto strategy_or_error = Utility::prepareJitteredExponentialBackOffStrategy(
86
241
      api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs,
87
241
      SubscriptionFactory::RetryMaxDelayMs);
88
241
  THROW_IF_NOT_OK_REF(strategy_or_error.status());
89
241
  JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());
90

            
91
241
  auto factory_primary_or_error = Utility::factoryForGrpcApiConfigSource(
92
241
      data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0, false);
93
241
  THROW_IF_NOT_OK_REF(factory_primary_or_error.status());
94
241
  absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
95
241
      Utility::parseRateLimitSettings(api_config_source);
96
241
  THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
97

            
98
241
  absl::StatusOr<Grpc::RawAsyncClientSharedPtr> primary_client_or_error =
99
241
      factory_primary_or_error.value()->createUncachedRawAsyncClient();
100
241
  THROW_IF_NOT_OK_REF(primary_client_or_error.status());
101
241
  Grpc::RawAsyncClientSharedPtr primary_client = std::move(*primary_client_or_error);
102

            
103
241
  std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> lrs_factory =
104
241
      [&, data, primary_client]() -> std::unique_ptr<Upstream::LoadStatsReporter> {
105
    auto reporter = std::make_unique<Upstream::LoadStatsReporterImpl>(
106
        data.local_info_, data.cm_, data.scope_, primary_client, data.dispatcher_);
107
    return reporter;
108
  };
109

            
110
241
  GrpcMuxContext grpc_mux_context{
111
241
      /*async_client_=*/std::move(primary_client),
112
241
      /*failover_async_client_=*/nullptr, // Failover is only supported for ADS.
113
241
      /*dispatcher_=*/data.dispatcher_,
114
241
      /*service_method_=*/deltaGrpcMethod(data.type_url_),
115
241
      /*local_info_=*/data.local_info_,
116
241
      /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
117
241
      /*scope_=*/data.scope_,
118
241
      /*config_validators_=*/std::move(custom_config_validators),
119
241
      /*xds_resources_delegate_=*/{},
120
241
      /*xds_config_tracker_=*/data.xds_config_tracker_,
121
241
      /*backoff_strategy_=*/std::move(backoff_strategy),
122
241
      /*target_xds_authority_=*/"",
123
241
      /*eds_resources_cache_=*/nullptr, // EDS cache is only used for ADS.
124
241
      /*skip_subsequent_node_=*/api_config_source.set_node_on_first_message_only(),
125
241
      /*load_stats_reporter_factory_=*/lrs_factory};
126

            
127
241
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
128
20
    mux = std::make_shared<Config::XdsMux::GrpcMuxDelta>(grpc_mux_context);
129
221
  } else {
130
221
    mux = std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context);
131
221
  }
132
241
  return std::make_unique<GrpcSubscriptionImpl>(
133
241
      std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_,
134
241
      data.dispatcher_, Utility::configSourceInitialFetchTimeout(data.config_),
135
241
      /*is_aggregated*/ false, data.options_);
136
241
}
137

            
138
SubscriptionPtr
139
1376
AdsConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionData& data) {
140
1376
  return std::make_unique<GrpcSubscriptionImpl>(
141
1376
      data.ads_grpc_mux_, data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_,
142
1376
      data.dispatcher_, Utility::configSourceInitialFetchTimeout(data.config_), true,
143
1376
      data.options_);
144
1376
}
145

            
146
REGISTER_FACTORY(GrpcConfigSubscriptionFactory, ConfigSubscriptionFactory);
147
REGISTER_FACTORY(DeltaGrpcConfigSubscriptionFactory, ConfigSubscriptionFactory);
148
REGISTER_FACTORY(AdsConfigSubscriptionFactory, ConfigSubscriptionFactory);
149

            
150
} // namespace Config
151
} // namespace Envoy