1
#include "source/common/router/vhds.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7

            
8
#include "envoy/config/core/v3/config_source.pb.h"
9
#include "envoy/config/subscription.h"
10
#include "envoy/service/discovery/v3/discovery.pb.h"
11

            
12
#include "source/common/common/assert.h"
13
#include "source/common/common/fmt.h"
14
#include "source/common/config/api_version.h"
15
#include "source/common/config/utility.h"
16
#include "source/common/grpc/common.h"
17
#include "source/common/protobuf/utility.h"
18
#include "source/common/router/config_impl.h"
19

            
20
namespace Envoy {
21
namespace Router {
22

            
23
// Validates the VHDS config source and, if it is acceptable, builds the subscription.
//
// VHDS only supports Delta xDS. The config source is accepted when it is either:
//   - an `api_config_source` whose api_type is DELTA_GRPC, or
//   - ADS, provided the bootstrap configures an ADS stream whose api_type is DELTA_GRPC.
//
// @param config_update_info route config update info whose protobuf carries the VHDS settings.
// @param factory_context supplies the bootstrap for the ADS check; forwarded to the constructor.
// @param stat_prefix forwarded to the constructor.
// @param route_config_provider forwarded to the constructor.
// @return the new subscription; an InvalidArgumentError if the config source is not supported;
//         or the error status the constructor reported through its status out-parameter.
absl::StatusOr<VhdsSubscriptionPtr> VhdsSubscription::createVhdsSubscription(
    RouteConfigUpdatePtr& config_update_info,
    Server::Configuration::ServerFactoryContext& factory_context, const std::string& stat_prefix,
    Rds::RouteConfigProvider* route_config_provider) {
  const auto& config_source =
      config_update_info->protobufConfigurationCast().vhds().config_source();

  if (config_source.config_source_specifier_case() ==
      envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kAds) {
    // ADS is only usable for VHDS when the parent ADS stream is itself in Delta mode.
    const auto& bootstrap = factory_context.bootstrap();
    const bool has_ads_config =
        bootstrap.has_dynamic_resources() && bootstrap.dynamic_resources().has_ads_config();
    if (!has_ads_config) {
      return absl::InvalidArgumentError(
          "vhds: ADS config source specified but no ADS configured in bootstrap.");
    }
    if (bootstrap.dynamic_resources().ads_config().api_type() !=
        envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
      return absl::InvalidArgumentError(
          "vhds: ADS must use DELTA_GRPC api_type when used as VHDS config source.");
    }
  } else if (!config_source.has_api_config_source() ||
             config_source.api_config_source().api_type() !=
                 envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
    // Neither ADS nor an explicit DELTA_GRPC api_config_source.
    return absl::InvalidArgumentError(
        "vhds: only 'DELTA_GRPC' or 'ADS' (which uses Delta xDS) is supported as a config source.");
  }

  // The constructor reports failures through `creation_status` instead of returning them.
  // NOTE(review): `new` is used directly rather than std::make_unique, presumably because the
  // constructor is not publicly accessible -- confirm against the header.
  absl::Status creation_status = absl::OkStatus();
  auto subscription = std::unique_ptr<VhdsSubscription>(new VhdsSubscription(
      config_update_info, factory_context, stat_prefix, route_config_provider, creation_status));
  RETURN_IF_ERROR(creation_status);
  return subscription;
}
62

            
63
// Implements callbacks to handle DeltaDiscovery protocol for VirtualHostDiscoveryService
64
VhdsSubscription::VhdsSubscription(RouteConfigUpdatePtr& config_update_info,
65
                                   Server::Configuration::ServerFactoryContext& factory_context,
66
                                   const std::string& stat_prefix,
67
                                   Rds::RouteConfigProvider* route_config_provider,
68
                                   absl::Status& status)
69
43
    : Envoy::Config::SubscriptionBase<envoy::config::route::v3::VirtualHost>(
70
43
          factory_context.messageValidationContext().dynamicValidationVisitor(), "name"),
71
43
      config_update_info_(config_update_info),
72
43
      scope_(factory_context.scope().createScope(
73
43
          stat_prefix + "vhds." + config_update_info_->protobufConfigurationCast().name() + ".")),
74
43
      stats_({ALL_VHDS_STATS(POOL_COUNTER(*scope_))}),
75
43
      init_target_(fmt::format("VhdsConfigSubscription {}",
76
43
                               config_update_info_->protobufConfigurationCast().name()),
77
43
                   [this]() {
78
39
                     subscription_->start(
79
39
                         {config_update_info_->protobufConfigurationCast().name()});
80
39
                   }),
81
43
      route_config_provider_(route_config_provider) {
82
43
  const auto resource_name = getResourceName();
83
43
  Envoy::Config::SubscriptionOptions options;
84
43
  options.use_namespace_matching_ = true;
85
43
  absl::StatusOr<Envoy::Config::SubscriptionPtr> status_or =
86
43
      factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
87
43
          config_update_info_->protobufConfigurationCast().vhds().config_source(),
88
43
          Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, options);
89
43
  SET_AND_RETURN_IF_NOT_OK(status_or.status(), status);
90
43
  subscription_ = std::move(status_or.value());
91
43
}
92

            
93
24
void VhdsSubscription::updateOnDemand(const std::string& with_route_config_name_prefix) {
94
24
  subscription_->requestOnDemandUpdate({with_route_config_name_prefix});
95
24
}
96

            
97
void VhdsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
98
2
                                            const EnvoyException*) {
99
2
  ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
100
  // We need to allow server startup to continue, even if we have a bad
101
  // config.
102
2
  init_target_.ready();
103
2
}
104

            
105
// Subscription callback that applies a delta update of virtual hosts.
//
// Collects the name and aliases of every added resource, gathers the virtual hosts of those
// resources that actually carry a payload, and hands everything to the route config update info.
// If that call reports true, the reload counter is bumped and the route config provider (if any)
// is notified.
//
// @param added_resources decoded resources added or updated by this delta.
// @param removed_resources names of resources removed by this delta.
// @param version_info version string forwarded to onVhdsUpdate().
// @return OkStatus on success, or the error returned by the route config provider. In the error
//         case the function returns before marking the init target ready.
absl::Status VhdsSubscription::onConfigUpdate(
    const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
    const Protobuf::RepeatedPtrField<std::string>& removed_resources,
    const std::string& version_info) {
  RouteConfigUpdateReceiver::VirtualHostRefVector added_vhosts;
  std::set<std::string> added_resource_ids;

  for (const auto& resource : added_resources) {
    auto& decoded = resource.get();
    const auto& aliases = decoded.aliases();

    // Record the name and every alias, whether or not the resource has a payload.
    added_resource_ids.emplace(decoded.name());
    added_resource_ids.insert(aliases.begin(), aliases.end());

    // The management server returns empty resources (containing no virtual host) for aliases
    // that it couldn't resolve; only resources with a payload contribute a virtual host.
    if (decoded.hasResource()) {
      added_vhosts.emplace_back(
          dynamic_cast<const envoy::config::route::v3::VirtualHost&>(decoded.resource()));
    }
  }

  const bool reloaded = config_update_info_->onVhdsUpdate(
      added_vhosts, std::move(added_resource_ids), removed_resources, version_info);
  if (reloaded) {
    stats_.config_reload_.inc();
    ENVOY_LOG(debug, "vhds: loading new configuration: config_name={} hash={}",
              config_update_info_->protobufConfigurationCast().name(),
              config_update_info_->configHash());
    if (route_config_provider_ != nullptr) {
      RETURN_IF_NOT_OK(route_config_provider_->onConfigUpdate());
    }
  }

  init_target_.ready();
  return absl::OkStatus();
}
137

            
138
} // namespace Router
139
} // namespace Envoy