1
#include "source/extensions/config_subscription/grpc/grpc_subscription_factory.h"
2

            
3
#include "source/common/config/custom_config_validators_impl.h"
4
#include "source/common/config/type_to_endpoint.h"
5
#include "source/common/upstream/load_stats_reporter_impl.h"
6
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
7
#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
8
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
9
#include "source/extensions/config_subscription/grpc/new_grpc_mux_impl.h"
10
#include "source/extensions/config_subscription/grpc/xds_mux/grpc_mux_impl.h"
11

            
12
namespace Envoy {
13
namespace Config {
14

            
15
SubscriptionPtr
16
922
GrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionData& data) {
17
922
  GrpcMuxSharedPtr mux;
18
922
  const envoy::config::core::v3::ApiConfigSource& api_config_source =
19
922
      data.config_.api_config_source();
20
922
  CustomConfigValidatorsPtr custom_config_validators = std::make_unique<CustomConfigValidatorsImpl>(
21
922
      data.validation_visitor_, data.server_, api_config_source.config_validators());
22
922
  const std::string control_plane_id = Utility::getGrpcControlPlane(api_config_source).value_or("");
23

            
24
922
  auto strategy_or_error = Utility::prepareJitteredExponentialBackOffStrategy(
25
922
      api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs,
26
922
      SubscriptionFactory::RetryMaxDelayMs);
27
922
  THROW_IF_NOT_OK_REF(strategy_or_error.status());
28
921
  JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());
29

            
30
921
  auto factory_primary_or_error = Utility::factoryForGrpcApiConfigSource(
31
921
      data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0, false);
32
921
  THROW_IF_NOT_OK_REF(factory_primary_or_error.status());
33
921
  absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
34
921
      Utility::parseRateLimitSettings(api_config_source);
35
921
  THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
36

            
37
921
  absl::StatusOr<Grpc::RawAsyncClientSharedPtr> primary_client_or_error =
38
921
      factory_primary_or_error.value()->createUncachedRawAsyncClient();
39
921
  THROW_IF_NOT_OK_REF(primary_client_or_error.status());
40
921
  Grpc::RawAsyncClientSharedPtr primary_client = std::move(*primary_client_or_error);
41

            
42
921
  std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> lrs_factory =
43
921
      [&, data, primary_client]() -> std::unique_ptr<Upstream::LoadStatsReporter> {
44
    auto reporter = std::make_unique<Upstream::LoadStatsReporterImpl>(
45
        data.local_info_, data.cm_, data.scope_, primary_client, data.dispatcher_);
46
    return reporter;
47
  };
48

            
49
921
  GrpcMuxContext grpc_mux_context{
50
921
      /*async_client_=*/std::move(primary_client),
51
921
      /*failover_async_client_=*/nullptr, // Failover is only supported for ADS.
52
921
      /*dispatcher_=*/data.dispatcher_,
53
921
      /*service_method_=*/sotwGrpcMethod(data.type_url_),
54
921
      /*local_info_=*/data.local_info_,
55
921
      /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
56
921
      /*scope_=*/data.scope_,
57
921
      /*config_validators_=*/std::move(custom_config_validators),
58
921
      /*xds_resources_delegate_=*/data.xds_resources_delegate_,
59
921
      /*xds_config_tracker_=*/data.xds_config_tracker_,
60
921
      /*backoff_strategy_=*/std::move(backoff_strategy),
61
921
      /*target_xds_authority_=*/control_plane_id,
62
921
      /*eds_resources_cache_=*/nullptr, // EDS cache is only used for ADS.
63
921
      /*skip_subsequent_node_=*/api_config_source.set_node_on_first_message_only(),
64
921
      /*load_stats_reporter_factory_=*/lrs_factory};
65

            
66
921
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
67
36
    mux = std::make_shared<Config::XdsMux::GrpcMuxSotw>(grpc_mux_context);
68
891
  } else {
69
885
    mux = std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context);
70
885
  }
71
921
  return std::make_unique<GrpcSubscriptionImpl>(
72
921
      std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_,
73
921
      data.dispatcher_, Utility::configSourceInitialFetchTimeout(data.config_),
74
921
      /*is_aggregated*/ false, data.options_);
