1
#pragma once
2

            
3
#include <memory>
4

            
5
#include "envoy/common/random_generator.h"
6
#include "envoy/common/token_bucket.h"
7
#include "envoy/config/endpoint/v3/endpoint.pb.h"
8
#include "envoy/config/grpc_mux.h"
9
#include "envoy/config/subscription.h"
10
#include "envoy/config/xds_config_tracker.h"
11
#include "envoy/service/discovery/v3/discovery.pb.h"
12

            
13
#include "source/common/common/logger.h"
14
#include "source/common/config/api_version.h"
15
#include "source/common/config/resource_name.h"
16
#include "source/common/grpc/common.h"
17
#include "source/common/runtime/runtime_features.h"
18
#include "source/extensions/config_subscription/grpc/delta_subscription_state.h"
19
#include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
20
#include "source/extensions/config_subscription/grpc/grpc_mux_failover.h"
21
#include "source/extensions/config_subscription/grpc/pausable_ack_queue.h"
22
#include "source/extensions/config_subscription/grpc/watch_map.h"
23

            
24
namespace Envoy {
25
namespace Config {
26

            
27
// Manages subscriptions to one or more type of resource. The logical protocol
28
// state of those subscription(s) is handled by DeltaSubscriptionState.
29
// This class owns the GrpcStream used to talk to the server, maintains queuing
30
// logic to properly order the subscription(s)' various messages, and allows
31
// starting/stopping/pausing of the subscriptions.
32
class NewGrpcMuxImpl
33
    : public GrpcMux,
34
      public GrpcStreamCallbacks<envoy::service::discovery::v3::DeltaDiscoveryResponse>,
35
      Logger::Loggable<Logger::Id::config> {
36
public:
37
  NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context);
38

            
39
  ~NewGrpcMuxImpl() override;
40

            
41
  // Causes all NewGrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
42
  // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
43
  // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
44
  // would then cause all `NewGrpcMuxImpl` to be destructed.
45
  // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
46
  static void shutdownAll();
47

            
48
469
  void shutdown() { shutdown_ = true; }
49

            
50
  GrpcMuxWatchPtr addWatch(const std::string& type_url,
51
                           const absl::flat_hash_set<std::string>& resources,
52
                           SubscriptionCallbacks& callbacks,
53
                           OpaqueResourceDecoderSharedPtr resource_decoder,
54
                           const SubscriptionOptions& options) override;
55

            
56
  void requestOnDemandUpdate(const std::string& type_url,
57
                             const absl::flat_hash_set<std::string>& for_update) override;
58

            
59
167
  EdsResourcesCacheOptRef edsResourcesCache() override {
60
167
    return makeOptRefFromPtr(eds_resources_cache_.get());
61
167
  }
62

            
63
  ScopedResume pause(const std::string& type_url) override;
64
  ScopedResume pause(const std::vector<std::string> type_urls) override;
65

            
66
  void onDiscoveryResponse(
67
      std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryResponse>&& message,
68
      ControlPlaneStats& control_plane_stats) override;
69

            
70
  void onStreamEstablished() override;
71

            
72
  void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) override;
73

            
74
  void onWriteable() override;
75

            
76
  void kickOffAck(UpdateAck ack);
77

            
78
  // TODO(fredlas) remove this from the GrpcMux interface.
79
  void start() override;
80

            
81
  absl::Status
82
  updateMuxSource(Grpc::RawAsyncClientSharedPtr&& primary_async_client,
83
                  Grpc::RawAsyncClientSharedPtr&& failover_async_client, Stats::Scope& scope,
84
                  BackOffStrategyPtr&& backoff_strategy,
85
                  const envoy::config::core::v3::ApiConfigSource& ads_config_source) override;
86

            
87
  // TODO(adisuissa): finish implementation.
88
2
  Upstream::LoadStatsReporter* loadStatsReporter() const override { return nullptr; }
89
2
  Upstream::LoadStatsReporter* maybeCreateLoadStatsReporter() override { return nullptr; }
90

            
91
  GrpcStreamInterface<envoy::service::discovery::v3::DeltaDiscoveryRequest,
92
                      envoy::service::discovery::v3::DeltaDiscoveryResponse>&
93
4
  grpcStreamForTest() {
94
    // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated,
95
    // return grpc_stream_.currentStreamForTest() directly (defined in GrpcMuxFailover).
96
4
    if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
97
2
      return dynamic_cast<GrpcMuxFailover<envoy::service::discovery::v3::DeltaDiscoveryRequest,
98
2
                                          envoy::service::discovery::v3::DeltaDiscoveryResponse>*>(
99
2
                 grpc_stream_.get())
100
2
          ->currentStreamForTest();
101
2
    }
102
2
    return *grpc_stream_.get();
103
4
  }
