1
#include "source/common/router/rds_impl.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7

            
8
#include "envoy/admin/v3/config_dump.pb.h"
9
#include "envoy/config/core/v3/config_source.pb.h"
10
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
11
#include "envoy/service/discovery/v3/discovery.pb.h"
12

            
13
#include "source/common/common/assert.h"
14
#include "source/common/common/fmt.h"
15
#include "source/common/config/api_version.h"
16
#include "source/common/config/utility.h"
17
#include "source/common/http/header_map_impl.h"
18
#include "source/common/protobuf/utility.h"
19
#include "source/common/router/config_impl.h"
20
#include "source/common/router/route_config_update_receiver_impl.h"
21

            
22
namespace Envoy {
23
namespace Router {
24

            
25
absl::StatusOr<std::unique_ptr<RdsRouteConfigSubscription>> RdsRouteConfigSubscription::create(
26
    RouteConfigUpdatePtr&& config_update,
27
    Envoy::Config::OpaqueResourceDecoderSharedPtr&& resource_decoder,
28
    const envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds,
29
    const uint64_t manager_identifier, Server::Configuration::ServerFactoryContext& factory_context,
30
    const std::string& stat_prefix,
31
395
    Rds::RouteConfigProviderManager& route_config_provider_manager) {
32
395
  absl::Status creation_status = absl::OkStatus();
33
395
  auto ret = std::unique_ptr<RdsRouteConfigSubscription>(new RdsRouteConfigSubscription(
34
395
      std::move(config_update), std::move(resource_decoder), rds, manager_identifier,
35
395
      factory_context, stat_prefix, route_config_provider_manager, creation_status));
36
395
  RETURN_IF_NOT_OK(creation_status);
37
395
  return ret;
38
395
}
39

            
40
// TODO(htuch): If support for multiple clusters is added per #1170 cluster_name_
41
RdsRouteConfigSubscription::RdsRouteConfigSubscription(
42
    RouteConfigUpdatePtr&& config_update,
43
    Envoy::Config::OpaqueResourceDecoderSharedPtr&& resource_decoder,
44
    const envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds,
45
    const uint64_t manager_identifier, Server::Configuration::ServerFactoryContext& factory_context,
46
    const std::string& stat_prefix, Rds::RouteConfigProviderManager& route_config_provider_manager,
47
    absl::Status& creation_status)
48
395
    : Rds::RdsRouteConfigSubscription(std::move(config_update), std::move(resource_decoder),
49
395
                                      rds.config_source(), rds.route_config_name(),
50
395
                                      manager_identifier, factory_context, stat_prefix + "rds.",
51
395
                                      "RDS", route_config_provider_manager, creation_status),
52
395
      config_update_info_(static_cast<RouteConfigUpdateReceiver*>(
53
395
          Rds::RdsRouteConfigSubscription::config_update_info_.get())) {}
54

            
55
395
RdsRouteConfigSubscription::~RdsRouteConfigSubscription() { config_update_info_.release(); }
56

            
57
absl::Status RdsRouteConfigSubscription::beforeProviderUpdate(
58
402
    std::unique_ptr<Init::ManagerImpl>& noop_init_manager, std::unique_ptr<Cleanup>& resume_rds) {
59
402
  if (config_update_info_->protobufConfigurationCast().has_vhds() &&
60
402
      config_update_info_->vhdsConfigurationChanged()) {
61
65
    ENVOY_LOG(debug,
62
65
              "rds: vhds configuration present/changed, (re)starting vhds: config_name={} hash={}",
63
65
              route_config_name_, routeConfigUpdate()->configHash());
64
65
    ASSERT(config_update_info_->configInfo().has_value());
65
65
    maybeCreateInitManager(routeConfigUpdate()->configInfo().value().version_, noop_init_manager,
66
65
                           resume_rds);
67
65
    auto subscription_or_error = VhdsSubscription::createVhdsSubscription(
68
65
        config_update_info_, factory_context_, stat_prefix_, route_config_provider_);
69
65
    RETURN_IF_NOT_OK_REF(subscription_or_error.status());
70
65
    vhds_subscription_ = std::move(subscription_or_error.value());
71
65
    vhds_subscription_->registerInitTargetWithInitManager(
72
65
        noop_init_manager == nullptr ? local_init_manager_ : *noop_init_manager);
73
65
  }
74
402
  return absl::OkStatus();
75
402
}
76

            
77
402
absl::Status RdsRouteConfigSubscription::afterProviderUpdate() {
78
  // RDS update removed VHDS configuration
79
402
  if (!config_update_info_->protobufConfigurationCast().has_vhds()) {
80
333
    vhds_subscription_.release();
81
333
  }
82

            
83
402
  return update_callback_manager_.runCallbacks();
84
402
}
85

            
86
// Initialize a no-op InitManager in case the one in the factory_context has completed
87
// initialization. This can happen if an RDS config update for an already established RDS
88
// subscription contains VHDS configuration.
89
void RdsRouteConfigSubscription::maybeCreateInitManager(
90
    const std::string& version_info, std::unique_ptr<Init::ManagerImpl>& init_manager,
91
67
    std::unique_ptr<Cleanup>& init_vhds) {
92
67
  if (local_init_manager_.state() == Init::Manager::State::Initialized) {
93
17
    init_manager = std::make_unique<Init::ManagerImpl>(
94
17
        fmt::format("VHDS {}:{}", route_config_name_, version_info));
95
17
    init_vhds = std::make_unique<Cleanup>([this, &init_manager, version_info] {
96
      // For new RDS subscriptions created after listener warming up, we don't wait for them to warm
97
      // up.
98
17
      Init::WatcherImpl noop_watcher(
99
          // Note: we just throw it away.
100
17
          fmt::format("VHDS ConfigUpdate watcher {}:{}", route_config_name_, version_info),
101
17
          []() { /*Do nothing.*/ });
102
17
      init_manager->initialize(noop_watcher);
103
17
    });
104
17
  }
105
67
}
106

            
107
48
void RdsRouteConfigSubscription::updateOnDemand(const std::string& aliases) {
108
48
  if (vhds_subscription_.get() == nullptr) {
109
    return;
110
  }
111
48
  vhds_subscription_->updateOnDemand(aliases);
112
48
}
113

            
114
RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl(
115
    RdsRouteConfigSubscriptionSharedPtr&& subscription,
116
    Server::Configuration::ServerFactoryContext& factory_context)
117
395
    : base_(subscription, factory_context), config_update_info_(subscription->routeConfigUpdate()),
118
395
      factory_context_(factory_context) {
119
  // The subscription referenced by the 'base_' and by 'this' is the same.
120
  // In it the provider is already set by the 'base_' so it points to that.
121
  // Need to set again to point to 'this'.
122
395
  base_.subscription().routeConfigProvider() = this;
123
395
}
124

            
125
589
RdsRouteConfigSubscription& RdsRouteConfigProviderImpl::subscription() {
126
589
  return static_cast<RdsRouteConfigSubscription&>(base_.subscription());
127
589
}
128

            
129
530
absl::Status RdsRouteConfigProviderImpl::onConfigUpdate() {
130
530
  auto status = base_.onConfigUpdate();
131
530
  if (!status.ok()) {
132
    return status;
133
  }
134

            
135
530
  const auto aliases = config_update_info_->resourceIdsInLastVhdsUpdate();
136
  // Regular (non-VHDS) RDS updates don't populate aliases fields in resources.
137
530
  if (aliases.empty()) {
138
408
    return absl::OkStatus();
139
408
  }
140

            
141
122
  const auto config =
142
122
      std::static_pointer_cast<const ConfigImpl>(config_update_info_->parsedConfiguration());
143
  // Notifies connections that RouteConfiguration update has been propagated.
144
  // Callbacks processing is performed in FIFO order. The callback is skipped if alias used in
145
  // the VHDS update request do not match the aliases in the update response
146
170
  for (auto it = config_update_callbacks_.begin(); it != config_update_callbacks_.end();) {
147
48
    auto found = aliases.find(it->alias_);
148
48
    if (found != aliases.end()) {
149
      // TODO(dmitri-d) HeaderMapImpl is expensive, need to profile this
150
48
      auto host_header = Http::RequestHeaderMapImpl::create();
151
48
      host_header->setHost(VhdsSubscription::aliasToDomainName(it->alias_));
152
48
      const bool host_exists = config->virtualHostExists(*host_header);
153
48
      std::weak_ptr<Http::RouteConfigUpdatedCallback> current_cb(it->cb_);
154
48
      it->thread_local_dispatcher_.post([current_cb, host_exists] {
155
48
        if (auto cb = current_cb.lock()) {
156
48
          (*cb)(host_exists);
157
48
        }
158
48
      });
159
48
      it = config_update_callbacks_.erase(it);
160
48
    } else {
161
      it++;
162
    }
163
48
  }
164
122
  return absl::OkStatus();
165
530
}
166

            
167
865
ConfigConstSharedPtr RdsRouteConfigProviderImpl::configCast() const {
168
865
  ASSERT(dynamic_cast<const Config*>(RdsRouteConfigProviderImpl::config().get()));
169
865
  return std::static_pointer_cast<const Config>(RdsRouteConfigProviderImpl::config());
170
865
}
171

            
172
// Schedules a VHDS request on the main thread and queues up the callback to use when the VHDS
173
// response has been propagated to the worker thread that was the request origin.
174
void RdsRouteConfigProviderImpl::requestVirtualHostsUpdate(
175
    const std::string& for_domain, Event::Dispatcher& thread_local_dispatcher,
176
49
    std::weak_ptr<Http::RouteConfigUpdatedCallback> route_config_updated_cb) {
177
49
  auto alias = VhdsSubscription::domainNameToAlias(
178
49
      config_update_info_->protobufConfigurationCast().name(), for_domain);
179
  // The RdsRouteConfigProviderImpl instance can go away before the dispatcher has a chance to
180
  // execute the callback. still_alive shared_ptr will be deallocated when the current instance of
181
  // the RdsRouteConfigProviderImpl is deallocated; we rely on a weak_ptr to still_alive flag to
182
  // determine if the RdsRouteConfigProviderImpl instance is still valid.
183
49
  factory_context_.mainThreadDispatcher().post(
184
49
      [this, maybe_still_alive = std::weak_ptr<bool>(still_alive_), alias, &thread_local_dispatcher,
185
49
       route_config_updated_cb]() -> void {
186
49
        if (maybe_still_alive.lock()) {
187
48
          subscription().updateOnDemand(alias);
188
48
          config_update_callbacks_.push_back(
189
48
              {alias, thread_local_dispatcher, route_config_updated_cb});
190
48
        }
191
49
      });
192
49
}
193
RouteConfigProviderSharedPtr RdsFactoryImpl::createRdsRouteConfigProvider(
194
    const envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds,
195
    Server::Configuration::ServerFactoryContext& factory_context, const std::string& stat_prefix,
196
    Init::Manager& init_manager, ProtoTraitsImpl& proto_traits,
197
501
    Rds::RouteConfigProviderManager& manager) {
198
501
  auto provider = manager.addDynamicProvider(
199
501
      rds, rds.route_config_name(), init_manager,
200
501
      [&factory_context, &rds, &stat_prefix, &manager, &proto_traits](uint64_t manager_identifier) {
201
395
        auto config_update =
202
395
            std::make_unique<RouteConfigUpdateReceiverImpl>(proto_traits, factory_context);
203
395
        auto resource_decoder = std::make_shared<
204
395
            Envoy::Config::OpaqueResourceDecoderImpl<envoy::config::route::v3::RouteConfiguration>>(
205
395
            factory_context.messageValidationContext().dynamicValidationVisitor(), "name");
206
395
        auto subscription =
207
395
            THROW_OR_RETURN_VALUE(RdsRouteConfigSubscription::create(
208
395
                                      std::move(config_update), std::move(resource_decoder), rds,
209
395
                                      manager_identifier, factory_context, stat_prefix, manager),
210
395
                                  std::unique_ptr<RdsRouteConfigSubscription>);
211
395
        auto provider =
212
395
            std::make_shared<RdsRouteConfigProviderImpl>(std::move(subscription), factory_context);
213
395
        return std::make_pair(provider, &provider->subscription().initTarget());
214
395
      });
215
501
  ASSERT(dynamic_cast<RouteConfigProvider*>(provider.get()));
216
501
  return std::static_pointer_cast<RouteConfigProvider>(provider);
217
501
}
218

            
219
REGISTER_FACTORY(RdsFactoryImpl, RdsFactory);
220

            
221
} // namespace Router
222
} // namespace Envoy