1
#include "source/common/rds/rds_route_config_subscription.h"
2

            
3
#include "source/common/common/logger.h"
4
#include "source/common/rds/util.h"
5

            
6
namespace Envoy {
7
namespace Rds {
8

            
9
// Factory for RdsRouteConfigSubscription.
//
// Builds the subscription by forwarding every argument to the constructor, together with a
// status out-parameter. Returns the constructor-reported error if there is one, otherwise the
// newly created subscription.
absl::StatusOr<std::unique_ptr<RdsRouteConfigSubscription>> RdsRouteConfigSubscription::create(
    RouteConfigUpdatePtr&& config_update,
    Envoy::Config::OpaqueResourceDecoderSharedPtr&& resource_decoder,
    const envoy::config::core::v3::ConfigSource& config_source,
    const std::string& route_config_name, const uint64_t manager_identifier,
    Server::Configuration::ServerFactoryContext& factory_context, const std::string& stat_prefix,
    const std::string& rds_type, RouteConfigProviderManager& route_config_provider_manager) {
  // The constructor cannot return a value, so it reports failure through this status.
  absl::Status creation_status = absl::OkStatus();
  std::unique_ptr<RdsRouteConfigSubscription> subscription(new RdsRouteConfigSubscription(
      std::move(config_update), std::move(resource_decoder), config_source, route_config_name,
      manager_identifier, factory_context, stat_prefix, rds_type, route_config_provider_manager,
      creation_status));
  if (!creation_status.ok()) {
    // The partially set up object is destroyed when `subscription` goes out of scope.
    return creation_status;
  }
  return subscription;
}
24

            
25
// Constructs the subscription state and creates the underlying xDS subscription.
//
// On failure to create the underlying subscription, the error is written to `creation_status`
// and the constructor returns early: `subscription_` is left unset and `local_init_target_` is
// not registered with `local_init_manager_`.
RdsRouteConfigSubscription::RdsRouteConfigSubscription(
    RouteConfigUpdatePtr&& config_update,
    Envoy::Config::OpaqueResourceDecoderSharedPtr&& resource_decoder,
    const envoy::config::core::v3::ConfigSource& config_source,
    const std::string& route_config_name, const uint64_t manager_identifier,
    Server::Configuration::ServerFactoryContext& factory_context, const std::string& stat_prefix,
    const std::string& rds_type, RouteConfigProviderManager& route_config_provider_manager,
    absl::Status& creation_status)
    : route_config_name_(route_config_name),
      scope_(factory_context.scope().createScope(stat_prefix + route_config_name_ + ".")),
      factory_context_(factory_context),
      // Initializing the parent target kicks off the local init manager, watched by
      // local_init_watcher_.
      parent_init_target_(
          fmt::format("RdsRouteConfigSubscription {} init {}", rds_type, route_config_name_),
          [this]() { local_init_manager_.initialize(local_init_watcher_); }),
      // When the watcher fires, the parent target is marked ready.
      local_init_watcher_(fmt::format("{} local-init-watcher {}", rds_type, route_config_name_),
                          [this]() { parent_init_target_.ready(); }),
      // Initializing the local target starts the subscription for this route config name.
      local_init_target_(fmt::format("RdsRouteConfigSubscription {} local-init-target {}", rds_type,
                                     route_config_name_),
                         [this]() { subscription_->start({route_config_name_}); }),
      local_init_manager_(fmt::format("{} local-init-manager {}", rds_type, route_config_name_)),
      stat_prefix_(stat_prefix), rds_type_(rds_type),
      stats_({ALL_RDS_STATS(POOL_COUNTER(*scope_), POOL_GAUGE(*scope_))}),
      route_config_provider_manager_(route_config_provider_manager),
      manager_identifier_(manager_identifier), config_update_info_(std::move(config_update)),
      resource_decoder_(std::move(resource_decoder)) {
  const std::string type_url =
      Envoy::Grpc::Common::typeUrl(route_config_provider_manager_.protoTraits().resourceType());
  const bool use_singleton_subscription = Runtime::runtimeFeatureEnabled(
      "envoy.reloadable_features.xdstp_based_config_singleton_subscriptions");

  // The runtime flag selects between the xDS manager's singleton-resource subscription and the
  // cluster manager's subscription factory.
  auto created_subscription =
      use_singleton_subscription
          ? factory_context.xdsManager().subscribeToSingletonResource(
                route_config_name_, config_source, type_url, *scope_, *this, resource_decoder_, {})
          : factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
                config_source, type_url, *scope_, *this, resource_decoder_, {});
  SET_AND_RETURN_IF_NOT_OK(created_subscription.status(), creation_status);

