LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc - new_grpc_mux_impl.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 14 26 53.8 %
Date: 2024-01-05 06:35:25 Functions: 5 9 55.6 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <memory>
       4             : 
       5             : #include "envoy/common/random_generator.h"
       6             : #include "envoy/common/token_bucket.h"
       7             : #include "envoy/config/endpoint/v3/endpoint.pb.h"
       8             : #include "envoy/config/grpc_mux.h"
       9             : #include "envoy/config/subscription.h"
      10             : #include "envoy/config/xds_config_tracker.h"
      11             : #include "envoy/service/discovery/v3/discovery.pb.h"
      12             : 
      13             : #include "source/common/common/logger.h"
      14             : #include "source/common/config/api_version.h"
      15             : #include "source/common/config/resource_name.h"
      16             : #include "source/common/grpc/common.h"
      17             : #include "source/common/runtime/runtime_features.h"
      18             : #include "source/extensions/config_subscription/grpc/delta_subscription_state.h"
      19             : #include "source/extensions/config_subscription/grpc/grpc_mux_context.h"
      20             : #include "source/extensions/config_subscription/grpc/grpc_stream.h"
      21             : #include "source/extensions/config_subscription/grpc/pausable_ack_queue.h"
      22             : #include "source/extensions/config_subscription/grpc/watch_map.h"
      23             : 
      24             : namespace Envoy {
      25             : namespace Config {
      26             : 
      27             : // Manages subscriptions to one or more types of resources. The logical protocol
      28             : // state of those subscription(s) is handled by DeltaSubscriptionState.
      29             : // This class owns the GrpcStream used to talk to the server, maintains queuing
      30             : // logic to properly order the subscription(s)' various messages, and allows
      31             : // starting/stopping/pausing of the subscriptions.
      32             : class NewGrpcMuxImpl
      33             :     : public GrpcMux,
      34             :       public GrpcStreamCallbacks<envoy::service::discovery::v3::DeltaDiscoveryResponse>,
      35             :       Logger::Loggable<Logger::Id::config> { // No access specifier: inherited privately (class default).
      36             : public:
      37             :   NewGrpcMuxImpl(GrpcMuxContext& grpc_mux_context); // NOTE(review): single-argument constructor is not marked explicit.
      38             : 
      39             :   ~NewGrpcMuxImpl() override;
      40             : 
      41             :   // Causes all NewGrpcMuxImpl objects to stop sending any messages on `grpc_stream_` to fix a crash
      42             :   // on Envoy shutdown due to dangling pointers. This may not be the ideal fix; it is probably
      43             :   // preferable for the `ServerImpl` to cause all configuration subscriptions to be shutdown, which
      44             :   // would then cause all `NewGrpcMuxImpl` to be destructed.
      45             :   // TODO: figure out the correct fix: https://github.com/envoyproxy/envoy/issues/15072.
      46             :   static void shutdownAll();
      47             : 
      48           4 :   void shutdown() { shutdown_ = true; } // Only sets the shutdown_ flag on this instance.
      49             : 
      50             :   GrpcMuxWatchPtr addWatch(const std::string& type_url,
      51             :                            const absl::flat_hash_set<std::string>& resources,
      52             :                            SubscriptionCallbacks& callbacks,
      53             :                            OpaqueResourceDecoderSharedPtr resource_decoder,
      54             :                            const SubscriptionOptions& options) override;
      55             : 
      56             :   void requestOnDemandUpdate(const std::string& type_url,
      57             :                              const absl::flat_hash_set<std::string>& for_update) override;
      58             : 
      59           0 :   EdsResourcesCacheOptRef edsResourcesCache() override { // Exposes eds_resources_cache_ as an optional reference.
      60           0 :     return makeOptRefFromPtr(eds_resources_cache_.get()); // Presumably empty when the pointer is null; confirm against makeOptRefFromPtr.
      61           0 :   }
      62             : 
      63             :   ScopedResume pause(const std::string& type_url) override;
      64             :   ScopedResume pause(const std::vector<std::string> type_urls) override; // NOTE(review): vector is taken by const value; presumably mirrors the GrpcMux signature.
      65             : 
      66             :   void onDiscoveryResponse(
      67             :       std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryResponse>&& message,
      68             :       ControlPlaneStats& control_plane_stats) override;
      69             : 
      70             :   void onStreamEstablished() override;
      71             : 
      72             :   void onEstablishmentFailure() override;
      73             : 
      74             :   void onWriteable() override;
      75             : 
      76             :   void kickOffAck(UpdateAck ack);
      77             : 
      78             :   // TODO(fredlas) remove this from the GrpcMux interface.
      79             :   void start() override;
      80             : 
      81             :   GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
      82             :              envoy::service::discovery::v3::DeltaDiscoveryResponse>&
      83           0 :   grpcStreamForTest() { // Returns a mutable reference to grpc_stream_; intended for tests, per its name.
      84           0 :     return grpc_stream_;
      85           0 :   }
      86             : 
      87             :   struct SubscriptionStuff { // State kept per type_url: a WatchMap and the DeltaSubscriptionState built on top of it.
      88             :     SubscriptionStuff(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
      89             :                       const bool use_namespace_matching, Event::Dispatcher& dispatcher,
      90             :                       CustomConfigValidators& config_validators,
      91             :                       XdsConfigTrackerOptRef xds_config_tracker,
      92             :                       EdsResourcesCacheOptRef eds_resources_cache)
      93             :         : watch_map_(use_namespace_matching, type_url, config_validators, eds_resources_cache),
      94          15 :           sub_state_(type_url, watch_map_, local_info, dispatcher, xds_config_tracker) {
      95             :       // If an EDS resources cache is provided, then the type must be ClusterLoadAssignment.
      96          15 :       ASSERT(
      97          15 :           !eds_resources_cache.has_value() ||
      98          15 :           (type_url == Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>()));
      99          15 :     }
     100             : 
     101             :     WatchMap watch_map_; // Must stay declared before sub_state_, whose constructor receives watch_map_.
     102             :     DeltaSubscriptionState sub_state_;
     103             :     std::string control_plane_identifier_{}; // Starts out empty; not set by the constructor above.
     104             : 
     105             :     SubscriptionStuff(const SubscriptionStuff&) = delete; // Non-copyable.
     106             :     SubscriptionStuff& operator=(const SubscriptionStuff&) = delete;
     107             :   };
     108             : 
     109             :   using SubscriptionStuffPtr = std::unique_ptr<SubscriptionStuff>;
     110             : 
     111             :   // For use in tests only: read-only access to the subscriptions_ map.
     112           0 :   const absl::flat_hash_map<std::string, SubscriptionStuffPtr>& subscriptions() {
     113           0 :     return subscriptions_;
     114           0 :   }
     115             : 
     116             : private:
     117             :   class WatchImpl : public GrpcMuxWatch { // Watch handle: its destructor calls remove(), which unregisters the watch from the parent mux.
     118             :   public:
     119             :     WatchImpl(const std::string& type_url, Watch* watch, NewGrpcMuxImpl& parent,
     120             :               const SubscriptionOptions& options)
     121          17 :         : type_url_(type_url), watch_(watch), parent_(parent), options_(options) {}
     122             : 
     123          17 :     ~WatchImpl() override { remove(); }
     124             : 
     125          17 :     void remove() { // Safe to call more than once: watch_ is reset to nullptr after the first removal.
     126          17 :       if (watch_) {
     127          17 :         parent_.removeWatch(type_url_, watch_);
     128          17 :         watch_ = nullptr;
     129          17 :       }
     130          17 :     }
     131             : 
     132           0 :     void update(const absl::flat_hash_set<std::string>& resources) override { // Forwards to the parent with the options_ captured at construction.
     133           0 :       parent_.updateWatch(type_url_, watch_, resources, options_); // NOTE(review): watch_ is nullptr after remove() and is forwarded here unchecked.
     134           0 :     }
     135             : 
     136             :   private:
     137             :     const std::string type_url_;
     138             :     Watch* watch_; // Set to nullptr by remove().
     139             :     NewGrpcMuxImpl& parent_;
     140             :     const SubscriptionOptions options_; // Stored by value: a copy of the options passed to the constructor.
     141             :   };
     142             : 
     143             :   void removeWatch(const std::string& type_url, Watch* watch);
     144             : 
     145             :   // Updates the list of resource names watched by the given watch. If an added name is new across
     146             :   // the whole subscription, or if a removed name has no other watch interested in it, then the
     147             :   // subscription will enqueue and attempt to send an appropriate discovery request.
     148             :   void updateWatch(const std::string& type_url, Watch* watch,
     149             :                    const absl::flat_hash_set<std::string>& resources,
     150             :                    const SubscriptionOptions& options);
     151             : 
     152             :   // Adds a subscription for the type_url to the subscriptions map and order list.
     153             :   void addSubscription(const std::string& type_url, bool use_namespace_matching);
     154             : 
     155             :   void trySendDiscoveryRequests();
     156             : 
     157             :   // Checks whether external conditions allow sending a DeltaDiscoveryRequest. (Does not check
     158             :   // whether we *want* to send a DeltaDiscoveryRequest).
     159             :   bool canSendDiscoveryRequest(const std::string& type_url);
     160             : 
     161             :   // Checks whether we have something to say in a DeltaDiscoveryRequest, which can be an ACK and/or
     162             :   // a subscription update. (Does not check whether we *can* send that DeltaDiscoveryRequest).
     163             :   // Returns the type_url we should send the DeltaDiscoveryRequest for (if any).
     164             :   // First, prioritizes ACKs over non-ACK subscription interest updates.
     165             :   // Then, prioritizes non-ACK updates in the order the various types
     166             :   // of subscriptions were activated.
     167             :   absl::optional<std::string> whoWantsToSendDiscoveryRequest();
     168             : 
     169             :   // Invoked when dynamic context parameters change for a resource type.
     170             :   void onDynamicContextUpdate(absl::string_view resource_type_url);
     171             : 
     172             :   // Resource (N)ACKs we're waiting to send, stored in the order that they should be sent in. All
     173             :   // of our different resource types' ACKs are mixed together in this queue. See class for
     174             :   // description of how it interacts with pause() and resume().
     175             :   PausableAckQueue pausable_ack_queue_;
     176             : 
     177             :   // Map key is type_url.
     178             :   absl::flat_hash_map<std::string, SubscriptionStuffPtr> subscriptions_;
     179             : 
     180             :   // Determines the order of initial discovery requests. (Assumes that subscriptions are added in
     181             :   // the order of Envoy's dependency ordering).
     182             :   std::list<std::string> subscription_ordering_; // NOTE(review): <list> is not included directly by this header; presumably provided transitively.
     183             : 
     184             :   GrpcStream<envoy::service::discovery::v3::DeltaDiscoveryRequest,
     185             :              envoy::service::discovery::v3::DeltaDiscoveryResponse>
     186             :       grpc_stream_;
     187             : 
     188             :   const LocalInfo::LocalInfo& local_info_;
     189             :   CustomConfigValidatorsPtr config_validators_;
     190             :   Common::CallbackHandlePtr dynamic_update_callback_handle_;
     191             :   Event::Dispatcher& dispatcher_;
     192             :   XdsConfigTrackerOptRef xds_config_tracker_;
     193             :   EdsResourcesCachePtr eds_resources_cache_;
     194             : 
     195             :   bool started_{false};
     196             :   // True iff Envoy is shutting down; no messages should be sent on the `grpc_stream_` when this is
     197             :   // true because it may contain dangling pointers.
     198             :   std::atomic<bool> shutdown_{false}; // NOTE(review): <atomic> is not included directly by this header; presumably provided transitively.
     199             : };
     200             : 
     201             : using NewGrpcMuxImplPtr = std::unique_ptr<NewGrpcMuxImpl>; // Unique-ownership pointer alias.
     202             : using NewGrpcMuxImplSharedPtr = std::shared_ptr<NewGrpcMuxImpl>; // Shared-ownership pointer alias.
     203             : 
     204             : class NewGrpcMuxFactory; // Forward declaration only; the class is not defined in this header.
     205             : DECLARE_FACTORY(NewGrpcMuxFactory); // Presumably declares the factory's registration hook; confirm against the DECLARE_FACTORY macro definition.
     206             : 
     207             : } // namespace Config
     208             : } // namespace Envoy

Generated by: LCOV version 1.15