1
#include "source/common/router/rds_impl.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7

            
8
#include "envoy/admin/v3/config_dump.pb.h"
9
#include "envoy/config/core/v3/config_source.pb.h"
10
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
11
#include "envoy/service/discovery/v3/discovery.pb.h"
12

            
13
#include "source/common/common/assert.h"
14
#include "source/common/common/fmt.h"
15
#include "source/common/config/api_version.h"
16
#include "source/common/config/utility.h"
17
#include "source/common/http/header_map_impl.h"
18
#include "source/common/protobuf/utility.h"
19
#include "source/common/router/config_impl.h"
20
#include "source/common/router/route_config_update_receiver_impl.h"
21

            
22
namespace Envoy {
23
namespace Router {
24

            
25
// Factory for RdsRouteConfigSubscription.
//
// Constructs the subscription and converts any failure reported by the constructor (through its
// absl::Status out-parameter) into the returned StatusOr, so callers never receive an object whose
// construction failed.
//
// @return the newly created subscription, or the error status produced during construction.
absl::StatusOr<std::unique_ptr<RdsRouteConfigSubscription>> RdsRouteConfigSubscription::create(
    RouteConfigUpdatePtr&& config_update,
    Envoy::Config::OpaqueResourceDecoderSharedPtr&& resource_decoder,
    const envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds,
    const uint64_t manager_identifier, Server::Configuration::ServerFactoryContext& factory_context,
    const std::string& stat_prefix,
    Rds::RouteConfigProviderManager& route_config_provider_manager) {
  // Filled in by the constructor; stays OK unless construction reports a problem.
  absl::Status ctor_status = absl::OkStatus();
  // NOTE(review): `new` is used instead of std::make_unique, presumably because the constructor is
  // not accessible from std::make_unique -- confirm against the header.
  std::unique_ptr<RdsRouteConfigSubscription> subscription(new RdsRouteConfigSubscription(
      std::move(config_update), std::move(resource_decoder), rds, manager_identifier,
      factory_context, stat_prefix, route_config_provider_manager, ctor_status));
  RETURN_IF_NOT_OK(ctor_status);
  return subscription;
}
39

            
40
// TODO(htuch): If support for multiple clusters is added per #1170 cluster_name_
41
// Builds the router-specific RDS subscription on top of the generic
// Rds::RdsRouteConfigSubscription.
//
// The base class receives the config source and route config name taken from `rds`, the stat
// prefix extended with "rds.", and the resource type name "RDS". Any construction failure is
// reported through `creation_status`.
RdsRouteConfigSubscription::RdsRouteConfigSubscription(
    RouteConfigUpdatePtr&& config_update,
    Envoy::Config::OpaqueResourceDecoderSharedPtr&& resource_decoder,
    const envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds,
    const uint64_t manager_identifier, Server::Configuration::ServerFactoryContext& factory_context,
    const std::string& stat_prefix, Rds::RouteConfigProviderManager& route_config_provider_manager,
    absl::Status& creation_status)
    : Rds::RdsRouteConfigSubscription(std::move(config_update), std::move(resource_decoder),
                                      rds.config_source(), rds.route_config_name(),
                                      manager_identifier, factory_context, stat_prefix + "rds.",
                                      "RDS", route_config_provider_manager, creation_status),
      // Router-typed view of the receiver already held by the base class member of the same name.
      // It is initialized from the base member's raw pointer (get()), so no ownership is
      // transferred here; the destructor releases this pointer instead of deleting it.
      config_update_info_(static_cast<RouteConfigUpdateReceiver*>(
          Rds::RdsRouteConfigSubscription::config_update_info_.get())) {}
54

            
55
369
RdsRouteConfigSubscription::~RdsRouteConfigSubscription() {
  // This member was initialized in the constructor from the raw pointer held by the base class
  // member of the same name. Give the pointer up without deleting it here; the base class member
  // still refers to the object.
  config_update_info_.release();
}
56

            
57
// Hook that (re)starts the VHDS subscription when the current route configuration carries a VHDS
// section and that section has changed.
//
// @param noop_init_manager may be populated by maybeCreateInitManager(); when it is set, the new
//        VHDS subscription registers its init target with it instead of local_init_manager_.
// @param resume_rds may be populated by maybeCreateInitManager() with a Cleanup object.
// @return OkStatus when nothing needs to be done or the subscription was created; otherwise the
//         error returned by VhdsSubscription::createVhdsSubscription().
absl::Status RdsRouteConfigSubscription::beforeProviderUpdate(
    std::unique_ptr<Init::ManagerImpl>& noop_init_manager, std::unique_ptr<Cleanup>& resume_rds) {
  const bool vhds_needs_start = config_update_info_->protobufConfigurationCast().has_vhds() &&
                                config_update_info_->vhdsConfigurationChanged();
  if (!vhds_needs_start) {
    return absl::OkStatus();
  }

  ENVOY_LOG(debug,
            "rds: vhds configuration present/changed, (re)starting vhds: config_name={} hash={}",
            route_config_name_, routeConfigUpdate()->configHash());
  ASSERT(config_update_info_->configInfo().has_value());
  maybeCreateInitManager(routeConfigUpdate()->configInfo().value().version_, noop_init_manager,
                         resume_rds);

  auto vhds_or_error = VhdsSubscription::createVhdsSubscription(
      config_update_info_, factory_context_, stat_prefix_, route_config_provider_);
  RETURN_IF_NOT_OK_REF(vhds_or_error.status());
  // Move-assignment replaces (and thereby drops) any previously held VHDS subscription.
  vhds_subscription_ = std::move(vhds_or_error.value());
  // Prefer the no-op init manager when one was created above.
  vhds_subscription_->registerInitTargetWithInitManager(
      noop_init_manager == nullptr ? local_init_manager_ : *noop_init_manager);
  return absl::OkStatus();
}
76

            
77
374
// Hook that tears down the VHDS subscription if the current route configuration no longer has a
// VHDS section, then runs the registered update callbacks.
//
// @return the status produced by update_callback_manager_.runCallbacks().
absl::Status RdsRouteConfigSubscription::afterProviderUpdate() {
  // RDS update removed VHDS configuration.
  if (!config_update_info_->protobufConfigurationCast().has_vhds()) {
    // reset() destroys the subscription. release() would only relinquish ownership and discard
    // the returned pointer, so the object would never be deleted.
    vhds_subscription_.reset();
  }

  return update_callback_manager_.runCallbacks();
}
85

            
86
// Initialize a no-op InitManager in case the one in the factory_context has completed
87
// initialization. This can happen if an RDS config update for an already established RDS
88
// subscription contains VHDS configuration.
89
void RdsRouteConfigSubscription::maybeCreateInitManager(
90
    const std::string& version_info, std::unique_ptr<Init::ManagerImpl>& init_manager,
91
41
    std::unique_ptr<Cleanup>& init_vhds) {
92
41
  if (local_init_manager_.state() == Init::Manager::State::Initialized) {
93
11
    init_manager = std::make_unique<Init::ManagerImpl>(
94
11
        fmt::format("VHDS {}:{}", route_config_name_, version_info));
95
11
    init_vhds = std::make_unique<Cleanup>([this, &init_manager, version_info] {
96
      // For new RDS subscriptions created after listener warming up, we don't wait for them to warm
97
      // up.
98
11
      Init::WatcherImpl noop_watcher(
99
          // Note: we just throw it away.
100
11
          fmt::format("VHDS ConfigUpdate watcher {}:{}", route_config_name_, version_info),
101
11
          []() { /*Do nothing.*/ });
102
11
      init_manager->initialize(noop_watcher);
103
11
    });
104
11
  }
105
41
}
106

            
107
24
void RdsRouteConfigSubscription::updateOnDemand(const std::string& aliases) {
108
24
  if (vhds_subscription_.get() == nullptr) {
109
    return;
110
  }
111
24
  vhds_subscription_->updateOnDemand(aliases);
112
24
}
113

            
114
// Wraps a generic provider (`base_`) around `subscription` and caches the subscription's route
// config update receiver and the factory context for later use.
RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl(
    RdsRouteConfigSubscriptionSharedPtr&& subscription,
    Server::Configuration::ServerFactoryContext& factory_context)
    : base_(subscription, factory_context),
      config_update_info_(subscription->routeConfigUpdate()), factory_context_(factory_context) {
  // `base_` and this object share one and the same subscription. Constructing `base_` already
  // registered `base_` as that subscription's provider, so overwrite the registration to make the
  // subscription point at this object instead.
  base_.subscription().routeConfigProvider() = this;
}
124

            
125
539
// Returns the subscription held by `base_`, downcast to the router-specific subscription type.
// The cast is unchecked (static_cast).
RdsRouteConfigSubscription& RdsRouteConfigProviderImpl::subscription() {
  auto& generic_subscription = base_.subscription();
  return static_cast<RdsRouteConfigSubscription&>(generic_subscription);
}
128

            
129
446
// Propagates a configuration update through `base_` and then notifies the callers that requested
// on-demand VHDS updates for aliases contained in the update.
//
// @return the error from base_.onConfigUpdate() if it fails, otherwise OkStatus.
absl::Status RdsRouteConfigProviderImpl::onConfigUpdate() {
  auto status = base_.onConfigUpdate();
  if (!status.ok()) {
    return status;
  }

  // Bound by const reference so the alias container is not copied on every update when the
  // accessor returns a reference; a by-value return is still kept alive by lifetime extension.
  const auto& aliases = config_update_info_->resourceIdsInLastVhdsUpdate();
  // Regular (non-VHDS) RDS updates don't populate aliases fields in resources.
  if (aliases.empty()) {
    return absl::OkStatus();
  }

  const auto config =
      std::static_pointer_cast<const ConfigImpl>(config_update_info_->parsedConfiguration());
  // Notifies connections that RouteConfiguration update has been propagated.
  // Callbacks processing is performed in FIFO order. A callback is skipped (and stays queued) if
  // the alias used in its VHDS update request does not match any alias in the update response.
  for (auto it = config_update_callbacks_.begin(); it != config_update_callbacks_.end();) {
    if (aliases.find(it->alias_) == aliases.end()) {
      ++it;
      continue;
    }

    // TODO(dmitri-d) HeaderMapImpl is expensive, need to profile this
    auto host_header = Http::RequestHeaderMapImpl::create();
    host_header->setHost(VhdsSubscription::aliasToDomainName(it->alias_));
    const bool host_exists = config->virtualHostExists(*host_header);
    // Only a weak reference travels to the requesting dispatcher; the callback is invoked there
    // only if it can still be locked at that time.
    std::weak_ptr<Http::RouteConfigUpdatedCallback> current_cb(it->cb_);
    it->thread_local_dispatcher_.post([current_cb, host_exists] {
      if (auto cb = current_cb.lock()) {
        (*cb)(host_exists);
      }
    });
    // erase() yields the iterator to the next pending callback.
    it = config_update_callbacks_.erase(it);
  }
  return absl::OkStatus();
}
166

            
167
755
// Returns the current configuration as a pointer to const Config.
// The ASSERT checks the dynamic type; the returned pointer itself is produced by an unchecked
// static_pointer_cast.
ConfigConstSharedPtr RdsRouteConfigProviderImpl::configCast() const {
  const auto generic_config = RdsRouteConfigProviderImpl::config();
  ASSERT(dynamic_cast<const Config*>(generic_config.get()));
  return std::static_pointer_cast<const Config>(generic_config);
}
171

            
172
// Schedules a VHDS request on the main thread and queues up the callback to use when the VHDS
173
// response has been propagated to the worker thread that was the request origin.
174
void RdsRouteConfigProviderImpl::requestVirtualHostsUpdate(
175
    const std::string& for_domain, Event::Dispatcher& thread_local_dispatcher,
176
25
    std::weak_ptr<Http::RouteConfigUpdatedCallback> route_config_updated_cb) {
177
25
  auto alias = VhdsSubscription::domainNameToAlias(
178
25
      config_update_info_->protobufConfigurationCast().name(), for_domain);
179
  // The RdsRouteConfigProviderImpl instance can go away before the dispatcher has a chance to
180
  // execute the callback. still_alive shared_ptr will be deallocated when the current instance of
181
  // the RdsRouteConfigProviderImpl is deallocated; we rely on a weak_ptr to still_alive flag to
182
  // determine if the RdsRouteConfigProviderImpl instance is still valid.
183
25
  factory_context_.mainThreadDispatcher().post(
184
25
      [this, maybe_still_alive = std::weak_ptr<bool>(still_alive_), alias, &thread_local_dispatcher,
185
25
       route_config_updated_cb]() -> void {
186
25
        if (maybe_still_alive.lock()) {
187
24
          subscription().updateOnDemand(alias);
188
24
          config_update_callbacks_.push_back(
189
24
              {alias, thread_local_dispatcher, route_config_updated_cb});
190
24
        }
191
25
      });
192
25
}
193
// Obtains a dynamic (RDS-backed) route config provider for `rds` from `manager`.
//
// The manager is handed a factory callback that builds a fresh subscription and provider for a
// given manager identifier; whether and when that callback runs is decided by
// manager.addDynamicProvider().
//
// @return the provider returned by the manager, as a RouteConfigProviderSharedPtr.
RouteConfigProviderSharedPtr RdsFactoryImpl::createRdsRouteConfigProvider(
    const envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds,
    Server::Configuration::ServerFactoryContext& factory_context, const std::string& stat_prefix,
    Init::Manager& init_manager, ProtoTraitsImpl& proto_traits,
    Rds::RouteConfigProviderManager& manager) {
  // Builds a new subscription/provider pair. Everything is captured by reference.
  auto make_provider = [&factory_context, &rds, &stat_prefix, &manager,
                        &proto_traits](uint64_t manager_identifier) {
    auto update_receiver =
        std::make_unique<RouteConfigUpdateReceiverImpl>(proto_traits, factory_context);
    auto decoder = std::make_shared<
        Envoy::Config::OpaqueResourceDecoderImpl<envoy::config::route::v3::RouteConfiguration>>(
        factory_context.messageValidationContext().dynamicValidationVisitor(), "name");
    auto new_subscription =
        THROW_OR_RETURN_VALUE(RdsRouteConfigSubscription::create(
                                  std::move(update_receiver), std::move(decoder), rds,
                                  manager_identifier, factory_context, stat_prefix, manager),
                              std::unique_ptr<RdsRouteConfigSubscription>);
    auto new_provider = std::make_shared<RdsRouteConfigProviderImpl>(std::move(new_subscription),
                                                                     factory_context);
    // The provider is returned together with the address of its subscription's init target.
    return std::make_pair(new_provider, &new_provider->subscription().initTarget());
  };

  auto provider =
      manager.addDynamicProvider(rds, rds.route_config_name(), init_manager, make_provider);
  ASSERT(dynamic_cast<RouteConfigProvider*>(provider.get()));
  return std::static_pointer_cast<RouteConfigProvider>(provider);
}
218

            
219
// Registers RdsFactoryImpl as an implementation of the RdsFactory interface.
// NOTE(review): presumably a static, load-time registration -- confirm against the
// REGISTER_FACTORY macro definition.
REGISTER_FACTORY(RdsFactoryImpl, RdsFactory);
220

            
221
} // namespace Router
222
} // namespace Envoy