  subscription_ = std::move(*created_subscription);
  local_init_manager_.add(local_init_target_);
}
64

            
65
380
RdsRouteConfigSubscription::~RdsRouteConfigSubscription() {
  // Destruction may happen while initialization is still pending; signal "initialized" so that
  // nothing keeps waiting on this target.
  local_init_target_.ready();

  // RdsRouteConfigProviderImpl is co-owned by every HttpConnectionManager holding a shared_ptr to
  // it, while the RouteConfigProviderManager only keeps weak_ptrs to the RdsRouteConfigProviders.
  // The manager's map entry for the RdsRouteConfigProvider therefore has to be removed by the
  // RdsRouteConfigProvider's destructor.
  route_config_provider_manager_.eraseDynamicProvider(manager_identifier_);
}
75

            
76
// State-of-the-world update handler.
//
// - An empty update is counted in `update_empty_`, marks the local init target ready and succeeds.
// - Anything other than exactly one resource is rejected with InvalidArgument.
// - The single resource must have the expected proto type and carry this subscription's route
//   config name; otherwise InvalidArgument is returned.
// - If `onRdsUpdate()` returns true, reload stats are updated and the before/provider/after
//   update hooks run in that order; any failing hook aborts with its status.
// On success the local init target is marked ready.
absl::Status RdsRouteConfigSubscription::onConfigUpdate(
    const std::vector<Envoy::Config::DecodedResourceRef>& resources,
    const std::string& version_info) {
  if (resources.empty()) {
    ENVOY_LOG(debug, "Missing {} RouteConfiguration for {} in onConfigUpdate()", rds_type_,
              route_config_name_);
    stats_.update_empty_.inc();
    local_init_target_.ready();
    return absl::OkStatus();
  }
  if (resources.size() != 1) {
    return absl::InvalidArgumentError(
        fmt::format("Unexpected {} resource length: {}", rds_type_, resources.size()));
  }

  const auto& proto_traits = route_config_provider_manager_.protoTraits();
  const auto& route_config = resources[0].get().resource();

  // Validate the proto type of the received resource.
  Protobuf::ReflectableMessage reflectable_config = createReflectableMessage(route_config);
  const auto& received_type = reflectable_config->GetDescriptor()->full_name();
  const auto& expected_type = proto_traits.resourceType();
  if (received_type != expected_type) {
    return absl::InvalidArgumentError(
        fmt::format("Unexpected {} configuration type (expecting {}): {}", rds_type_,
                    expected_type, received_type));
  }

  // Validate that the resource is the one this subscription asked for.
  const auto received_name = resourceName(proto_traits, route_config);
  if (received_name != route_config_name_) {
    return absl::InvalidArgumentError(fmt::format("Unexpected {} configuration (expecting {}): {}",
                                                  rds_type_, route_config_name_, received_name));
  }

  // Declared at function scope so that both outlive the update block below and are destroyed
  // only after the local init target has been marked ready.
  std::unique_ptr<Init::ManagerImpl> noop_init_manager;
  std::unique_ptr<Cleanup> resume_rds;
  if (config_update_info_->onRdsUpdate(route_config, version_info)) {
    stats_.config_reload_.inc();
    stats_.config_reload_time_ms_.set(DateUtil::nowToMilliseconds(factory_context_.timeSource()));

    RETURN_IF_NOT_OK(beforeProviderUpdate(noop_init_manager, resume_rds));

    ENVOY_LOG(debug, "rds: loading new configuration: config_name={} hash={}", route_config_name_,
              config_update_info_->configHash());

    // A provider may not be attached; only notify it when present.
    if (route_config_provider_ != nullptr) {
      RETURN_IF_NOT_OK(route_config_provider_->onConfigUpdate());
    }

    RETURN_IF_NOT_OK(afterProviderUpdate());
  }

  local_init_target_.ready();
  return absl::OkStatus();
}
127

            
128
// Delta update handler.
//
// Removals are logged and otherwise ignored. If any resources were added, the update is
// delegated to the state-of-the-world overload using the version of the first added resource;
// with no added resources the call succeeds without doing anything else.
absl::Status RdsRouteConfigSubscription::onConfigUpdate(
    const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
    const Protobuf::RepeatedPtrField<std::string>& removed_resources, const std::string&) {
  if (!removed_resources.empty()) {
    // TODO(#2500) when on-demand resource loading is supported, an RDS removal may make sense
    // (see discussion in #6879), and so we should do something other than ignoring here.
    ENVOY_LOG(trace,
              "Server sent a delta {} update attempting to remove a resource (name: {}). Ignoring.",
              rds_type_, removed_resources[0]);
  }

  if (added_resources.empty()) {
    return absl::OkStatus();
  }
  return onConfigUpdate(added_resources, added_resources.front().get().version());
}
143

            
144
void RdsRouteConfigSubscription::onConfigUpdateFailed(
145
21
    Envoy::Config::ConfigUpdateFailureReason reason, const EnvoyException*) {
146
21
  ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
147
  // We need to allow server startup to continue, even if we have a bad
148
  // config.
149
21
  local_init_target_.ready();
150
21
}
151

            
152
} // namespace Rds
153
} // namespace Envoy