75
921
}
76

            
77
SubscriptionPtr
78
267
DeltaGrpcConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionData& data) {
79
267
  GrpcMuxSharedPtr mux;
80
267
  const envoy::config::core::v3::ApiConfigSource& api_config_source =
81
267
      data.config_.api_config_source();
82
267
  CustomConfigValidatorsPtr custom_config_validators = std::make_unique<CustomConfigValidatorsImpl>(
83
267
      data.validation_visitor_, data.server_, api_config_source.config_validators());
84

            
85
267
  auto strategy_or_error = Utility::prepareJitteredExponentialBackOffStrategy(
86
267
      api_config_source, data.api_.randomGenerator(), SubscriptionFactory::RetryInitialDelayMs,
87
267
      SubscriptionFactory::RetryMaxDelayMs);
88
267
  THROW_IF_NOT_OK_REF(strategy_or_error.status());
89
267
  JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());
90

            
91
267
  auto factory_primary_or_error = Utility::factoryForGrpcApiConfigSource(
92
267
      data.cm_.grpcAsyncClientManager(), api_config_source, data.scope_, true, 0, false);
93
267
  THROW_IF_NOT_OK_REF(factory_primary_or_error.status());
94
267
  absl::StatusOr<RateLimitSettings> rate_limit_settings_or_error =
95
267
      Utility::parseRateLimitSettings(api_config_source);
96
267
  THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
97

            
98
267
  absl::StatusOr<Grpc::RawAsyncClientSharedPtr> primary_client_or_error =
99
267
      factory_primary_or_error.value()->createUncachedRawAsyncClient();
100
267
  THROW_IF_NOT_OK_REF(primary_client_or_error.status());
101
267
  Grpc::RawAsyncClientSharedPtr primary_client = std::move(*primary_client_or_error);
102

            
103
267
  std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> lrs_factory =
104
267
      [&, data, primary_client]() -> std::unique_ptr<Upstream::LoadStatsReporter> {
105
    auto reporter = std::make_unique<Upstream::LoadStatsReporterImpl>(
106
        data.local_info_, data.cm_, data.scope_, primary_client, data.dispatcher_);
107
    return reporter;
108
  };
109

            
110
267
  GrpcMuxContext grpc_mux_context{
111
267
      /*async_client_=*/std::move(primary_client),
112
267
      /*failover_async_client_=*/nullptr, // Failover is only supported for ADS.
113
267
      /*dispatcher_=*/data.dispatcher_,
114
267
      /*service_method_=*/deltaGrpcMethod(data.type_url_),
115
267
      /*local_info_=*/data.local_info_,
116
267
      /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
117
267
      /*scope_=*/data.scope_,
118
267
      /*config_validators_=*/std::move(custom_config_validators),
119
267
      /*xds_resources_delegate_=*/{},
120
267
      /*xds_config_tracker_=*/data.xds_config_tracker_,
121
267
      /*backoff_strategy_=*/std::move(backoff_strategy),
122
267
      /*target_xds_authority_=*/"",
123
267
      /*eds_resources_cache_=*/nullptr, // EDS cache is only used for ADS.
124
267
      /*skip_subsequent_node_=*/api_config_source.set_node_on_first_message_only(),
125
267
      /*load_stats_reporter_factory_=*/lrs_factory};
126

            
127
267
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
128
30
    mux = std::make_shared<Config::XdsMux::GrpcMuxDelta>(grpc_mux_context);
129
243
  } else {
130
237
    mux = std::make_shared<Config::NewGrpcMuxImpl>(grpc_mux_context);
131
237
  }
132
267
  return std::make_unique<GrpcSubscriptionImpl>(
133
267
      std::move(mux), data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_,
134
267
      data.dispatcher_, Utility::configSourceInitialFetchTimeout(data.config_),
135
267
      /*is_aggregated*/ false, data.options_);
136
267
}
137

            
138
SubscriptionPtr
139
1368
AdsConfigSubscriptionFactory::create(ConfigSubscriptionFactory::SubscriptionData& data) {
140
1368
  return std::make_unique<GrpcSubscriptionImpl>(
141
1368
      data.ads_grpc_mux_, data.callbacks_, data.resource_decoder_, data.stats_, data.type_url_,
142
1368
      data.dispatcher_, Utility::configSourceInitialFetchTimeout(data.config_), true,
143
1368
      data.options_);
144
1368
}
145

            
146
REGISTER_FACTORY(GrpcConfigSubscriptionFactory, ConfigSubscriptionFactory);
147
REGISTER_FACTORY(DeltaGrpcConfigSubscriptionFactory, ConfigSubscriptionFactory);
148
REGISTER_FACTORY(AdsConfigSubscriptionFactory, ConfigSubscriptionFactory);
149

            
150
} // namespace Config
151
} // namespace Envoy