1
#pragma once
2

            
3
#include <cstdint>
4
#include <memory>
5
#include <queue>
6

            
7
#include "envoy/common/random_generator.h"
8
#include "envoy/common/time.h"
9
#include "envoy/config/custom_config_validators.h"
10
#include "envoy/config/endpoint/v3/endpoint.pb.h"
11
#include "envoy/config/grpc_mux.h"
12
#include "envoy/config/subscription.h"
13
#include "envoy/config/xds_config_tracker.h"
14
#include "envoy/config/xds_resources_delegate.h"
15
#include "envoy/event/dispatcher.h"
16
#include "envoy/grpc/status.h"
17
#include "envoy/service/discovery/v3/discovery.pb.h"
18
#include "envoy/upstream/cluster_manager.h"
19

            
20
#include "source/common/common/cleanup.h"
21
#include "source/common/common/logger.h"
22
#include "source/common/common/utility.h"
23
#include "source/common/config/api_version.h"
24
#include "source/common/config/resource_name.h"
25
#include "source/common/config/ttl.h"
26
#include "source/common/config/utility.h"
27
#include "source/common/config/xds_context_params.h"
28
#include "source/common/config/xds_resource.h"
29
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
30
#include "source/extensions/config_subscription/grpc/grpc_mux_failover.h"
31

            
32
#include "absl/container/node_hash_map.h"
33
#include "xds/core/v3/resource_name.pb.h"
34

            
35
namespace Envoy {
36
namespace Config {
37
/**
38
 * ADS API implementation that fetches via gRPC.
39
 */
40
class GrpcMuxImpl : public GrpcMux,
41
                    public GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse>,
42
                    public Logger::Loggable<Logger::Id::config> {
43
public:
44
  explicit GrpcMuxImpl(GrpcMuxContext& grpc_mux_context);
45

            
46
  ~GrpcMuxImpl() override;
47

            
48
  // Causes all GrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
49
  // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
50
  // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
51
  // would then cause all `GrpcMuxImpl` to be destructed.
52
  // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
53
  static void shutdownAll();
54

            
55
983
  // Marks this mux as shut down by setting `shutdown_` to true. See the comment on `shutdown_`
  // for why nothing should be sent on `grpc_stream_` afterwards.
  void shutdown() { shutdown_ = true; }
56

            
57
  void start() override;
58

            
59
  // GrpcMux
60
  ScopedResume pause(const std::string& type_url) override;
61
  ScopedResume pause(const std::vector<std::string> type_urls) override;
62

            
63
  GrpcMuxWatchPtr addWatch(const std::string& type_url,
64
                           const absl::flat_hash_set<std::string>& resources,
65
                           SubscriptionCallbacks& callbacks,
66
                           OpaqueResourceDecoderSharedPtr resource_decoder,
67
                           const SubscriptionOptions& options) override;
68

            
69
  // This implementation ignores on-demand update requests: both arguments are unused and the
  // override has an empty body.
  void requestOnDemandUpdate(const std::string&,
                             const absl::flat_hash_set<std::string>&) override {}
71

            
72
157
  // Returns an optional reference built from the raw pointer held by `eds_resources_cache_`.
  EdsResourcesCacheOptRef edsResourcesCache() override {
    auto* const cache = eds_resources_cache_.get();
    return makeOptRefFromPtr(cache);
  }
75

            
76
  absl::Status
77
  updateMuxSource(Grpc::RawAsyncClientSharedPtr&& primary_async_client,
78
                  Grpc::RawAsyncClientSharedPtr&& failover_async_client, Stats::Scope& scope,
79
                  BackOffStrategyPtr&& backoff_strategy,
80
                  const envoy::config::core::v3::ApiConfigSource& ads_config_source) override;
81

            
82
  Upstream::LoadStatsReporter* maybeCreateLoadStatsReporter() override;
83
  Upstream::LoadStatsReporter* loadStatsReporter() const override;
84

            
85
  void handleDiscoveryResponse(
86
      std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message);
87

            
88
  // Config::GrpcStreamCallbacks
89
  void onStreamEstablished() override;
90
  void onEstablishmentFailure(bool) override;
91
  void
92
  onDiscoveryResponse(std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message,
93
                      ControlPlaneStats& control_plane_stats) override;
94
  void onWriteable() override;
95

            
96
  // Test-only accessor for the gRPC stream used by this mux.
  // When the `envoy.restart_features.xds_failover_support` runtime feature is enabled,
  // `grpc_stream_` is downcast to a GrpcMuxFailover and the stream it reports via
  // currentStreamForTest() is returned; otherwise `grpc_stream_` itself is returned.
  GrpcStreamInterface<envoy::service::discovery::v3::DiscoveryRequest,
                      envoy::service::discovery::v3::DiscoveryResponse>&
  grpcStreamForTest() {
    // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated,
    // return grpc_stream_.currentStreamForTest() directly (defined in GrpcMuxFailover).
    if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
      auto* failover_stream =
          dynamic_cast<GrpcMuxFailover<envoy::service::discovery::v3::DiscoveryRequest,
                                       envoy::service::discovery::v3::DiscoveryResponse>*>(
              grpc_stream_.get());
      // A failed downcast yields nullptr; fail loudly here rather than dereferencing it below.
      ASSERT(failover_stream != nullptr);
      return failover_stream->currentStreamForTest();
    }
    return *grpc_stream_;
  }
109

            
110
private:
111
  // Helper function to create the grpc_stream_ object.
112
  std::unique_ptr<GrpcStreamInterface<envoy::service::discovery::v3::DiscoveryRequest,
113
                                      envoy::service::discovery::v3::DiscoveryResponse>>
114
  createGrpcStreamObject(Grpc::RawAsyncClientSharedPtr&& async_client,
115
                         Grpc::RawAsyncClientSharedPtr&& failover_async_client,
116
                         const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope,
117
                         BackOffStrategyPtr&& backoff_strategy,
118
                         const RateLimitSettings& rate_limit_settings);
119

            
120
  void drainRequests();
121
  void setRetryTimer();
122
  void sendDiscoveryRequest(absl::string_view type_url);
123
  // Clears the nonces of all subscribed types in this gRPC mux.
124
  void clearNonce();
125

            
126
  struct GrpcMuxWatchImpl : public GrpcMuxWatch {
127
    // Builds a watch over `resources` of type `type_url`.
    // The watch list it registers in is the one held by the parent's ApiState for `type_url`;
    // registration itself happens inside updateResources().
    GrpcMuxWatchImpl(const absl::flat_hash_set<std::string>& resources,
                     SubscriptionCallbacks& callbacks,
                     OpaqueResourceDecoderSharedPtr resource_decoder, const std::string& type_url,
                     GrpcMuxImpl& parent, const SubscriptionOptions& options,
                     const LocalInfo::LocalInfo& local_info,
                     EdsResourcesCacheOptRef eds_resources_cache)
        : callbacks_(callbacks), resource_decoder_(resource_decoder), type_url_(type_url),
          parent_(parent), subscription_options_(options), local_info_(local_info),
          watches_(parent.apiStateFor(type_url).watches_),
          eds_resources_cache_(eds_resources_cache) {
      updateResources(resources);
      // An EDS resources cache may only accompany a ClusterLoadAssignment watch.
      ASSERT(!(
          eds_resources_cache_.has_value() &&
          !(type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>())));
    }
143

            
144
1422
    // Unregisters the watch from the per-type watch list. If it was watching any resources, a
    // discovery request for the type is queued and, when an EDS cache is attached, the watch's
    // resources are offered for removal from that cache.
    ~GrpcMuxWatchImpl() override {
      watches_.erase(iter_);
      if (resources_.empty()) {
        // Nothing was being watched, so there is nothing to re-request or evict.
        return;
      }
      parent_.queueDiscoveryRequest(type_url_);
      if (eds_resources_cache_.has_value()) {
        removeResourcesFromCache(resources_);
      }
    }
153

            
154
7
    // Replaces the set of resources this watch is interested in with `resources`.
    // A discovery request is queued once before the switch (only if the watch previously had
    // resources) and once after it.
    // NOTE(review): updateResources() can raise via THROW_IF_NOT_OK_REF after this watch has
    // already been erased from `watches_`, which would leave `iter_` stale -- confirm callers
    // never pass a malformed xdstp name here.
    void update(const absl::flat_hash_set<std::string>& resources) override {
      const bool had_resources = !resources_.empty();
      watches_.erase(iter_);
      if (had_resources) {
        parent_.queueDiscoveryRequest(type_url_);
      }
      updateResources(resources);
      parent_.queueDiscoveryRequest(type_url_);
    }
162

            
163
    // Maintain deterministic wire ordering via ordered std::set.
164
    std::set<std::string> resources_;
165
    SubscriptionCallbacks& callbacks_;
166
    OpaqueResourceDecoderSharedPtr resource_decoder_;
167
    const std::string type_url_;
168
    GrpcMuxImpl& parent_;
169

            
170
  private:
171
1429
    // Rebuilds `resources_` from `resources`, re-encoding xdstp:// names along the way, then
    // offers the names that dropped out for removal from the EDS cache (if one is attached) and
    // finally inserts this watch at the front of `watches_`.
    void updateResources(const absl::flat_hash_set<std::string>& resources) {
      // Hold on to the pre-update names until the end of the function so that the removed ones
      // can be computed as a diff against the new set.
      std::set<std::string> previous_resources;
      previous_resources.swap(resources_);

      for (const std::string& resource_name : resources) {
        if (!XdsResourceIdentifier::hasXdsTpScheme(resource_name)) {
          // Names without the xdstp scheme are stored verbatim.
          resources_.insert(resource_name);
          continue;
        }
        auto xdstp_resource_or_error = XdsResourceIdentifier::decodeUrn(resource_name);
        THROW_IF_NOT_OK_REF(xdstp_resource_or_error.status());
        auto xdstp_resource = xdstp_resource_or_error.value();
        if (subscription_options_.add_xdstp_node_context_params_) {
          // Replace the resource's context with the one encoded from the node context.
          const auto context = XdsContextParams::encodeResource(
              local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
          xdstp_resource.mutable_context()->CopyFrom(context);
        }
        XdsResourceIdentifier::EncodeOptions encode_options;
        encode_options.sort_context_params_ = true;
        resources_.insert(XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options));
      }

      if (eds_resources_cache_.has_value()) {
        // Names present before the update but absent now are candidates for cache removal.
        std::set<std::string> removed_resources;
        for (const std::string& previous_name : previous_resources) {
          if (resources_.find(previous_name) == resources_.end()) {
            removed_resources.insert(previous_name);
          }
        }
        removeResourcesFromCache(removed_resources);
      }

      // Move this watch to the beginning of the list.
      iter_ = watches_.emplace(watches_.begin(), this);
    }
207

            
208
348
    void removeResourcesFromCache(const std::set<std::string>& resources_to_remove) {
209
348
      ASSERT(eds_resources_cache_.has_value());
210
      // Iterate over the resources to remove, and if no other watcher
211
      // registered for that resource, remove it from the cache.
212
348
      for (const auto& resource_name : resources_to_remove) {
213
        // Counts the number of watchers that watch the resource.
214
175
        uint32_t resource_watchers_count = 0;
215
175
        for (const auto& watch : watches_) {
216
          // Skip the current watcher as it is intending to remove the resource.
217
72
          if (watch == this) {
218
            continue;
219
          }
220
72
          if (watch->resources_.find(resource_name) != watch->resources_.end()) {
221
24
            resource_watchers_count++;
222
24
          }
223
72
        }
224
        // Other than "this" watcher, the resource is not watched by any other
225
        // watcher, so it can be removed.
226
175
        if (resource_watchers_count == 0) {
227
157
          eds_resources_cache_->removeResource(resource_name);
228
157
        }
229
175
      }
230
348
    }
231

            
232
    using WatchList = std::list<GrpcMuxWatchImpl*>;
233
    const SubscriptionOptions& subscription_options_;
234
    const LocalInfo::LocalInfo& local_info_;
235
    WatchList& watches_;
236
    WatchList::iterator iter_;
237
    // Optional cache for the specific ClusterLoadAssignments of this watch.
238
    EdsResourcesCacheOptRef eds_resources_cache_;
239
  };
240

            
241
  // Per muxed API state.
242
  struct ApiState {
243
    ApiState(Event::Dispatcher& dispatcher,
244
             std::function<void(const std::vector<std::string>&)> callback)
245
1799
        : ttl_(callback, dispatcher, dispatcher.timeSource()) {}
246

            
247
8297
    bool paused() const { return pauses_ > 0; }
248

            
249
    // Watches on the returned resources for the API;
250
    std::list<GrpcMuxWatchImpl*> watches_;
251
    // Current DiscoveryRequest for API.
252
    envoy::service::discovery::v3::DiscoveryRequest request_;
253
    // Count of unresumed pause() invocations.
254
    uint32_t pauses_{};
255
    // Was a DiscoveryRequest elided during a pause?
256
    bool pending_{};
257
    // Has this API been tracked in subscriptions_?
258
    bool subscribed_{};
259
    // This resource type must have a Node sent at next request.
260
    bool must_send_node_{};
261
    TtlManager ttl_;
262
    // The identifier for the server that sent the most recent response, or
263
    // empty if there is none.
264
    std::string control_plane_identifier_{};
265
    // If true, xDS resources were previously fetched from an xDS source or an xDS delegate.
266
    bool previously_fetched_data_{false};
267
  };
268

            
269
2008
  bool isHeartbeatResource(const std::string& type_url, const DecodedResource& resource) {
270
2008
    return !resource.hasResource() &&
271
2008
           resource.version() == apiStateFor(type_url).request_.version_info();
272
2008
  }
273
  void expiryCallback(absl::string_view type_url, const std::vector<std::string>& expired);
274
  // Request queue management logic.
275
  void queueDiscoveryRequest(absl::string_view queue_item);
276
  // Invoked when dynamic context parameters change for a resource type.
277
  void onDynamicContextUpdate(absl::string_view resource_type_url);
278
  // Must be invoked from the main or test thread.
279
  void loadConfigFromDelegate(const std::string& type_url,
280
                              const absl::flat_hash_set<std::string>& resource_names);
281
  // Must be invoked from the main or test thread.
282
  void processDiscoveryResources(const std::vector<DecodedResourcePtr>& resources,
283
                                 ApiState& api_state, const std::string& type_url,
284
                                 const std::string& version_info, bool call_delegate);
285

            
286
  Event::Dispatcher& dispatcher_;
287
  // Multiplexes the stream to the primary and failover sources.
288
  // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated,
289
  // convert from unique_ptr<GrpcStreamInterface> to GrpcMuxFailover directly.
290
  std::unique_ptr<GrpcStreamInterface<envoy::service::discovery::v3::DiscoveryRequest,
291
                                      envoy::service::discovery::v3::DiscoveryResponse>>
292
      grpc_stream_;
293
  const LocalInfo::LocalInfo& local_info_;
294
  const bool skip_subsequent_node_;
295
  CustomConfigValidatorsPtr config_validators_;
296
  XdsConfigTrackerOptRef xds_config_tracker_;
297
  XdsResourcesDelegateOptRef xds_resources_delegate_;
298
  EdsResourcesCachePtr eds_resources_cache_;
299
  const std::string target_xds_authority_;
300
  // A Load-Stats-Reporter factory method that lazily creates the
301
  // reporter if needed.
302
  std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> load_stats_reporter_factory_;
303
  // The load stats reporter, lazily created.
304
  std::unique_ptr<Upstream::LoadStatsReporter> lrs_server_;
305
  bool first_stream_request_{true};
306

            
307
  // Helper function for looking up and potentially allocating a new ApiState.
308
  ApiState& apiStateFor(absl::string_view type_url);
309

            
310
  absl::node_hash_map<std::string, std::unique_ptr<ApiState>> api_state_;
311

            
312
  // Envoy's dependency ordering.
313
  std::list<std::string> subscriptions_;
314

            
315
  // A queue to store requests while rate limited. Note that when requests
316
  // cannot be sent due to the gRPC stream being down, this queue does not
317
  // store them; rather, they are simply dropped. This string is a type
318
  // URL.
319
  std::unique_ptr<std::queue<std::string>> request_queue_;
320

            
321
  Common::CallbackHandlePtr dynamic_update_callback_handle_;
322

            
323
  bool started_{false};
324
  // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
325
  // true because it may contain dangling pointers.
326
  std::atomic<bool> shutdown_{false};
327
};
328

            
329
using GrpcMuxImplPtr = std::unique_ptr<GrpcMuxImpl>;
330
using GrpcMuxImplSharedPtr = std::shared_ptr<GrpcMuxImpl>;
331

            
332
class GrpcMuxFactory;
333
DECLARE_FACTORY(GrpcMuxFactory);
334

            
335
} // namespace Config
336
} // namespace Envoy