1
#include "cilium/grpc_subscription.h"
2

            
3
#include <fmt/format.h>
4

            
5
#include <chrono>
6
#include <memory>
7
#include <string>
8
#include <utility>
9
#include <vector>
10

            
11
#include "envoy/annotations/resource.pb.h"
12
#include "envoy/common/exception.h"
13
#include "envoy/common/random_generator.h"
14
#include "envoy/config/core/v3/config_source.pb.h"
15
#include "envoy/config/custom_config_validators.h"
16
#include "envoy/config/subscription.h"
17
#include "envoy/config/subscription_factory.h"
18
#include "envoy/event/dispatcher.h"
19
#include "envoy/grpc/async_client.h"
20
#include "envoy/local_info/local_info.h"
21
#include "envoy/stats/scope.h"
22
#include "envoy/upstream/cluster_manager.h"
23

            
24
#include "source/common/common/assert.h"
25
#include "source/common/common/backoff_strategy.h"
26
#include "source/common/config/utility.h"
27
#include "source/common/grpc/common.h"
28
#include "source/common/protobuf/protobuf.h" // IWYU pragma: keep
29
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
30
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
31

            
32
#include "absl/container/flat_hash_map.h"
33
#include "absl/status/statusor.h"
34
#include "absl/strings/match.h"
35
#include "absl/strings/string_view.h"
36
#include "absl/types/optional.h"
37

            
38
namespace Envoy {
39
namespace Cilium {
40

            
41
namespace {
42

            
43
// Fully qualified RPC method names of a single xDS service. A member stays
// empty when nothing has been recorded for that kind of method.
struct Service {
  // Name recorded for the service method whose short name starts with "Stream".
  std::string sotw_grpc_method_;
  // Name recorded for the service method whose short name starts with "Delta".
  std::string delta_grpc_method_;
  // Name recorded for the service method whose short name starts with "Fetch".
  std::string rest_method_;
};
49

            
50
// Map from resource type URL to service RPC methods.
51
using TypeUrlToServiceMap = absl::flat_hash_map<std::string, Service>;
52

            
53
// Builds the reverse map from resource type URL to the RPC method names of the
// Cilium discovery service that serves that resource type.
//
// Returns a heap-allocated map; the caller takes over the raw pointer.
// Aborts (in every build type) if one of the enumerated services is not
// present in the generated descriptor pool.
TypeUrlToServiceMap* buildTypeUrlToServiceMap() {
  auto* type_url_to_service_map = new TypeUrlToServiceMap();
  // This happens once in the lifetime of Envoy. We build a reverse map from
  // resource type URL to service methods. We explicitly enumerate all services,
  // since DescriptorPool doesn't support iterating over all descriptors, due
  // to its lazy load design, see
  // https://www.mail-archive.com/protobuf@googlegroups.com/msg04540.html.
  for (absl::string_view name : {
           "cilium.NetworkPolicyDiscoveryService",
           "cilium.NetworkPolicyHostsDiscoveryService",
       }) {
    const auto* service_desc =
        Protobuf::DescriptorPool::generated_pool()->FindServiceByName(std::string(name));
    // The descriptor is dereferenced right below, so a missing service must
    // stop the process in release builds as well instead of turning into a
    // null pointer dereference.
    RELEASE_ASSERT(service_desc != nullptr, fmt::format("{} missing", name));
    ASSERT(service_desc->options().HasExtension(envoy::annotations::resource));
    const std::string resource_type_url = Grpc::Common::typeUrl(
        service_desc->options().GetExtension(envoy::annotations::resource).type());
    Service& service = (*type_url_to_service_map)[resource_type_url];
    // We populate the service methods that are known below, but it's possible
    // that some services don't implement all of them; the corresponding
    // Service members then stay empty.
    for (int method_index = 0; method_index < service_desc->method_count(); ++method_index) {
      const auto& method_desc = *service_desc->method(method_index);
      if (absl::StartsWith(method_desc.name(), "Stream")) {
        service.sotw_grpc_method_ = method_desc.full_name();
      } else if (absl::StartsWith(method_desc.name(), "Delta")) {
        service.delta_grpc_method_ = method_desc.full_name();
      } else if (absl::StartsWith(method_desc.name(), "Fetch")) {
        service.rest_method_ = method_desc.full_name();
      } else {
        ASSERT(false, "Unknown xDS service method");
      }
    }
  }
  return type_url_to_service_map;
}
91

            
92
// Accessor for the shared type URL map. The map is built by
// buildTypeUrlToServiceMap() the first time this function runs and the same
// instance is handed out on every later call; this function never deletes it.
TypeUrlToServiceMap& typeUrlToServiceMap() {
  static TypeUrlToServiceMap& service_map = *buildTypeUrlToServiceMap();
  return service_map;
}
96

            
97
class NopConfigValidatorsImpl : public Envoy::Config::CustomConfigValidators {
98
public:
99
  NopConfigValidatorsImpl() = default;
100

            
101
  void executeValidators(absl::string_view,
102
                         const std::vector<Envoy::Config::DecodedResourcePtr>&) override {}
103
  void executeValidators(absl::string_view, const std::vector<Envoy::Config::DecodedResourcePtr>&,
104
                         const Protobuf::RepeatedPtrField<std::string>&) override {}
105
};
106

            
107
} // namespace
108

            
109
// Returns the descriptor of the delta gRPC method recorded for `type_url`.
//
// Aborts (in every build type) when `type_url` has no entry in the type URL
// map, or when the recorded delta method name does not resolve to a method in
// the generated descriptor pool (for example because the service has no
// "Delta*" method and the recorded name is empty). Both cases would otherwise
// dereference an invalid iterator or a null pointer.
const Protobuf::MethodDescriptor& deltaGrpcMethod(absl::string_view type_url) {
  const TypeUrlToServiceMap& service_map = typeUrlToServiceMap();
  // flat_hash_map with std::string keys supports heterogeneous lookup, so the
  // string_view is used directly instead of building a temporary std::string.
  const auto it = service_map.find(type_url);
  RELEASE_ASSERT(it != service_map.cend(),
                 fmt::format("no xDS service known for type URL '{}'", type_url));
  const Protobuf::MethodDescriptor* method_desc =
      Protobuf::DescriptorPool::generated_pool()->FindMethodByName(it->second.delta_grpc_method_);
  RELEASE_ASSERT(method_desc != nullptr,
                 fmt::format("no delta gRPC method for type URL '{}'", type_url));
  return *method_desc;
}
115

            
116
// Returns the descriptor of the state-of-the-world gRPC method recorded for
// `type_url`.
//
// Aborts (in every build type) when `type_url` has no entry in the type URL
// map, or when the recorded SotW method name does not resolve to a method in
// the generated descriptor pool (for example because the service has no
// "Stream*" method and the recorded name is empty). Both cases would otherwise
// dereference an invalid iterator or a null pointer.
const Protobuf::MethodDescriptor& sotwGrpcMethod(absl::string_view type_url) {
  const TypeUrlToServiceMap& service_map = typeUrlToServiceMap();
  // flat_hash_map with std::string keys supports heterogeneous lookup, so the
  // string_view is used directly instead of building a temporary std::string.
  const auto it = service_map.find(type_url);
  RELEASE_ASSERT(it != service_map.cend(),
                 fmt::format("no xDS service known for type URL '{}'", type_url));
  const Protobuf::MethodDescriptor* method_desc =
      Protobuf::DescriptorPool::generated_pool()->FindMethodByName(it->second.sotw_grpc_method_);
  RELEASE_ASSERT(method_desc != nullptr,
                 fmt::format("no SotW gRPC method for type URL '{}'", type_url));
  return *method_desc;
}
122

            
123
// Hard-coded Cilium gRPC cluster
124
// Note: No rate-limit settings are used, consider if needed.
125
10
envoy::config::core::v3::ConfigSource getCiliumXDSAPIConfig() {
126
10
  auto config_source = envoy::config::core::v3::ConfigSource();
127
  /* config_source.initial_fetch_timeout is set to 50 millliseconds.
128
   * This applies only to SDS Secrets for now, as for NPDS and NPHDS we explicitly set the timeout
129
   * as 0 (no timeout).
130
   */
131
10
  config_source.mutable_initial_fetch_timeout()->set_nanos(50000000);
132
10
  config_source.set_resource_api_version(envoy::config::core::v3::ApiVersion::V3);
133
10
  auto api_config_source = config_source.mutable_api_config_source();
134
10
  api_config_source->set_set_node_on_first_message_only(true);
135
10
  api_config_source->set_api_type(envoy::config::core::v3::ApiConfigSource::GRPC);
136
10
  api_config_source->set_transport_api_version(envoy::config::core::v3::ApiVersion::V3);
137
10
  api_config_source->add_grpc_services()->mutable_envoy_grpc()->set_cluster_name("xds-grpc-cilium");
138
10
  return config_source;
139
10
}
140

            
141
// Namespace-scope ConfigSource produced by getCiliumXDSAPIConfig(); its
// api_config_source is what subscribe() uses to set up the gRPC subscription.
envoy::config::core::v3::ConfigSource cilium_xds_api_config{getCiliumXDSAPIConfig()};
142

            
143
std::unique_ptr<Config::GrpcSubscriptionImpl>
144
subscribe(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
145
          Upstream::ClusterManager& cm, Event::Dispatcher& dispatcher,
146
          Random::RandomGenerator& random, Stats::Scope& scope,
147
          Config::SubscriptionCallbacks& callbacks,
148
          Config::OpaqueResourceDecoderSharedPtr resource_decoder,
149
          std::chrono::milliseconds init_fetch_timeout) {
150
  const envoy::config::core::v3::ApiConfigSource& api_config_source =
151
      cilium_xds_api_config.api_config_source();
152
  THROW_IF_NOT_OK(Config::Utility::checkApiConfigSourceSubscriptionBackingCluster(
153
      cm.primaryClusters(), api_config_source));
154

            
155
  Config::SubscriptionStats stats = Config::Utility::generateStats(scope);
156
  Envoy::Config::SubscriptionOptions options;
157

            
158
  // No-op custom validators
159
  Envoy::Config::CustomConfigValidatorsPtr nop_config_validators =
160
      std::make_unique<NopConfigValidatorsImpl>();
161
  auto factory_or_error = Config::Utility::factoryForGrpcApiConfigSource(
162
      cm.grpcAsyncClientManager(), api_config_source, scope, true, 0, false);
163
  THROW_IF_NOT_OK_REF(factory_or_error.status());
164

            
165
  absl::StatusOr<Config::RateLimitSettings> rate_limit_settings_or_error =
166
      Config::Utility::parseRateLimitSettings(api_config_source);
167
  THROW_IF_NOT_OK_REF(rate_limit_settings_or_error.status());
168

            
169
  Config::GrpcMuxContext grpc_mux_context{
170
      /*async_client_=*/THROW_OR_RETURN_VALUE(
171
          factory_or_error.value()->createUncachedRawAsyncClient(), Grpc::RawAsyncClientPtr),
172
      /*failover_async_client_=*/nullptr,
173
      /*dispatcher_=*/dispatcher,
174
      /*service_method_=*/sotwGrpcMethod(type_url),
175
      /*local_info_=*/local_info,
176
      /*rate_limit_settings_=*/rate_limit_settings_or_error.value(),
177
      /*scope_=*/scope,
178
      /*config_validators_=*/std::move(nop_config_validators),
179
      /*xds_resources_delegate_=*/absl::nullopt,
180
      /*xds_config_tracker_=*/absl::nullopt,
181
      /*backoff_strategy_=*/
182
      std::make_unique<JitteredExponentialBackOffStrategy>(
183
          Config::SubscriptionFactory::RetryInitialDelayMs,
184
          Config::SubscriptionFactory::RetryMaxDelayMs, random),
185
      /*target_xds_authority_=*/"",
186
      /*eds_resources_cache_=*/nullptr // EDS cache is only used for ADS.
187
  };
188

            
189
  return std::make_unique<Config::GrpcSubscriptionImpl>(
190
      std::make_shared<GrpcMuxImpl>(grpc_mux_context,
191
                                    api_config_source.set_node_on_first_message_only()),
192
      callbacks, resource_decoder, stats, type_url, dispatcher, init_fetch_timeout,
193
      /*is_aggregated*/ false, options);
194
}
195

            
196
} // namespace Cilium
197
} // namespace Envoy