LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc - grpc_mux_impl.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 32 85 37.6 %
Date: 2024-01-05 06:35:25 Functions: 8 13 61.5 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <cstdint>
       4             : #include <memory>
       5             : #include <queue>
       6             : 
       7             : #include "envoy/common/random_generator.h"
       8             : #include "envoy/common/time.h"
       9             : #include "envoy/config/custom_config_validators.h"
      10             : #include "envoy/config/endpoint/v3/endpoint.pb.h"
      11             : #include "envoy/config/grpc_mux.h"
      12             : #include "envoy/config/subscription.h"
      13             : #include "envoy/config/xds_config_tracker.h"
      14             : #include "envoy/config/xds_resources_delegate.h"
      15             : #include "envoy/event/dispatcher.h"
      16             : #include "envoy/grpc/status.h"
      17             : #include "envoy/service/discovery/v3/discovery.pb.h"
      18             : #include "envoy/upstream/cluster_manager.h"
      19             : 
      20             : #include "source/common/common/cleanup.h"
      21             : #include "source/common/common/logger.h"
      22             : #include "source/common/common/utility.h"
      23             : #include "source/common/config/api_version.h"
      24             : #include "source/common/config/resource_name.h"
      25             : #include "source/common/config/ttl.h"
      26             : #include "source/common/config/utility.h"
      27             : #include "source/common/config/xds_context_params.h"
      28             : #include "source/common/config/xds_resource.h"
      29             : #include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
      30             : #include "source/extensions/config_subscription/grpc/grpc_stream.h"
      31             : 
      32             : #include "absl/container/node_hash_map.h"
      33             : #include "xds/core/v3/resource_name.pb.h"
      34             : 
      35             : namespace Envoy {
      36             : namespace Config {
      37             : /**
      38             :  * ADS API implementation that fetches via gRPC.
      39             :  */
      40             : class GrpcMuxImpl : public GrpcMux,
      41             :                     public GrpcStreamCallbacks<envoy::service::discovery::v3::DiscoveryResponse>,
      42             :                     public Logger::Loggable<Logger::Id::config> {
      43             : public:
      44             :   GrpcMuxImpl(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node);
      45             : 
      46             :   ~GrpcMuxImpl() override;
      47             : 
      48             :   // Causes all GrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
      49             :   // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
      50             :   // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
      51             :   // would then cause all `GrpcMuxImpl` to be destructed.
      52             :   // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
      53             :   static void shutdownAll();
      54             : 
      55          11 :   void shutdown() { shutdown_ = true; }
      56             : 
      57             :   void start() override;
      58             : 
      59             :   // GrpcMux
      60             :   ScopedResume pause(const std::string& type_url) override;
      61             :   ScopedResume pause(const std::vector<std::string> type_urls) override;
      62             : 
      63             :   GrpcMuxWatchPtr addWatch(const std::string& type_url,
      64             :                            const absl::flat_hash_set<std::string>& resources,
      65             :                            SubscriptionCallbacks& callbacks,
      66             :                            OpaqueResourceDecoderSharedPtr resource_decoder,
      67             :                            const SubscriptionOptions& options) override;
      68             : 
      69           0 :   void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
      70           0 :   }
      71             : 
      72           0 :   EdsResourcesCacheOptRef edsResourcesCache() override {
      73           0 :     return makeOptRefFromPtr(eds_resources_cache_.get());
      74           0 :   }
      75             : 
      76             :   void handleDiscoveryResponse(
      77             :       std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message);
      78             : 
      79             :   // Config::GrpcStreamCallbacks
      80             :   void onStreamEstablished() override;
      81             :   void onEstablishmentFailure() override;
      82             :   void
      83             :   onDiscoveryResponse(std::unique_ptr<envoy::service::discovery::v3::DiscoveryResponse>&& message,
      84             :                       ControlPlaneStats& control_plane_stats) override;
      85             :   void onWriteable() override;
      86             : 
      87             :   GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
      88             :              envoy::service::discovery::v3::DiscoveryResponse>&
      89           0 :   grpcStreamForTest() {
      90           0 :     return grpc_stream_;
      91           0 :   }
      92             : 
      93             : private:
      94             :   void drainRequests();
      95             :   void setRetryTimer();
      96             :   void sendDiscoveryRequest(absl::string_view type_url);
      97             :   // Clears the nonces of all subscribed types in this gRPC mux.
      98             :   void clearNonce();
      99             : 
     100             :   struct GrpcMuxWatchImpl : public GrpcMuxWatch {
     101             :     GrpcMuxWatchImpl(const absl::flat_hash_set<std::string>& resources,
     102             :                      SubscriptionCallbacks& callbacks,
     103             :                      OpaqueResourceDecoderSharedPtr resource_decoder, const std::string& type_url,
     104             :                      GrpcMuxImpl& parent, const SubscriptionOptions& options,
     105             :                      const LocalInfo::LocalInfo& local_info,
     106             :                      EdsResourcesCacheOptRef eds_resources_cache)
     107             :         : callbacks_(callbacks), resource_decoder_(resource_decoder), type_url_(type_url),
     108             :           parent_(parent), subscription_options_(options), local_info_(local_info),
     109             :           watches_(parent.apiStateFor(type_url).watches_),
     110          51 :           eds_resources_cache_(eds_resources_cache) {
     111          51 :       updateResources(resources);
     112             :       // If eds resources cache is provided, then the type must be ClusterLoadAssignment.
     113          51 :       ASSERT(
     114          51 :           !eds_resources_cache_.has_value() ||
     115          51 :           (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>()));
     116          51 :     }
     117             : 
     118          51 :     ~GrpcMuxWatchImpl() override {
     119          51 :       watches_.erase(iter_);
     120          51 :       if (!resources_.empty()) {
     121          31 :         parent_.queueDiscoveryRequest(type_url_);
     122          31 :         if (eds_resources_cache_.has_value()) {
     123           0 :           removeResourcesFromCache(resources_);
     124           0 :         }
     125          31 :       }
     126          51 :     }
     127             : 
     128           0 :     void update(const absl::flat_hash_set<std::string>& resources) override {
     129           0 :       watches_.erase(iter_);
     130           0 :       if (!resources_.empty()) {
     131           0 :         parent_.queueDiscoveryRequest(type_url_);
     132           0 :       }
     133           0 :       updateResources(resources);
     134           0 :       parent_.queueDiscoveryRequest(type_url_);
     135           0 :     }
     136             : 
     137             :     // Maintain deterministic wire ordering via ordered std::set.
     138             :     std::set<std::string> resources_;
     139             :     SubscriptionCallbacks& callbacks_;
     140             :     OpaqueResourceDecoderSharedPtr resource_decoder_;
     141             :     const std::string type_url_;
     142             :     GrpcMuxImpl& parent_;
     143             : 
     144             :   private:
     145          51 :     void updateResources(const absl::flat_hash_set<std::string>& resources) {
     146             :       // Finding the list of removed resources by keeping the current resources
     147             :       // set until the end the function and computing the diff.
     148             :       // Temporarily keep the resources prior to the update to find which ones
     149             :       // were removed.
     150          51 :       std::set<std::string> previous_resources;
     151          51 :       previous_resources.swap(resources_);
     152          51 :       std::transform(
     153          51 :           resources.begin(), resources.end(), std::inserter(resources_, resources_.begin()),
     154          51 :           [this](const std::string& resource_name) -> std::string {
     155          31 :             if (XdsResourceIdentifier::hasXdsTpScheme(resource_name)) {
     156           0 :               auto xdstp_resource_or_error = XdsResourceIdentifier::decodeUrn(resource_name);
     157           0 :               THROW_IF_STATUS_NOT_OK(xdstp_resource_or_error, throw);
     158           0 :               auto xdstp_resource = xdstp_resource_or_error.value();
     159           0 :               if (subscription_options_.add_xdstp_node_context_params_) {
     160           0 :                 const auto context = XdsContextParams::encodeResource(
     161           0 :                     local_info_.contextProvider().nodeContext(), xdstp_resource.context(), {}, {});
     162           0 :                 xdstp_resource.mutable_context()->CopyFrom(context);
     163           0 :               }
     164           0 :               XdsResourceIdentifier::EncodeOptions encode_options;
     165           0 :               encode_options.sort_context_params_ = true;
     166           0 :               return XdsResourceIdentifier::encodeUrn(xdstp_resource, encode_options);
     167           0 :             }
     168          31 :             return resource_name;
     169          31 :           });
     170          51 :       if (eds_resources_cache_.has_value()) {
     171             :         // Compute the removed resources and remove them from the cache.
     172           0 :         std::set<std::string> removed_resources;
     173           0 :         std::set_difference(previous_resources.begin(), previous_resources.end(),
     174           0 :                             resources_.begin(), resources_.end(),
     175           0 :                             std::inserter(removed_resources, removed_resources.begin()));
     176           0 :         removeResourcesFromCache(removed_resources);
     177           0 :       }
     178             :       // move this watch to the beginning of the list
     179          51 :       iter_ = watches_.emplace(watches_.begin(), this);
     180          51 :     }
     181             : 
     182           0 :     void removeResourcesFromCache(const std::set<std::string>& resources_to_remove) {
     183           0 :       ASSERT(eds_resources_cache_.has_value());
     184             :       // Iterate over the resources to remove, and if no other watcher
     185             :       // registered for that resource, remove it from the cache.
     186           0 :       for (const auto& resource_name : resources_to_remove) {
     187             :         // Counts the number of watchers that watch the resource.
     188           0 :         uint32_t resource_watchers_count = 0;
     189           0 :         for (const auto& watch : watches_) {
     190             :           // Skip the current watcher as it is intending to remove the resource.
     191           0 :           if (watch == this) {
     192           0 :             continue;
     193           0 :           }
     194           0 :           if (watch->resources_.find(resource_name) != watch->resources_.end()) {
     195           0 :             resource_watchers_count++;
     196           0 :           }
     197           0 :         }
     198             :         // Other than "this" watcher, the resource is not watched by any other
     199             :         // watcher, so it can be removed.
     200           0 :         if (resource_watchers_count == 0) {
     201           0 :           eds_resources_cache_->removeResource(resource_name);
     202           0 :         }
     203           0 :       }
     204           0 :     }
     205             : 
     206             :     using WatchList = std::list<GrpcMuxWatchImpl*>;
     207             :     const SubscriptionOptions& subscription_options_;
     208             :     const LocalInfo::LocalInfo& local_info_;
     209             :     WatchList& watches_;
     210             :     WatchList::iterator iter_;
     211             :     // Optional cache for the specific ClusterLoadAssignments of this watch.
     212             :     EdsResourcesCacheOptRef eds_resources_cache_;
     213             :   };
     214             : 
     215             :   // Per muxed API state.
     216             :   struct ApiState {
     217             :     ApiState(Event::Dispatcher& dispatcher,
     218             :              std::function<void(const std::vector<std::string>&)> callback)
     219          70 :         : ttl_(callback, dispatcher, dispatcher.timeSource()) {}
     220             : 
     221         262 :     bool paused() const { return pauses_ > 0; }
     222             : 
     223             :     // Watches on the returned resources for the API;
     224             :     std::list<GrpcMuxWatchImpl*> watches_;
     225             :     // Current DiscoveryRequest for API.
     226             :     envoy::service::discovery::v3::DiscoveryRequest request_;
     227             :     // Count of unresumed pause() invocations.
     228             :     uint32_t pauses_{};
     229             :     // Was a DiscoveryRequest elided during a pause?
     230             :     bool pending_{};
     231             :     // Has this API been tracked in subscriptions_?
     232             :     bool subscribed_{};
     233             :     // This resource type must have a Node sent at next request.
     234             :     bool must_send_node_{};
     235             :     TtlManager ttl_;
     236             :     // The identifier for the server that sent the most recent response, or
     237             :     // empty if there is none.
     238             :     std::string control_plane_identifier_{};
     239             :     // If true, xDS resources were previously fetched from an xDS source or an xDS delegate.
     240             :     bool previously_fetched_data_{false};
     241             :   };
     242             : 
     243         142 :   bool isHeartbeatResource(const std::string& type_url, const DecodedResource& resource) {
     244         142 :     return !resource.hasResource() &&
     245         142 :            resource.version() == apiStateFor(type_url).request_.version_info();
     246         142 :   }
     247             :   void expiryCallback(absl::string_view type_url, const std::vector<std::string>& expired);
     248             :   // Request queue management logic.
     249             :   void queueDiscoveryRequest(absl::string_view queue_item);
     250             :   // Invoked when dynamic context parameters change for a resource type.
     251             :   void onDynamicContextUpdate(absl::string_view resource_type_url);
     252             :   // Must be invoked from the main or test thread.
     253             :   void loadConfigFromDelegate(const std::string& type_url,
     254             :                               const absl::flat_hash_set<std::string>& resource_names);
     255             :   // Must be invoked from the main or test thread.
     256             :   void processDiscoveryResources(const std::vector<DecodedResourcePtr>& resources,
     257             :                                  ApiState& api_state, const std::string& type_url,
     258             :                                  const std::string& version_info, bool call_delegate);
     259             : 
     260             :   GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
     261             :              envoy::service::discovery::v3::DiscoveryResponse>
     262             :       grpc_stream_;
     263             :   const LocalInfo::LocalInfo& local_info_;
     264             :   const bool skip_subsequent_node_;
     265             :   CustomConfigValidatorsPtr config_validators_;
     266             :   XdsConfigTrackerOptRef xds_config_tracker_;
     267             :   XdsResourcesDelegateOptRef xds_resources_delegate_;
     268             :   EdsResourcesCachePtr eds_resources_cache_;
     269             :   const std::string target_xds_authority_;
     270             :   bool first_stream_request_{true};
     271             : 
     272             :   // Helper function for looking up and potentially allocating a new ApiState.
     273             :   ApiState& apiStateFor(absl::string_view type_url);
     274             : 
     275             :   absl::node_hash_map<std::string, std::unique_ptr<ApiState>> api_state_;
     276             : 
     277             :   // Envoy's dependency ordering.
     278             :   std::list<std::string> subscriptions_;
     279             : 
     280             :   // A queue to store requests while rate limited. Note that when requests
     281             :   // cannot be sent due to the gRPC stream being down, this queue does not
     282             :   // store them; rather, they are simply dropped. This string is a type
     283             :   // URL.
     284             :   std::unique_ptr<std::queue<std::string>> request_queue_;
     285             : 
     286             :   Event::Dispatcher& dispatcher_;
     287             :   Common::CallbackHandlePtr dynamic_update_callback_handle_;
     288             : 
     289             :   bool started_{false};
     290             :   // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
     291             :   // true because it may contain dangling pointers.
     292             :   std::atomic<bool> shutdown_{false};
     293             : };
     294             : 
     295             : using GrpcMuxImplPtr = std::unique_ptr<GrpcMuxImpl>;
     296             : using GrpcMuxImplSharedPtr = std::shared_ptr<GrpcMuxImpl>;
     297             : 
     298             : class GrpcMuxFactory;
     299             : DECLARE_FACTORY(GrpcMuxFactory);
     300             : 
     301             : } // namespace Config
     302             : } // namespace Envoy

Generated by: LCOV version 1.15