104

            
105
  struct SubscriptionStuff {
106
    SubscriptionStuff(const std::string& type_url, const bool use_namespace_matching,
107
                      Event::Dispatcher& dispatcher, CustomConfigValidators* config_validators,
108
                      XdsConfigTrackerOptRef xds_config_tracker,
109
                      EdsResourcesCacheOptRef eds_resources_cache)
110
796
        : watch_map_(use_namespace_matching, type_url, config_validators, eds_resources_cache),
111
796
          sub_state_(type_url, watch_map_, dispatcher, xds_config_tracker) {
112
      // If eds resources cache is provided, then the type must be ClusterLoadAssignment.
113
796
      ASSERT(
114
796
          !eds_resources_cache.has_value() ||
115
796
          (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>()));
116
796
    }
117

            
118
    WatchMap watch_map_;
119
    DeltaSubscriptionState sub_state_;
120
    std::string control_plane_identifier_{};
121

            
122
    SubscriptionStuff(const SubscriptionStuff&) = delete;
123
    SubscriptionStuff& operator=(const SubscriptionStuff&) = delete;
124
  };
125

            
126
  using SubscriptionStuffPtr = std::unique_ptr<SubscriptionStuff>;
127

            
128
  // for use in tests only
129
2
  const absl::flat_hash_map<std::string, SubscriptionStuffPtr>& subscriptions() {
130
2
    return subscriptions_;
131
2
  }
132

            
133
private:
134
  class WatchImpl : public GrpcMuxWatch {
135
  public:
136
    WatchImpl(const std::string& type_url, Watch* watch, NewGrpcMuxImpl& parent,
137
              const SubscriptionOptions& options)
138
942
        : type_url_(type_url), watch_(watch), parent_(parent), options_(options) {}
139

            
140
942
    ~WatchImpl() override { remove(); }
141

            
142
942
    void remove() {
143
942
      if (watch_) {
144
942
        parent_.removeWatch(type_url_, watch_);
145
942
        watch_ = nullptr;
146
942
      }
147
942
    }
148

            
149
15
    void update(const absl::flat_hash_set<std::string>& resources) override {
150
15
      parent_.updateWatch(type_url_, watch_, resources, options_);
151
15
    }
152

            
153
  private:
154
    const std::string type_url_;
155
    Watch* watch_;
156
    NewGrpcMuxImpl& parent_;
157
    const SubscriptionOptions options_;
158
  };
159

            
160
  using SubscriptionsMap = absl::flat_hash_map<std::string, SubscriptionStuffPtr>;
161

            
162
  // Helper function to create the grpc_stream_ object.
163
  std::unique_ptr<GrpcStreamInterface<envoy::service::discovery::v3::DeltaDiscoveryRequest,
164
                                      envoy::service::discovery::v3::DeltaDiscoveryResponse>>
165
  createGrpcStreamObject(Grpc::RawAsyncClientSharedPtr&& async_client,
166
                         Grpc::RawAsyncClientSharedPtr&& failover_async_client,
167
                         const Protobuf::MethodDescriptor& service_method, Stats::Scope& scope,
168
                         BackOffStrategyPtr&& backoff_strategy,
169
                         const RateLimitSettings& rate_limit_settings);
