1
#pragma once
2

            
3
#include <cstdint>
4
#include <memory>
5
#include <queue>
6

            
7
#include "envoy/common/random_generator.h"
8
#include "envoy/common/time.h"
9
#include "envoy/common/token_bucket.h"
10
#include "envoy/config/custom_config_validators.h"
11
#include "envoy/config/grpc_mux.h"
12
#include "envoy/config/subscription.h"
13
#include "envoy/config/xds_config_tracker.h"
14
#include "envoy/config/xds_resources_delegate.h"
15
#include "envoy/event/dispatcher.h"
16
#include "envoy/grpc/status.h"
17
#include "envoy/service/discovery/v3/discovery.pb.h"
18
#include "envoy/upstream/cluster_manager.h"
19

            
20
#include "source/common/common/logger.h"
21
#include "source/common/common/utility.h"
22
#include "source/common/config/api_version.h"
23
#include "source/common/grpc/common.h"
24
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
25
#include "source/extensions/config_subscription/grpc/grpc_mux_failover.h"
26
#include "source/extensions/config_subscription/grpc/pausable_ack_queue.h"
27
#include "source/extensions/config_subscription/grpc/watch_map.h"
28
#include "source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h"
29
#include "source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h"
30

            
31
#include "absl/container/node_hash_map.h"
32

            
33
namespace Envoy {
34
namespace Config {
35
namespace XdsMux {
36

            
37
class ShutdownableMux {
38
public:
39
259
  virtual ~ShutdownableMux() = default;
40
  virtual void shutdown() PURE;
41
};
42

            
43
// Manages subscriptions to one or more type of resource. The logical protocol
44
// state of those subscription(s) is handled by SubscriptionState.
45
// This class owns the GrpcStream used to talk to the server, maintains queuing
46
// logic to properly order the subscription(s)' various messages, and allows
47
// starting/stopping/pausing of the subscriptions.
48
//
49
// @tparam S SubscriptionState state type, either SotwSubscriptionState or DeltaSubscriptionState
50
// @tparam F SubscriptionStateFactory type, either SotwSubscriptionStateFactory or
51
// DeltaSubscriptionStateFactory
52
// @tparam RQ Xds request type, either envoy::service::discovery::v3::DiscoveryRequest or
53
// envoy::service::discovery::v3::DeltaDiscoveryRequest
54
// @tparam RS Xds response type, either envoy::service::discovery::v3::DiscoveryResponse or
55
// envoy::service::discovery::v3::DeltaDiscoveryResponse
56
//
57
template <class S, class F, class RQ, class RS>
58
class GrpcMuxImpl : public GrpcStreamCallbacks<RS>,
59
                    public GrpcMux,
60
                    public ShutdownableMux,
61
                    Logger::Loggable<Logger::Id::config> {
62
public:
63
  GrpcMuxImpl(std::unique_ptr<F> subscription_state_factory, GrpcMuxContext& grpc_mux_context);
64

            
65
  ~GrpcMuxImpl() override;
66

            
67
  // Causes all GrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
68
  // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
69
  // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
70
  // would then cause all `GrpcMuxImpl` to be destructed.
71
  // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
72
  static void shutdownAll();
73

            
74
296
  void shutdown() override { shutdown_ = true; }
75
4
  bool isShutdown() { return shutdown_; }
76

            
77
  // TODO (dmitri-d) return a naked pointer instead of the wrapper once the legacy mux has been
78
  // removed and the mux interface can be changed
79
  Config::GrpcMuxWatchPtr addWatch(const std::string& type_url,
80
                                   const absl::flat_hash_set<std::string>& resources,
81
                                   SubscriptionCallbacks& callbacks,
82
                                   OpaqueResourceDecoderSharedPtr resource_decoder,
83
                                   const SubscriptionOptions& options) override;
84
  void updateWatch(const std::string& type_url, Watch* watch,
85
                   const absl::flat_hash_set<std::string>& resources,
86
                   const SubscriptionOptions& options);
87
  void removeWatch(const std::string& type_url, Watch* watch);
88

            
89
  ScopedResume pause(const std::string& type_url) override;
90
  ScopedResume pause(const std::vector<std::string> type_urls) override;
91
  void start() override;
92
2
  const absl::flat_hash_map<std::string, std::unique_ptr<S>>& subscriptions() const {
93
2
    return subscriptions_;
94
2
  }
95

            
96
  // GrpcStreamCallbacks
97
362
  void onStreamEstablished() override { handleEstablishedStream(); }
98
266
  void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) override {
99
266
    handleStreamEstablishmentFailure(next_attempt_may_send_initial_resource_version);
100
266
  }
101
2
  void onWriteable() override { trySendDiscoveryRequests(); }
102
  void onDiscoveryResponse(std::unique_ptr<RS>&& message,
103
1363
                           ControlPlaneStats& control_plane_stats) override {
104
1363
    genericHandleResponse(message->type_url(), *message, control_plane_stats);
105
1363
  }
106

            
107
  absl::Status
108
  updateMuxSource(Grpc::RawAsyncClientSharedPtr&& primary_async_client,
109
                  Grpc::RawAsyncClientSharedPtr&& failover_async_client, Stats::Scope& scope,
110
                  BackOffStrategyPtr&& backoff_strategy,
111
                  const envoy::config::core::v3::ApiConfigSource& ads_config_source) override;
112

            
113
48
  EdsResourcesCacheOptRef edsResourcesCache() override {
114
48
    return makeOptRefFromPtr(eds_resources_cache_.get());
115
48
  }
116

            
117
  // TODO(adisuissa): finish implementation.
118
4
  Upstream::LoadStatsReporter* loadStatsReporter() const override { return nullptr; }
119
4
  Upstream::LoadStatsReporter* maybeCreateLoadStatsReporter() override { return nullptr; }
120

            
121
13
  GrpcStreamInterface<RQ, RS>& grpcStreamForTest() {
122
    // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated,
123
    // return grpc_stream_.currentStreamForTest() directly (defined in GrpcMuxFailover).
124
13
    if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
125
6
      return dynamic_cast<GrpcMuxFailover<RQ, RS>*>(grpc_stream_.get())->currentStreamForTest();
126
6
    }
127
7
    return *grpc_stream_.get();
128
13
  }
129

            
130
protected:
131
  class WatchImpl : public Envoy::Config::GrpcMuxWatch {
132
  public:
133
    WatchImpl(const std::string& type_url, Watch* watch, GrpcMuxImpl& parent,
134
              const SubscriptionOptions& options)
135
449
        : type_url_(type_url), watch_(watch), parent_(parent), options_(options) {}
136

            
137
449
    ~WatchImpl() override { remove(); }
138

            
139
449
    void remove() {
140
449
      if (watch_) {
141
449
        parent_.removeWatch(type_url_, watch_);
142
449
        watch_ = nullptr;
143
449
      }
144
449
    }
145

            
146
20
    void update(const absl::flat_hash_set<std::string>& resources) override {
147
20
      parent_.updateWatch(type_url_, watch_, resources, options_);
148
20
    }
149

            
150
  private:
151
    const std::string type_url_;
152
    Watch* watch_;
153
    GrpcMuxImpl& parent_;
154
    const SubscriptionOptions options_;
155
  };
156

            
157
  void sendGrpcMessage(RQ& msg_proto, S& sub_state);
158
3974
  void maybeUpdateQueueSizeStat(uint64_t size) { grpc_stream_->maybeUpdateQueueSizeStat(size); }
159
2325
  bool grpcStreamAvailable() { return grpc_stream_->grpcStreamAvailable(); }
160
2176
  bool rateLimitAllowsDrain() { return grpc_stream_->checkRateLimitAllowsDrain(); }
161
2132
  void sendMessage(RQ& msg_proto) { grpc_stream_->sendMessage(msg_proto); }
162

            
163
  S& subscriptionStateFor(const std::string& type_url);
164
  WatchMap& watchMapFor(const std::string& type_url);
165
  void handleEstablishedStream();
166
  void handleStreamEstablishmentFailure(bool next_attempt_may_send_initial_resource_version);
167
  // May modify the order of the resources in response_proto to put all the
168
  // non-heartbeat resources first.
169
  void genericHandleResponse(const std::string& type_url, RS& response_proto,
170
                             ControlPlaneStats& control_plane_stats);
171
  void trySendDiscoveryRequests();
172
1781
  bool skipSubsequentNode() const { return skip_subsequent_node_; }
173
2124
  bool anyRequestSentYetInCurrentStream() const { return any_request_sent_yet_in_current_stream_; }
174
2494
  void setAnyRequestSentYetInCurrentStream(bool value) {
175
2494
    any_request_sent_yet_in_current_stream_ = value;
176
2494
  }
177
483
  const LocalInfo::LocalInfo& localInfo() const { return local_info_; }
178

            
179
  virtual absl::string_view methodName() const PURE;
180

            
181
private:
182
  // Helper function to create the grpc_stream_ object.
183
  // TODO(adisuissa): this should be removed when envoy.restart_features.xds_failover_support
184
  // is deprecated.
185
  std::unique_ptr<GrpcStreamInterface<RQ, RS>>
186
  createGrpcStreamObject(Grpc::RawAsyncClientSharedPtr&& async_client,
187
                         Grpc::RawAsyncClientSharedPtr&& failover_async_client,
188
                         const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope,
189
                         BackOffStrategyPtr&& backoff_strategy,
190
                         const RateLimitSettings& rate_limit_settings);
191

            
192
  // Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check
193
  // whether we *want* to send a (Delta)DiscoveryRequest).
194
  bool canSendDiscoveryRequest(const std::string& type_url);
195

            
196
  // Checks whether we have something to say in a (Delta)DiscoveryRequest, which can be an ACK
197
  // and/or a subscription update. (Does not check whether we *can* send that
198
  // (Delta)DiscoveryRequest). Returns the type_url we should send the DeltaDiscoveryRequest for (if
199
  // any). First, prioritizes ACKs over non-ACK subscription interest updates. Then, prioritizes
200
  // non-ACK updates in the order the various types of subscriptions were activated (as tracked by
201
  // subscription_ordering_).
202
  absl::optional<std::string> whoWantsToSendDiscoveryRequest();
203

            
204
  // Invoked when dynamic context parameters change for a resource type.
205
  void onDynamicContextUpdate(absl::string_view resource_type_url);
206

            
207
  Event::Dispatcher& dispatcher_;
208
  // Multiplexes the stream to the primary and failover sources.
209
  // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated,
210
  // convert from unique_ptr<GrpcStreamInterface> to GrpcMuxFailover directly.
211
  std::unique_ptr<GrpcStreamInterface<RQ, RS>> grpc_stream_;
212

            
213
  // Resource (N)ACKs we're waiting to send, stored in the order that they should be sent in. All
214
  // of our different resource types' ACKs are mixed together in this queue. See class for
215
  // description of how it interacts with pause() and resume().
216
  PausableAckQueue pausable_ack_queue_;
217

            
218
  // Makes SubscriptionStates, to be held in the subscriptions_ map. Whether this GrpcMux is doing
219
  // delta or state of the world xDS is determined by which concrete subclass this variable gets.
220
  std::unique_ptr<F> subscription_state_factory_;
221

            
222
  // Map key is type_url.
223
  // Only addWatch() should insert into these maps.
224
  absl::flat_hash_map<std::string, std::unique_ptr<S>> subscriptions_;
225
  absl::flat_hash_map<std::string, std::unique_ptr<WatchMap>> watch_maps_;
226

            
227
  // Determines the order of initial discovery requests. (Assumes that subscriptions are added
228
  // to this GrpcMux in the order of Envoy's dependency ordering).
229
  std::list<std::string> subscription_ordering_;
230

            
231
  // Whether to enable the optimization of only including the node field in the very first
232
  // discovery request in an xDS gRPC stream (really just one: *not* per-type_url).
233
  const bool skip_subsequent_node_;
234

            
235
  // State to help with skip_subsequent_node's logic.
236
  bool any_request_sent_yet_in_current_stream_{};
237

            
238
  // Used to populate the (Delta)DiscoveryRequest's node field. That field is the same across
239
  // all type_urls, and moreover, the 'skip_subsequent_node' logic needs to operate across all
240
  // the type_urls. So, while the SubscriptionStates populate every other field of these messages,
241
  // this one is up to GrpcMux.
242
  const LocalInfo::LocalInfo& local_info_;
243
  Common::CallbackHandlePtr dynamic_update_callback_handle_;
244
  CustomConfigValidatorsPtr config_validators_;
245
  XdsConfigTrackerOptRef xds_config_tracker_;
246
  XdsResourcesDelegateOptRef xds_resources_delegate_;
247
  EdsResourcesCachePtr eds_resources_cache_;
248
  const std::string target_xds_authority_;
249

            
250
  // Used to track whether initial_resource_versions should be populated on the
251
  // next reconnection.
252
  bool should_send_initial_resource_versions_{true};
253

            
254
  bool started_{false};
255
  // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
256
  // true because it may contain dangling pointers.
257
  std::atomic<bool> shutdown_{false};
258
};
259

            
260
class GrpcMuxDelta : public GrpcMuxImpl<DeltaSubscriptionState, DeltaSubscriptionStateFactory,
261
                                        envoy::service::discovery::v3::DeltaDiscoveryRequest,
262
                                        envoy::service::discovery::v3::DeltaDiscoveryResponse> {
263
public:
264
  explicit GrpcMuxDelta(GrpcMuxContext& grpc_mux_context);
265

            
266
  // GrpcStreamCallbacks
267
  void requestOnDemandUpdate(const std::string& type_url,
268
                             const absl::flat_hash_set<std::string>& for_update) override;
269

            
270
private:
271
6
  absl::string_view methodName() const override {
272
6
    return "envoy.service.discovery.v3.AggregatedDiscoveryService.DeltaAggregatedResources";
273
6
  }
274
};
275

            
276
class GrpcMuxSotw : public GrpcMuxImpl<SotwSubscriptionState, SotwSubscriptionStateFactory,
277
                                       envoy::service::discovery::v3::DiscoveryRequest,
278
                                       envoy::service::discovery::v3::DiscoveryResponse> {
279
public:
280
  explicit GrpcMuxSotw(GrpcMuxContext& grpc_mux_context);
281

            
282
  // GrpcStreamCallbacks
283
  void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
284
    ENVOY_BUG(false, "unexpected request for on demand update");
285
  }
286

            
287
private:
288
4
  absl::string_view methodName() const override {
289
4
    return "envoy.service.discovery.v3.AggregatedDiscoveryService.StreamAggregatedResources";
290
4
  }
291
};
292

            
293
class NullGrpcMuxImpl : public GrpcMux {
294
public:
295
1
  void start() override {}
296

            
297
1
  ScopedResume pause(const std::string&) override {
298
1
    return std::make_unique<Cleanup>([]() {});
299
1
  }
300
1
  ScopedResume pause(const std::vector<std::string>) override {
301
1
    return std::make_unique<Cleanup>([]() {});
302
1
  }
303

            
304
  Config::GrpcMuxWatchPtr addWatch(const std::string&, const absl::flat_hash_set<std::string>&,
305
                                   SubscriptionCallbacks&, OpaqueResourceDecoderSharedPtr,
306
                                   const SubscriptionOptions&) override;
307

            
308
  absl::Status updateMuxSource(Grpc::RawAsyncClientSharedPtr&&, Grpc::RawAsyncClientSharedPtr&&,
309
                               Stats::Scope&, BackOffStrategyPtr&&,
310
                               const envoy::config::core::v3::ApiConfigSource&) override {
311
    return absl::UnimplementedError("");
312
  }
313

            
314
1
  void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
315
1
    ENVOY_BUG(false, "unexpected request for on demand update");
316
1
  }
317

            
318
1
  EdsResourcesCacheOptRef edsResourcesCache() override { return {}; }
319

            
320
1
  Upstream::LoadStatsReporter* loadStatsReporter() const override { return nullptr; }
321
1
  Upstream::LoadStatsReporter* maybeCreateLoadStatsReporter() override { return nullptr; }
322
};
323

            
324
} // namespace XdsMux
325
} // namespace Config
326
} // namespace Envoy