Line data Source code
1 : #pragma once
2 :
3 : #include <cstdint>
4 : #include <memory>
5 : #include <queue>
6 :
7 : #include "envoy/common/random_generator.h"
8 : #include "envoy/common/time.h"
9 : #include "envoy/common/token_bucket.h"
10 : #include "envoy/config/custom_config_validators.h"
11 : #include "envoy/config/grpc_mux.h"
12 : #include "envoy/config/subscription.h"
13 : #include "envoy/config/xds_config_tracker.h"
14 : #include "envoy/config/xds_resources_delegate.h"
15 : #include "envoy/event/dispatcher.h"
16 : #include "envoy/grpc/status.h"
17 : #include "envoy/service/discovery/v3/discovery.pb.h"
18 : #include "envoy/upstream/cluster_manager.h"
19 :
20 : #include "source/common/common/logger.h"
21 : #include "source/common/common/utility.h"
22 : #include "source/common/config/api_version.h"
23 : #include "source/common/grpc/common.h"
24 : #include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
25 : #include "source/extensions/config_subscription/grpc/grpc_stream.h"
26 : #include "source/extensions/config_subscription/grpc/pausable_ack_queue.h"
27 : #include "source/extensions/config_subscription/grpc/watch_map.h"
28 : #include "source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h"
29 : #include "source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h"
30 :
31 : #include "absl/container/node_hash_map.h"
32 :
33 : namespace Envoy {
34 : namespace Config {
35 : namespace XdsMux {
36 :
37 : class ShutdownableMux { // Interface for a mux that can be told to shut down.
38 : public:
39 14 : virtual ~ShutdownableMux() = default;
40 : virtual void shutdown() PURE; // Not implemented here; GrpcMuxImpl in this file overrides it.
41 : };
42 :
43 : // Manages subscriptions to one or more types of resource. The logical protocol
44 : // state of those subscription(s) is handled by SubscriptionState.
45 : // This class owns the GrpcStream used to talk to the server, maintains queuing
46 : // logic to properly order the subscription(s)' various messages, and allows
47 : // starting/stopping/pausing of the subscriptions.
48 : //
49 : // @tparam S SubscriptionState state type, either SotwSubscriptionState or DeltaSubscriptionState
50 : // @tparam F SubscriptionStateFactory type, either SotwSubscriptionStateFactory or
51 : // DeltaSubscriptionStateFactory
52 : // @tparam RQ Xds request type, either envoy::service::discovery::v3::DiscoveryRequest or
53 : // envoy::service::discovery::v3::DeltaDiscoveryRequest
54 : // @tparam RS Xds response type, either envoy::service::discovery::v3::DiscoveryResponse or
55 : // envoy::service::discovery::v3::DeltaDiscoveryResponse
56 : //
57 : template <class S, class F, class RQ, class RS>
58 : class GrpcMuxImpl : public GrpcStreamCallbacks<RS>,
59 : public GrpcMux,
60 : public ShutdownableMux,
61 : Logger::Loggable<Logger::Id::config> {
62 : public:
63 : GrpcMuxImpl(std::unique_ptr<F> subscription_state_factory, GrpcMuxContext& grpc_mux_context,
64 : bool skip_subsequent_node);
65 :
66 : ~GrpcMuxImpl() override;
67 :
68 : // Causes all GrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
69 : // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
70 : // preferable for the `ServerImpl` to cause all configuration subscriptions to be shut down, which
71 : // would then cause all `GrpcMuxImpl` to be destructed.
72 : // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
73 : static void shutdownAll();
74 :
75 28 : void shutdown() override { shutdown_ = true; } // Only sets the flag; see shutdown_ below.
76 0 : bool isShutdown() { return shutdown_; } // NOTE(review): read-only accessor, but not const-qualified.
77 :
78 : // TODO (dmitri-d) return a naked pointer instead of the wrapper once the legacy mux has been
79 : // removed and the mux interface can be changed
80 : Config::GrpcMuxWatchPtr addWatch(const std::string& type_url,
81 : const absl::flat_hash_set<std::string>& resources,
82 : SubscriptionCallbacks& callbacks,
83 : OpaqueResourceDecoderSharedPtr resource_decoder,
84 : const SubscriptionOptions& options) override;
85 : void updateWatch(const std::string& type_url, Watch* watch,
86 : const absl::flat_hash_set<std::string>& resources,
87 : const SubscriptionOptions& options);
88 : void removeWatch(const std::string& type_url, Watch* watch);
89 :
90 : ScopedResume pause(const std::string& type_url) override;
91 : ScopedResume pause(const std::vector<std::string> type_urls) override;
92 : void start() override;
93 0 : const absl::flat_hash_map<std::string, std::unique_ptr<S>>& subscriptions() const { // Read-only view of the type_url -> subscription state map.
94 0 : return subscriptions_;
95 0 : }
96 :
97 : // GrpcStreamCallbacks: each callback below forwards to a handler declared in the protected section.
98 14 : void onStreamEstablished() override { handleEstablishedStream(); }
99 14 : void onEstablishmentFailure() override { handleStreamEstablishmentFailure(); }
100 0 : void onWriteable() override { trySendDiscoveryRequests(); }
101 : void onDiscoveryResponse(std::unique_ptr<RS>&& message,
102 111 : ControlPlaneStats& control_plane_stats) override {
103 111 : genericHandleResponse(message->type_url(), *message, control_plane_stats); // The response's own type_url is passed as the routing key.
104 111 : }
105 :
106 0 : EdsResourcesCacheOptRef edsResourcesCache() override {
107 0 : return makeOptRefFromPtr(eds_resources_cache_.get()); // Wraps the raw pointer held by eds_resources_cache_.
108 0 : }
109 :
110 0 : GrpcStream<RQ, RS>& grpcStreamForTest() { return grpc_stream_; } // Exposes the owned stream; intended for tests, per the name.
111 :
112 : protected:
113 : class WatchImpl : public Envoy::Config::GrpcMuxWatch { // GrpcMuxWatch implementation that removes its Watch from the parent mux when destroyed.
114 : public:
115 : WatchImpl(const std::string& type_url, Watch* watch, GrpcMuxImpl& parent,
116 : const SubscriptionOptions& options)
117 67 : : type_url_(type_url), watch_(watch), parent_(parent), options_(options) {}
118 :
119 67 : ~WatchImpl() override { remove(); }
120 :
121 67 : void remove() { // Safe to call repeatedly: watch_ is nulled after the first removal.
122 67 : if (watch_) {
123 67 : parent_.removeWatch(type_url_, watch_);
124 67 : watch_ = nullptr;
125 67 : }
126 67 : }
127 :
128 0 : void update(const absl::flat_hash_set<std::string>& resources) override { // Forwards the new resource set to the parent mux for this watch.
129 0 : parent_.updateWatch(type_url_, watch_, resources, options_); // NOTE(review): no null check; after remove() this would pass nullptr — confirm callers never do that.
130 0 : }
131 :
132 : private:
133 : const std::string type_url_;
134 : Watch* watch_; // Set to nullptr by remove().
135 : GrpcMuxImpl& parent_;
136 : const SubscriptionOptions options_;
137 : };
138 :
139 : void sendGrpcMessage(RQ& msg_proto, S& sub_state);
140 452 : void maybeUpdateQueueSizeStat(uint64_t size) { grpc_stream_.maybeUpdateQueueSizeStat(size); } // This and the next three helpers forward directly to grpc_stream_.
141 180 : bool grpcStreamAvailable() { return grpc_stream_.grpcStreamAvailable(); }
142 180 : bool rateLimitAllowsDrain() { return grpc_stream_.checkRateLimitAllowsDrain(); }
143 180 : void sendMessage(RQ& msg_proto) { grpc_stream_.sendMessage(msg_proto); }
144 :
145 : S& subscriptionStateFor(const std::string& type_url);
146 : WatchMap& watchMapFor(const std::string& type_url);
147 : void handleEstablishedStream();
148 : void handleStreamEstablishmentFailure();
149 : void genericHandleResponse(const std::string& type_url, const RS& response_proto,
150 : ControlPlaneStats& control_plane_stats);
151 : void trySendDiscoveryRequests();
152 166 : bool skipSubsequentNode() const { return skip_subsequent_node_; }
153 180 : bool anyRequestSentYetInCurrentStream() const { return any_request_sent_yet_in_current_stream_; }
154 194 : void setAnyRequestSentYetInCurrentStream(bool value) {
155 194 : any_request_sent_yet_in_current_stream_ = value;
156 194 : }
157 14 : const LocalInfo::LocalInfo& localInfo() const { return local_info_; }
158 :
159 : private:
160 : // Checks whether external conditions allow sending a (Delta)DiscoveryRequest. (Does not check
161 : // whether we *want* to send a (Delta)DiscoveryRequest).
162 : bool canSendDiscoveryRequest(const std::string& type_url);
163 :
164 : // Checks whether we have something to say in a (Delta)DiscoveryRequest, which can be an ACK
165 : // and/or a subscription update. (Does not check whether we *can* send that
166 : // (Delta)DiscoveryRequest). Returns the type_url we should send the (Delta)DiscoveryRequest for (if
167 : // any). First, prioritizes ACKs over non-ACK subscription interest updates. Then, prioritizes
168 : // non-ACK updates in the order the various types of subscriptions were activated (as tracked by
169 : // subscription_ordering_).
170 : absl::optional<std::string> whoWantsToSendDiscoveryRequest();
171 :
172 : // Invoked when dynamic context parameters change for a resource type.
173 : void onDynamicContextUpdate(absl::string_view resource_type_url);
174 :
175 : GrpcStream<RQ, RS> grpc_stream_;
176 :
177 : // Resource (N)ACKs we're waiting to send, stored in the order that they should be sent in. All
178 : // of our different resource types' ACKs are mixed together in this queue. See class for
179 : // description of how it interacts with pause() and resume().
180 : PausableAckQueue pausable_ack_queue_;
181 :
182 : // Makes SubscriptionStates, to be held in the subscriptions_ map. Whether this GrpcMux is doing
183 : // delta or state of the world xDS is determined by which concrete subclass this variable gets.
184 : std::unique_ptr<F> subscription_state_factory_;
185 :
186 : // Map key is type_url.
187 : // Only addWatch() should insert into these maps.
188 : absl::flat_hash_map<std::string, std::unique_ptr<S>> subscriptions_;
189 : absl::flat_hash_map<std::string, std::unique_ptr<WatchMap>> watch_maps_;
190 :
191 : // Determines the order of initial discovery requests. (Assumes that subscriptions are added
192 : // to this GrpcMux in the order of Envoy's dependency ordering).
193 : std::list<std::string> subscription_ordering_; // NOTE(review): <list> is not among the includes visible in this file — presumably transitive; confirm.
194 :
195 : // Whether to enable the optimization of only including the node field in the very first
196 : // discovery request in an xDS gRPC stream (really just one: *not* per-type_url).
197 : const bool skip_subsequent_node_;
198 :
199 : // State to help with skip_subsequent_node's logic.
200 : bool any_request_sent_yet_in_current_stream_{};
201 :
202 : // Used to populate the (Delta)DiscoveryRequest's node field. That field is the same across
203 : // all type_urls, and moreover, the 'skip_subsequent_node' logic needs to operate across all
204 : // the type_urls. So, while the SubscriptionStates populate every other field of these messages,
205 : // this one is up to GrpcMux.
206 : const LocalInfo::LocalInfo& local_info_;
207 : Common::CallbackHandlePtr dynamic_update_callback_handle_; // Presumably the registration handle for onDynamicContextUpdate() — TODO confirm in the constructor.
208 : CustomConfigValidatorsPtr config_validators_;
209 : XdsConfigTrackerOptRef xds_config_tracker_;
210 : XdsResourcesDelegateOptRef xds_resources_delegate_;
211 : EdsResourcesCachePtr eds_resources_cache_; // Exposed through edsResourcesCache().
212 : const std::string target_xds_authority_;
213 :
214 : bool started_{false};
215 : // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
216 : // true because it may contain dangling pointers.
217 : std::atomic<bool> shutdown_{false}; // NOTE(review): <atomic> is not among the includes visible in this file — presumably transitive; confirm.
218 : };
219 :
220 : class GrpcMuxDelta : public GrpcMuxImpl<DeltaSubscriptionState, DeltaSubscriptionStateFactory, // GrpcMuxImpl instantiated with the Delta state, factory, request and response types.
221 : envoy::service::discovery::v3::DeltaDiscoveryRequest,
222 : envoy::service::discovery::v3::DeltaDiscoveryResponse> {
223 : public:
224 : GrpcMuxDelta(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node);
225 :
226 : // GrpcMux (declared here only; the definition is outside this file)
227 : void requestOnDemandUpdate(const std::string& type_url,
228 : const absl::flat_hash_set<std::string>& for_update) override;
229 : };
230 :
231 : class GrpcMuxSotw : public GrpcMuxImpl<SotwSubscriptionState, SotwSubscriptionStateFactory, // GrpcMuxImpl instantiated with the state-of-the-world state, factory, request and response types.
232 : envoy::service::discovery::v3::DiscoveryRequest,
233 : envoy::service::discovery::v3::DiscoveryResponse> {
234 : public:
235 : GrpcMuxSotw(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node);
236 :
237 : // GrpcMux
238 0 : void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override { // Both arguments are ignored.
239 0 : ENVOY_BUG(false, "unexpected request for on demand update"); // Per the message, this mux does not expect on-demand update requests.
240 0 : }
241 : };
242 :
243 : class NullGrpcMuxImpl : public GrpcMux { // GrpcMux implementation whose inline methods do nothing or return empty values.
244 : public:
245 0 : void start() override {} // Intentionally empty.
246 :
247 0 : ScopedResume pause(const std::string&) override {
248 0 : return std::make_unique<Cleanup>([]() {}); // Cleanup wrapping an empty lambda: nothing to resume.
249 0 : }
250 0 : ScopedResume pause(const std::vector<std::string>) override {
251 0 : return std::make_unique<Cleanup>([]() {}); // Same no-op Cleanup as the single-type_url overload.
252 0 : }
253 :
254 : Config::GrpcMuxWatchPtr addWatch(const std::string&, const absl::flat_hash_set<std::string>&, // Declared only; the definition is outside this file.
255 : SubscriptionCallbacks&, OpaqueResourceDecoderSharedPtr,
256 : const SubscriptionOptions&) override;
257 :
258 0 : void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override { // Both arguments are ignored.
259 0 : ENVOY_BUG(false, "unexpected request for on demand update"); // Per the message, this mux does not expect on-demand update requests.
260 0 : }
261 :
262 0 : EdsResourcesCacheOptRef edsResourcesCache() override { return {}; } // Returns a default-constructed EdsResourcesCacheOptRef.
263 : };
264 :
265 : } // namespace XdsMux
266 : } // namespace Config
267 : } // namespace Envoy
|