Line data Source code
1 : #pragma once
2 :
3 : #include <cstdint>
4 : #include <memory>
5 : #include <queue>
6 :
7 : #include "envoy/common/random_generator.h"
8 : #include "envoy/common/time.h"
9 : #include "envoy/config/custom_config_validators.h"
10 : #include "envoy/config/endpoint/v3/endpoint.pb.h"
11 : #include "envoy/config/grpc_mux.h"
12 : #include "envoy/config/subscription.h"
13 : #include "envoy/config/xds_config_tracker.h"
14 : #include "envoy/config/xds_resources_delegate.h"
15 : #include "envoy/event/dispatcher.h"
16 : #include "envoy/grpc/status.h"
17 : #include "envoy/service/discovery/v3/discovery.pb.h"
18 : #include "envoy/upstream/cluster_manager.h"
19 :
20 : #include "source/common/common/cleanup.h"
21 : #include "source/common/common/logger.h"
22 : #include "source/common/common/utility.h"
23 : #include "source/common/config/api_version.h"
24 : #include "source/common/config/resource_name.h"
25 : #include "source/common/config/ttl.h"
26 : #include "source/common/config/utility.h"
27 : #include "source/common/config/xds_context_params.h"
28 : #include "source/common/config/xds_resource.h"
29 : #include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
30 : #include "source/extensions/config_subscription/grpc/grpc_stream.h"
31 :
32 : #include "absl/container/node_hash_map.h"
33 : #include "xds/core/v3/resource_name.pb.h"
34 :
35 : namespace Envoy {
36 : namespace Config {
37 : /**
38 : * ADS API implementation that fetches via gRPC.
39 : */
40 : class GrpcMuxImpl : public GrpcMux,
41 : public GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse>,
42 : public Logger::Loggable<Logger::Id::config> {
43 : public:
44 : GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node);
45 :
46 : ~GrpcMuxImpl() override;
47 :
48 : // Causes all GrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
49 : // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
50 : // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
51 : // would then cause all `GrpcMuxImpl` to be destructed.
52 : // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
53 : static void shutdownAll();
54 :
55 11 : void shutdown() { shutdown_ = true; }
56 :
57 : void start() override;
58 :
59 : // GrpcMux
60 : ScopedResume pause(const std::string& type_url) override;
61 : ScopedResume pause(const std::vector<std::string> type_urls) override;
62 :
63 : GrpcMuxWatchPtr addWatch(const std::string& type_url,
64 : const absl::flat_hash_set<std::string>& resources,
65 : SubscriptionCallbacks& callbacks,
66 : OpaqueResourceDecoderSharedPtr resource_decoder,
67 : const SubscriptionOptions& options) override;
68 :
69 0 : void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
70 0 : }
71 :
72 0 : EdsResourcesCacheOptRef edsResourcesCache() override {
73 0 : return makeOptRefFromPtr(eds_resources_cache_.get());
74 0 : }
75 :
76 : void handleDiscoveryResponse(
77 : std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message);
78 :
79 : // Config::GrpcStreamCallbacks
80 : void onStreamEstablished() override;
81 : void onEstablishmentFailure() override;
82 : void
83 : onDiscoveryResponse(std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message,
84 : ControlPlaneStats& control_plane_stats) override;
85 : void onWriteable() override;
86 :
87 : GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
88 : envoy::service::discovery::v3::DiscoveryResponse>&
89 0 : grpcStreamForTest() {
90 0 : return grpc_stream_;
91 0 : }
92 :
93 : private:
94 : void drainRequests();
95 : void setRetryTimer();
96 : void sendDiscoveryRequest(absl::string_view type_url);
97 : // Clears the nonces of all subscribed types in this gRPC mux.
98 : void clearNonce();
99 :
100 : struct GrpcMuxWatchImpl : public GrpcMuxWatch {
101 : GrpcMuxWatchImpl(const absl::flat_hash_set<std::string>& resources,
102 : SubscriptionCallbacks& callbacks,
103 : OpaqueResourceDecoderSharedPtr resource_decoder, const std::string& type_url,
104 : GrpcMuxImpl& parent, const SubscriptionOptions& options,
105 : const LocalInfo::LocalInfo& local_info,
106 : EdsResourcesCacheOptRef eds_resources_cache)
107 : : callbacks_(callbacks), resource_decoder_(resource_decoder), type_url_(type_url),
108 : parent_(parent), subscription_options_(options), local_info_(local_info),
109 : watches_(parent.apiStateFor(type_url).watches_),
110 51 : eds_resources_cache_(eds_resources_cache) {
111 51 : updateResources(resources);
112 : // If eds resources cache is provided, then the type must be ClusterLoadAssignment.
113 51 : ASSERT(
114 51 : !eds_resources_cache_.has_value() ||
115 51 : (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>()));
116 51 : }
117 :
118 51 : ~GrpcMuxWatchImpl() override {
119 51 : watches_.erase(iter_);
120 51 : if (!resources_.empty()) {
121 31 : parent_.queueDiscoveryRequest(type_url_);
122 31 : if (eds_resources_cache_.has_value()) {
123 0 : removeResourcesFromCache(resources_);
124 0 : }
125 31 : }
126 51 : }
127 :
128 0 : void update(const absl::flat_hash_set<std::string>& resources) override {
129 0 : watches_.erase(iter_);
130 0 : if (!resources_.empty()) {
131 0 : parent_.queueDiscoveryRequest(type_url_);
132 0 : }
133 0 : updateResources(resources);
134 0 : parent_.queueDiscoveryRequest(type_url_);
135 0 : }
136 :
137 : // Maintain deterministic wire ordering via ordered std::set.
138 : std::set<std::string> resources_;
139 : SubscriptionCallbacks& callbacks_;
140 : OpaqueResourceDecoderSharedPtr resource_decoder_;
141 : const std::string type_url_;
142 : GrpcMuxImpl& parent_;
143 :
144 : private:
145 51 : void updateResources(const absl::flat_hash_set<std::string>& resources) {
146 : // Finding the list of removed resources by keeping the current resources
147 : // set until the end the function and computing the diff.
148 : // Temporarily keep the resources prior to the update to find which ones
149 : // were removed.
150 51 : std::set<std::string> previous_resources;
151 51 : previous_resources.swap(resources_);
152 51 : std::transform(
153 51 : resources.begin(), resources.end(), std::inserter(resources_, resources_.begin()),
154 51 : [this](const std::string& resource_name) -> std::string {
155 31 : if (XdsResourceIdentifier::hasXdsTpScheme(resource_name)) {
156 0 : auto xdstp_resource_or_error = XdsResourceIdentifier::decodeUrn(resource_name);
157 0 : THROW_IF_STATUS_NOT_OK(xdstp_resource_or_error, throw);
158 0 : auto xdstp_resource = xdstp_resource_or_error.value();
159 0 : if (subscription_options_.add_xdstp_node_context_params_) {
160 0 : const auto context = XdsContextParams::encodeResource(
161 0 : local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
162 0 : xdstp_resource.mutable_context()->CopyFrom(context);
163 0 : }
164 0 : XdsResourceIdentifier::EncodeOptions encode_options;
165 0 : encode_options.sort_context_params_ = true;
166 0 : return XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options);
167 0 : }
168 31 : return resource_name;
169 31 : });
170 51 : if (eds_resources_cache_.has_value()) {
171 : // Compute the removed resources and remove them from the cache.
172 0 : std::set<std::string> removed_resources;
173 0 : std::set_difference(previous_resources.begin(), previous_resources.end(),
174 0 : resources_.begin(), resources_.end(),
175 0 : std::inserter(removed_resources, removed_resources.begin()));
176 0 : removeResourcesFromCache(removed_resources);
177 0 : }
178 : // move this watch to the beginning of the list
179 51 : iter_ = watches_.emplace(watches_.begin(), this);
180 51 : }
181 :
182 0 : void removeResourcesFromCache(const std::set<std::string>& resources_to_remove) {
183 0 : ASSERT(eds_resources_cache_.has_value());
184 : // Iterate over the resources to remove, and if no other watcher
185 : // registered for that resource, remove it from the cache.
186 0 : for (const auto& resource_name : resources_to_remove) {
187 : // Counts the number of watchers that watch the resource.
188 0 : uint32_t resource_watchers_count = 0;
189 0 : for (const auto& watch : watches_) {
190 : // Skip the current watcher as it is intending to remove the resource.
191 0 : if (watch == this) {
192 0 : continue;
193 0 : }
194 0 : if (watch->resources_.find(resource_name) != watch->resources_.end()) {
195 0 : resource_watchers_count++;
196 0 : }
197 0 : }
198 : // Other than "this" watcher, the resource is not watched by any other
199 : // watcher, so it can be removed.
200 0 : if (resource_watchers_count == 0) {
201 0 : eds_resources_cache_->removeResource(resource_name);
202 0 : }
203 0 : }
204 0 : }
205 :
206 : using WatchList = std::list<GrpcMuxWatchImpl*>;
207 : const SubscriptionOptions& subscription_options_;
208 : const LocalInfo::LocalInfo& local_info_;
209 : WatchList& watches_;
210 : WatchList::iterator iter_;
211 : // Optional cache for the specific ClusterLoadAssignments of this watch.
212 : EdsResourcesCacheOptRef eds_resources_cache_;
213 : };
214 :
215 : // Per muxed API state.
216 : struct ApiState {
217 : ApiState(Event::Dispatcher& dispatcher,
218 : std::function<void(const std::vector<std::string>&)> callback)
219 70 : : ttl_(callback, dispatcher, dispatcher.timeSource()) {}
220 :
221 262 : bool paused() const { return pauses_ > 0; }
222 :
223 : // Watches on the returned resources for the API;
224 : std::list<GrpcMuxWatchImpl*> watches_;
225 : // Current DiscoveryRequest for API.
226 : envoy::service::discovery::v3::DiscoveryRequest request_;
227 : // Count of unresumed pause() invocations.
228 : uint32_t pauses_{};
229 : // Was a DiscoveryRequest elided during a pause?
230 : bool pending_{};
231 : // Has this API been tracked in subscriptions_?
232 : bool subscribed_{};
233 : // This resource type must have a Node sent at next request.
234 : bool must_send_node_{};
235 : TtlManager ttl_;
236 : // The identifier for the server that sent the most recent response, or
237 : // empty if there is none.
238 : std::string control_plane_identifier_{};
239 : // If true, xDS resources were previously fetched from an xDS source or an xDS delegate.
240 : bool previously_fetched_data_{false};
241 : };
242 :
243 142 : bool isHeartbeatResource(const std::string& type_url, const DecodedResource& resource) {
244 142 : return !resource.hasResource() &&
245 142 : resource.version() == apiStateFor(type_url).request_.version_info();
246 142 : }
247 : void expiryCallback(absl::string_view type_url, const std::vector<std::string>& expired);
248 : // Request queue management logic.
249 : void queueDiscoveryRequest(absl::string_view queue_item);
250 : // Invoked when dynamic context parameters change for a resource type.
251 : void onDynamicContextUpdate(absl::string_view resource_type_url);
252 : // Must be invoked from the main or test thread.
253 : void loadConfigFromDelegate(const std::string& type_url,
254 : const absl::flat_hash_set<std::string>& resource_names);
255 : // Must be invoked from the main or test thread.
256 : void processDiscoveryResources(const std::vector<DecodedResourcePtr>& resources,
257 : ApiState& api_state, const std::string& type_url,
258 : const std::string& version_info, bool call_delegate);
259 :
260 : GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
261 : envoy::service::discovery::v3::DiscoveryResponse>
262 : grpc_stream_;
263 : const LocalInfo::LocalInfo& local_info_;
264 : const bool skip_subsequent_node_;
265 : CustomConfigValidatorsPtr config_validators_;
266 : XdsConfigTrackerOptRef xds_config_tracker_;
267 : XdsResourcesDelegateOptRef xds_resources_delegate_;
268 : EdsResourcesCachePtr eds_resources_cache_;
269 : const std::string target_xds_authority_;
270 : bool first_stream_request_{true};
271 :
272 : // Helper function for looking up and potentially allocating a new ApiState.
273 : ApiState& apiStateFor(absl::string_view type_url);
274 :
275 : absl::node_hash_map<std::string, std::unique_ptr<ApiState>> api_state_;
276 :
277 : // Envoy's dependency ordering.
278 : std::list<std::string> subscriptions_;
279 :
280 : // A queue to store requests while rate limited. Note that when requests
281 : // cannot be sent due to the gRPC stream being down, this queue does not
282 : // store them; rather, they are simply dropped. This string is a type
283 : // URL.
284 : std::unique_ptr<std::queue<std::string>> request_queue_;
285 :
286 : Event::Dispatcher& dispatcher_;
287 : Common::CallbackHandlePtr dynamic_update_callback_handle_;
288 :
289 : bool started_{false};
290 : // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
291 : // true because it may contain dangling pointers.
292 : std::atomic<bool> shutdown_{false};
293 : };
294 :
295 : using GrpcMuxImplPtr = std::unique_ptr<GrpcMuxImpl>;
296 : using GrpcMuxImplSharedPtr = std::shared_ptr<GrpcMuxImpl>;
297 :
298 : class GrpcMuxFactory;
299 : DECLARE_FACTORY(GrpcMuxFactory);
300 :
301 : } // namespace Config
302 : } // namespace Envoy
|