170

            
171
  void removeWatch(const std::string& type_url, Watch* watch);
172

            
173
  // Updates the list of resource names watched by the given watch. If an added name is new across
174
  // the whole subscription, or if a removed name has no other watch interested in it, then the
175
  // subscription will enqueue and attempt to send an appropriate discovery request.
176
  void updateWatch(const std::string& type_url, Watch* watch,
177
                   const absl::flat_hash_set<std::string>& resources,
178
                   const SubscriptionOptions& options);
179

            
180
  // Adds a subscription for the type_url to the subscriptions map and order list.
181
  SubscriptionsMap::iterator addSubscription(const std::string& type_url,
182
                                             bool use_namespace_matching);
183

            
184
  void trySendDiscoveryRequests();
185

            
186
  // Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check
187
  // whether we *want* to send a DeltaDiscoveryRequest).
188
  bool canSendDiscoveryRequest(const std::string& type_url);
189

            
190
  // Checks whether we have something to say in a DeltaDiscoveryRequest, which can be an ACK and/or
191
  // a subscription update. (Does not check whether we *can* send that DeltaDiscoveryRequest).
192
  // Returns the type_url we should send the DeltaDiscoveryRequest for (if any).
193
  // First, prioritizes ACKs over non-ACK subscription interest updates.
194
  // Then, prioritizes non-ACK updates in the order the various types
195
  // of subscriptions were activated.
196
  absl::optional<std::string> whoWantsToSendDiscoveryRequest();
197

            
198
  // Invoked when dynamic context parameters change for a resource type.
199
  void onDynamicContextUpdate(absl::string_view resource_type_url);
200

            
201
  // Resource (N)ACKs we're waiting to send, stored in the order that they should be sent in. All
202
  // of our different resource types' ACKs are mixed together in this queue. See class for
203
  // description of how it interacts with pause() and resume().
204
  PausableAckQueue pausable_ack_queue_;
205

            
206
  // Map key is type_url.
207
  SubscriptionsMap subscriptions_;
208

            
209
  // Determines the order of initial discovery requests. (Assumes that subscriptions are added in
210
  // the order of Envoy's dependency ordering).
211
  std::list<std::string> subscription_ordering_;
212

            
213
  Event::Dispatcher& dispatcher_;
214
  // Multiplexes the stream to the primary and failover sources.
215
  // TODO(adisuissa): Once envoy.restart_features.xds_failover_support is deprecated,
216
  // convert from unique_ptr<GrpcStreamInterface> to GrpcMuxFailover directly.
217
  std::unique_ptr<GrpcStreamInterface<envoy::service::discovery::v3::DeltaDiscoveryRequest,
218
                                      envoy::service::discovery::v3::DeltaDiscoveryResponse>>
219
      grpc_stream_;
220

            
221
  const LocalInfo::LocalInfo& local_info_;
222
  CustomConfigValidatorsPtr config_validators_;
223
  Common::CallbackHandlePtr dynamic_update_callback_handle_;
224
  XdsConfigTrackerOptRef xds_config_tracker_;
225
  const bool skip_subsequent_node_;
226
  EdsResourcesCachePtr eds_resources_cache_;
227
  bool first_request_on_stream_{true};
228

            
229
  // Used to track whether initial_resource_versions should be populated on the
230
  // next reconnection.
231
  bool should_send_initial_resource_versions_{true};
232
  bool started_{false};
233
  // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
234
  // true because it may contain dangling pointers.
235
  std::atomic<bool> shutdown_{false};
236
};
237

            
238
using NewGrpcMuxImplPtr = std::unique_ptr<NewGrpcMuxImpl>;
239
using NewGrpcMuxImplSharedPtr = std::shared_ptr<NewGrpcMuxImpl>;
240

            
241
class NewGrpcMuxFactory;
242
DECLARE_FACTORY(NewGrpcMuxFactory);
243

            
244
} // namespace Config
245
} // namespace Envoy