LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc/xds_mux - grpc_mux_impl.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 25 51 49.0 %
Date: 2024-01-05 06:35:25 Functions: 31 51 60.8 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <cstdint>
       4             : #include <memory>
       5             : #include <queue>
       6             : 
       7             : #include "envoy/common/random_generator.h"
       8             : #include "envoy/common/time.h"
       9             : #include "envoy/common/token_bucket.h"
      10             : #include "envoy/config/custom_config_validators.h"
      11             : #include "envoy/config/grpc_mux.h"
      12             : #include "envoy/config/subscription.h"
      13             : #include "envoy/config/xds_config_tracker.h"
      14             : #include "envoy/config/xds_resources_delegate.h"
      15             : #include "envoy/event/dispatcher.h"
      16             : #include "envoy/grpc/status.h"
      17             : #include "envoy/service/discovery/v3/discovery.pb.h"
      18             : #include "envoy/upstream/cluster_manager.h"
      19             : 
      20             : #include "source/common/common/logger.h"
      21             : #include "source/common/common/utility.h"
      22             : #include "source/common/config/api_version.h"
      23             : #include "source/common/grpc/common.h"
      24             : #include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
      25             : #include "source/extensions/config_subscription/grpc/grpc_stream.h"
      26             : #include "source/extensions/config_subscription/grpc/pausable_ack_queue.h"
      27             : #include "source/extensions/config_subscription/grpc/watch_map.h"
      28             : #include "source/extensions/config_subscription/grpc/xds_mux/delta_subscription_state.h"
      29             : #include "source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h"
      30             : 
      31             : #include "absl/container/node_hash_map.h"
      32             : 
      33             : namespace Envoy {
      34             : namespace Config {
      35             : namespace XdsMux {
      36             : 
      37             : class ShutdownableMux { // Minimal interface for a mux that can be told to shut down.
      38             : public:
      39          14 :   virtual ~ShutdownableMux() = default; // Virtual: this is a polymorphic base.
      40             :   virtual void shutdown() PURE; // GrpcMuxImpl (below) implements this by setting its shutdown_ flag.
      41             : };
      42             : 
      43             : // Manages subscriptions to one or more types of resource. The logical protocol
      44             : // state of those subscription(s) is handled by SubscriptionState.
      45             : // This class owns the GrpcStream used to talk to the server, maintains queuing
      46             : // logic to properly order the subscription(s)' various messages, and allows
      47             : // starting/stopping/pausing of the subscriptions.
      48             : //
      49             : // @tparam S SubscriptionState state type, either SotwSubscriptionState or DeltaSubscriptionState
      50             : // @tparam F SubscriptionStateFactory type, either SotwSubscriptionStateFactory or
      51             : // DeltaSubscriptionStateFactory
      52             : // @tparam RQ Xds request type, either envoy::service::discovery::v3::DiscoveryRequest or
      53             : // envoy::service::discovery::v3::DeltaDiscoveryRequest
      54             : // @tparam RS Xds response type, either envoy::service::discovery::v3::DiscoveryResponse or
      55             : // envoy::service::discovery::v3::DeltaDiscoveryResponse
      56             : //
      57             : template <class S, class F, class RQ, class RS>
      58             : class GrpcMuxImpl : public GrpcStreamCallbacks<RS>,
      59             :                     public GrpcMux,
      60             :                     public ShutdownableMux,
      61             :                     Logger::Loggable<Logger::Id::config> {
      62             : public:
      63             :   GrpcMuxImpl(std::unique_ptr<F> subscription_state_factory, GrpcMuxContext& grpc_mux_context,
      64             :               bool skip_subsequent_node);
      65             : 
      66             :   ~GrpcMuxImpl() override;
      67             : 
      68             :   // Causes all GrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
      69             :   // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
      70             :   // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
      71             :   // would then cause all `GrpcMuxImpl` to be destructed.
      72             :   // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
      73             :   static void shutdownAll();
      74             : 
      75          28 :   void shutdown() override { shutdown_ = true; } // ShutdownableMux: only sets the flag.
      76           0 :   bool isShutdown() { return shutdown_; } // NOTE(review): only reads shutdown_; could be const.
      77             : 
      78             :   // TODO (dmitri-d) return a naked pointer instead of the wrapper once the legacy mux has been
      79             :   // removed and the mux interface can be changed
      80             :   Config::GrpcMuxWatchPtr addWatch(const std::string& type_url,
      81             :                                    const absl::flat_hash_set<std::string>& resources,
      82             :                                    SubscriptionCallbacks& callbacks,
      83             :                                    OpaqueResourceDecoderSharedPtr resource_decoder,
      84             :                                    const SubscriptionOptions& options) override;
      85             :   void updateWatch(const std::string& type_url, Watch* watch,
      86             :                    const absl::flat_hash_set<std::string>& resources,
      87             :                    const SubscriptionOptions& options); // Not an override; called by WatchImpl::update().
      88             :   void removeWatch(const std::string& type_url, Watch* watch); // Not an override; called by WatchImpl::remove().
      89             : 
      90             :   ScopedResume pause(const std::string& type_url) override;
      91             :   ScopedResume pause(const std::vector<std::string> type_urls) override;
      92             :   void start() override;
      93           0 :   const absl::flat_hash_map<std::string, std::unique_ptr<S>>& subscriptions() const { // Read-only view, keyed by type_url.
      94           0 :     return subscriptions_;
      95           0 :   }
      96             : 
      97             :   // GrpcStreamCallbacks
      98          14 :   void onStreamEstablished() override { handleEstablishedStream(); }
      99          14 :   void onEstablishmentFailure() override { handleStreamEstablishmentFailure(); }
     100           0 :   void onWriteable() override { trySendDiscoveryRequests(); }
     101             :   void onDiscoveryResponse(std::unique_ptr<RS>&& message,
     102         111 :                            ControlPlaneStats& control_plane_stats) override {
     103         111 :     genericHandleResponse(message->type_url(), *message, control_plane_stats); // Dispatches on the response's own type_url.
     104         111 :   }
     105             : 
     106           0 :   EdsResourcesCacheOptRef edsResourcesCache() override {
     107           0 :     return makeOptRefFromPtr(eds_resources_cache_.get()); // Presumably empty when no cache was created - confirm in makeOptRefFromPtr.
     108           0 :   }
     109             : 
     110           0 :   GrpcStream<RQ, RS>& grpcStreamForTest() { return grpc_stream_; } // Test-only accessor (per its name); exposes the owned stream mutably.
     111             : 
     112             : protected:
     113             :   class WatchImpl : public Envoy::Config::GrpcMuxWatch { // Watch handle: removes its watch from the parent mux on destruction.
     114             :   public:
     115             :     WatchImpl(const std::string& type_url, Watch* watch, GrpcMuxImpl& parent,
     116             :               const SubscriptionOptions& options)
     117          67 :         : type_url_(type_url), watch_(watch), parent_(parent), options_(options) {}
     118             : 
     119          67 :     ~WatchImpl() override { remove(); }
     120             : 
     121          67 :     void remove() { // Safe to call repeatedly: watch_ is nulled after the first removal.
     122          67 :       if (watch_) {
     123          67 :         parent_.removeWatch(type_url_, watch_);
     124          67 :         watch_ = nullptr;
     125          67 :       }
     126          67 :     }
     127             : 
     128           0 :     void update(const absl::flat_hash_set<std::string>& resources) override {
     129           0 :       parent_.updateWatch(type_url_, watch_, resources, options_); // NOTE(review): unlike remove(), watch_ is not null-checked here.
     130           0 :     }
     131             : 
     132             :   private:
     133             :     const std::string type_url_;
     134             :     Watch* watch_; // Non-owning; nullptr once remove() has run.
     135             :     GrpcMuxImpl& parent_; // Held by reference and used from the destructor, so the mux must outlive this watch.
     136             :     const SubscriptionOptions options_; // Copy of the options given at construction; reused on every update().
     137             :   };
     138             : 
     139             :   void sendGrpcMessage(RQ& msg_proto, S& sub_state);
     140         452 :   void maybeUpdateQueueSizeStat(uint64_t size) { grpc_stream_.maybeUpdateQueueSizeStat(size); } // This and the next three are thin forwarders to grpc_stream_.
     141         180 :   bool grpcStreamAvailable() { return grpc_stream_.grpcStreamAvailable(); }
     142         180 :   bool rateLimitAllowsDrain() { return grpc_stream_.checkRateLimitAllowsDrain(); }
     143         180 :   void sendMessage(RQ& msg_proto) { grpc_stream_.sendMessage(msg_proto); }
     144             : 
     145             :   S& subscriptionStateFor(const std::string& type_url);
     146             :   WatchMap& watchMapFor(const std::string& type_url);
     147             :   void handleEstablishedStream();
     148             :   void handleStreamEstablishmentFailure();
     149             :   void genericHandleResponse(const std::string& type_url, const RS& response_proto,
     150             :                              ControlPlaneStats& control_plane_stats);
     151             :   void trySendDiscoveryRequests();
     152         166 :   bool skipSubsequentNode() const { return skip_subsequent_node_; }
     153         180 :   bool anyRequestSentYetInCurrentStream() const { return any_request_sent_yet_in_current_stream_; }
     154         194 :   void setAnyRequestSentYetInCurrentStream(bool value) {
     155         194 :     any_request_sent_yet_in_current_stream_ = value;
     156         194 :   }
     157          14 :   const LocalInfo::LocalInfo& localInfo() const { return local_info_; }
     158             : 
     159             : private:
     160             :   // Checks whether external conditions allow sending a (Delta)DiscoveryRequest. (Does not check
     161             :   // whether we *want* to send a (Delta)DiscoveryRequest).
     162             :   bool canSendDiscoveryRequest(const std::string& type_url);
     163             : 
     164             :   // Checks whether we have something to say in a (Delta)DiscoveryRequest, which can be an ACK
     165             :   // and/or a subscription update. (Does not check whether we *can* send that
     166             :   // (Delta)DiscoveryRequest). Returns the type_url we should send the (Delta)DiscoveryRequest for
     167             :   // (if any). First, prioritizes ACKs over non-ACK subscription interest updates. Then, prioritizes
     168             :   // non-ACK updates in the order the various types of subscriptions were activated (as tracked by
     169             :   // subscription_ordering_).
     170             :   absl::optional<std::string> whoWantsToSendDiscoveryRequest();
     171             : 
     172             :   // Invoked when dynamic context parameters change for a resource type.
     173             :   void onDynamicContextUpdate(absl::string_view resource_type_url);
     174             : 
     175             :   GrpcStream<RQ, RS> grpc_stream_;
     176             : 
     177             :   // Resource (N)ACKs we're waiting to send, stored in the order that they should be sent in. All
     178             :   // of our different resource types' ACKs are mixed together in this queue. See the
     179             :   // PausableAckQueue class for a description of how it interacts with pause() and resume().
     180             :   PausableAckQueue pausable_ack_queue_;
     181             : 
     182             :   // Makes SubscriptionStates, to be held in the subscriptions_ map. Whether this GrpcMux is doing
     183             :   // delta or state of the world xDS is determined by which concrete subclass this variable gets.
     184             :   std::unique_ptr<F> subscription_state_factory_;
     185             : 
     186             :   // Map key is type_url.
     187             :   // Only addWatch() should insert into these maps.
     188             :   absl::flat_hash_map<std::string, std::unique_ptr<S>> subscriptions_;
     189             :   absl::flat_hash_map<std::string, std::unique_ptr<WatchMap>> watch_maps_;
     190             : 
     191             :   // Determines the order of initial discovery requests. (Assumes that subscriptions are added
     192             :   // to this GrpcMux in the order of Envoy's dependency ordering).
     193             :   std::list<std::string> subscription_ordering_; // NOTE(review): <list> is not included directly by this header.
     194             : 
     195             :   // Whether to enable the optimization of only including the node field in the very first
     196             :   // discovery request in an xDS gRPC stream (really just one: *not* per-type_url).
     197             :   const bool skip_subsequent_node_;
     198             : 
     199             :   // State to help with skip_subsequent_node's logic.
     200             :   bool any_request_sent_yet_in_current_stream_{};
     201             : 
     202             :   // Used to populate the (Delta)DiscoveryRequest's node field. That field is the same across
     203             :   // all type_urls, and moreover, the 'skip_subsequent_node' logic needs to operate across all
     204             :   // the type_urls. So, while the SubscriptionStates populate every other field of these messages,
     205             :   // this one is up to GrpcMux.
     206             :   const LocalInfo::LocalInfo& local_info_;
     207             :   Common::CallbackHandlePtr dynamic_update_callback_handle_; // Presumably the registration for onDynamicContextUpdate() - confirm in the .cc.
     208             :   CustomConfigValidatorsPtr config_validators_;
     209             :   XdsConfigTrackerOptRef xds_config_tracker_;
     210             :   XdsResourcesDelegateOptRef xds_resources_delegate_;
     211             :   EdsResourcesCachePtr eds_resources_cache_; // Exposed through edsResourcesCache().
     212             :   const std::string target_xds_authority_;
     213             : 
     214             :   bool started_{false};
     215             :   // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
     216             :   // true because it may contain dangling pointers.
     217             :   std::atomic<bool> shutdown_{false}; // NOTE(review): <atomic> is not included directly by this header.
     218             : };
     219             : 
     220             : class GrpcMuxDelta : public GrpcMuxImpl<DeltaSubscriptionState, DeltaSubscriptionStateFactory, // GrpcMuxImpl bound to the Delta state, factory and message types.
     221             :                                         envoy::service::discovery::v3::DeltaDiscoveryRequest,
     222             :                                         envoy::service::discovery::v3::DeltaDiscoveryResponse> {
     223             : public:
     224             :   GrpcMuxDelta(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node);
     225             : 
     226             :   // GrpcMux (NullGrpcMuxImpl, whose only base is GrpcMux, overrides this same method).
     227             :   void requestOnDemandUpdate(const std::string& type_url,
     228             :                              const absl::flat_hash_set<std::string>& for_update) override;
     229             : };
     230             : 
     231             : class GrpcMuxSotw : public GrpcMuxImpl<SotwSubscriptionState, SotwSubscriptionStateFactory, // GrpcMuxImpl bound to the Sotw state, factory and message types.
     232             :                                        envoy::service::discovery::v3::DiscoveryRequest,
     233             :                                        envoy::service::discovery::v3::DiscoveryResponse> {
     234             : public:
     235             :   GrpcMuxSotw(GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node);
     236             : 
     237             :   // GrpcMux (NullGrpcMuxImpl, whose only base is GrpcMux, overrides this same method).
     238           0 :   void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
     239           0 :     ENVOY_BUG(false, "unexpected request for on demand update"); // Not expected on this mux: both arguments are ignored and a bug is reported.
     240           0 :   }
     241             : };
     242             : 
     243             : class NullGrpcMuxImpl : public GrpcMux { // GrpcMux whose inline members below do no work.
     244             : public:
     245           0 :   void start() override {} // Intentionally empty.
     246             : 
     247           0 :   ScopedResume pause(const std::string&) override {
     248           0 :     return std::make_unique<Cleanup>([]() {}); // Nothing is paused; the returned Cleanup wraps an empty lambda.
     249           0 :   }
     250           0 :   ScopedResume pause(const std::vector<std::string>) override {
     251           0 :     return std::make_unique<Cleanup>([]() {}); // Same as the single-type_url overload.
     252           0 :   }
     253             : 
     254             :   Config::GrpcMuxWatchPtr addWatch(const std::string&, const absl::flat_hash_set<std::string>&,
     255             :                                    SubscriptionCallbacks&, OpaqueResourceDecoderSharedPtr,
     256             :                                    const SubscriptionOptions&) override; // Defined out of line; behavior not visible in this header.
     257             : 
     258           0 :   void requestOnDemandUpdate(const std::string&, const absl::flat_hash_set<std::string>&) override {
     259           0 :     ENVOY_BUG(false, "unexpected request for on demand update"); // Not expected on this mux: both arguments are ignored and a bug is reported.
     260           0 :   }
     261             : 
     262           0 :   EdsResourcesCacheOptRef edsResourcesCache() override { return {}; } // Returns a default-constructed (empty) optional ref.
     263             : };
     264             : 
     265             : } // namespace XdsMux
     266             : } // namespace Config
     267             : } // namespace Envoy

Generated by: LCOV version 1.15