LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc/xds_mux - subscription_state.h (source / functions)
Test: coverage.dat
Date: 2024-01-05 06:35:25
             Hit   Total   Coverage
Lines:        29      47     61.7 %
Functions:    22      32     68.8 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <memory>
       4             : #include <string>
       5             : 
       6             : #include "envoy/common/pure.h"
       7             : #include "envoy/config/subscription.h"
       8             : #include "envoy/config/xds_config_tracker.h"
       9             : #include "envoy/config/xds_resources_delegate.h"
      10             : #include "envoy/event/dispatcher.h"
      11             : #include "envoy/service/discovery/v3/discovery.pb.h"
      12             : 
      13             : #include "source/common/config/ttl.h"
      14             : #include "source/common/config/utility.h"
      15             : #include "source/common/protobuf/protobuf.h"
      16             : #include "source/extensions/config_subscription/grpc/update_ack.h"
      17             : 
      18             : #include "absl/strings/string_view.h"
      19             : 
      20             : namespace Envoy {
      21             : namespace Config {
      22             : namespace XdsMux {
      23             : 
      24             : class SubscriptionState {}; // Empty non-template base that BaseSubscriptionState<RS, RQ> derives from.
      25             : 
      26             : // Tracks the protocol state of an individual ongoing xDS-over-gRPC session, for a single type_url.
      27             : // There can be multiple SubscriptionStates active, one per type_url. They will all be
      28             : // blissfully unaware of each other's existence, even when their messages are being multiplexed
      29             : // together by ADS.
      30             : // This is the abstract parent class for both the delta and state-of-the-world xDS variants.
      31             : template <class RS, class RQ> // RS: response type passed to handleResponse(); RQ: request type produced.
      32             : class BaseSubscriptionState : public SubscriptionState,
      33             :                               public Logger::Loggable<Logger::Id::config> {
      34             : public:
      35             :   // Note that, outside of tests, we expect callbacks to always be a WatchMap.
      36             :   BaseSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& callbacks,
      37             :                         Event::Dispatcher& dispatcher,
      38             :                         XdsConfigTrackerOptRef xds_config_tracker = absl::nullopt,
      39             :                         XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt,
      40             :                         const std::string& target_xds_authority = "")
      41           0 :       : ttl_([this](const std::vector<std::string>& expired) { ttlExpiryCallback(expired); },
      42             :              dispatcher, dispatcher.timeSource()),
      43             :         type_url_(std::move(type_url)), callbacks_(callbacks), dispatcher_(dispatcher),
      44             :         xds_config_tracker_(xds_config_tracker), xds_resources_delegate_(xds_resources_delegate),
      45          54 :         target_xds_authority_(target_xds_authority) {}
      46             : 
      47          54 :   virtual ~BaseSubscriptionState() = default;
      48             : 
      49             :   // Update which resources we're interested in subscribing to.
      50             :   virtual void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added,
      51             :                                           const absl::flat_hash_set<std::string>& cur_removed) PURE;
      52             :   // Setter, clearer and getter for the dynamic_context_changed_ flag.
      53           0 :   void setDynamicContextChanged() { dynamic_context_changed_ = true; }
      54         180 :   void clearDynamicContextChanged() { dynamic_context_changed_ = false; }
      55        1622 :   bool dynamicContextChanged() const { return dynamic_context_changed_; }
      56             :   // Accessors for control_plane_identifier_; the getter hands out a mutable reference.
      57           0 :   void setControlPlaneIdentifier(const std::string& id) { control_plane_identifier_ = id; }
      58           0 :   std::string& controlPlaneIdentifier() { return control_plane_identifier_; }
      59             : 
      60             :   // Whether there was a change in our subscription interest we have yet to inform the server of.
      61             :   virtual bool subscriptionUpdatePending() const PURE;
      62             : 
      63             :   virtual void markStreamFresh() PURE;
      64             :   // Applies `response` via handleGoodResponse(); an EnvoyException turns the returned ack into a NACK.
      65         111 :   UpdateAck handleResponse(const RS& response) {
      66             :     // We *always* copy the response's nonce into the next request, even if we're going to make that
      67             :     // request a NACK by setting error_detail.
      68         111 :     UpdateAck ack(response.nonce(), typeUrl());
      69         111 :     ENVOY_LOG(debug, "Handling response for {}", typeUrl());
      70         111 :     TRY_ASSERT_MAIN_THREAD { handleGoodResponse(response); }
      71         111 :     END_TRY
      72         111 :     catch (const EnvoyException& e) {
      73           0 :       if (xds_config_tracker_.has_value()) {
      74           0 :         xds_config_tracker_->onConfigRejected(response,
      75           0 :                                               Config::Utility::truncateGrpcStatusMessage(e.what()));
      76           0 :       }
      77           0 :       handleBadResponse(e, ack);
      78           0 :     }
      79         111 :     previously_fetched_data_ = true; // Set on both the accepted and the rejected path.
      80         111 :     return ack;
      81         111 :   }
      82             :   // Reports a ConnectionFailure to callbacks(), passing a null exception pointer.
      83          54 :   virtual void handleEstablishmentFailure() {
      84          54 :     ENVOY_LOG(debug, "SubscriptionState establishment failed for {}", typeUrl());
      85          54 :     callbacks().onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure,
      86          54 :                                      nullptr);
      87          54 :   }
      88             : 
      89             :   // Returns the next gRPC request proto to be sent off to the server, based on this object's
      90             :   // understanding of the current protocol state, and new resources that Envoy wants to request.
      91          69 :   std::unique_ptr<RQ> getNextRequestAckless() { return getNextRequestInternal(); }
      92             : 
      93             :   // The WithAck version builds its request the same way as the ack-less version, then copies in
      94             :   // the ack's nonce and, when the ack carries a non-Ok status, its error_detail. The request is
      95             :   // returned as a std::unique_ptr<RQ>, so the caller owns it.
      96         111 :   std::unique_ptr<RQ> getNextRequestWithAck(const UpdateAck& ack) {
      97         111 :     auto request = getNextRequestInternal();
      98         111 :     request->set_response_nonce(ack.nonce_);
      99         111 :     ENVOY_LOG(debug, "ACK for {} will have nonce {}", typeUrl(), ack.nonce_);
     100         111 :     if (ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) {
     101             :       // Don't needlessly make the field present-but-empty if status is ok.
     102           0 :       request->mutable_error_detail()->CopyFrom(ack.error_detail_);
     103           0 :     }
     104         111 :     return request;
     105         111 :   }
     106             :   // Called from the lambda handed to ttl_; receives that lambda's `expired` list (named type_url here).
     107             :   virtual void ttlExpiryCallback(const std::vector<std::string>& type_url) PURE;
     108             : 
     109             : protected:
     110             :   virtual std::unique_ptr<RQ> getNextRequestInternal() PURE;
     111             :   virtual void handleGoodResponse(const RS& message) PURE;
     112           0 :   void handleBadResponse(const EnvoyException& e, UpdateAck& ack) {
     113             :     // Note that error_detail being set is what indicates that a (Delta)DiscoveryRequest is a NACK.
     114           0 :     ack.error_detail_.set_code(Grpc::Status::WellKnownGrpcStatus::Internal);
     115           0 :     ack.error_detail_.set_message(Config::Utility::truncateGrpcStatusMessage(e.what()));
     116           0 :     ENVOY_LOG(warn, "Config for {} rejected: {}", typeUrl(), e.what());
     117           0 :     callbacks().onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e);
     118           0 :   }
     119             : 
     120         291 :   std::string typeUrl() const { return type_url_; } // Returns type_url_ by value (a copy).
     121         165 :   UntypedConfigUpdateCallbacks& callbacks() const { return callbacks_; }
     122             : 
     123             :   TtlManager ttl_;
     124             :   const std::string type_url_;
     125             :   // callbacks_ is expected (outside of tests) to be a WatchMap.
     126             :   UntypedConfigUpdateCallbacks& callbacks_;
     127             :   Event::Dispatcher& dispatcher_;
     128             :   bool dynamic_context_changed_{};
     129             :   std::string control_plane_identifier_{};
     130             :   XdsConfigTrackerOptRef xds_config_tracker_;
     131             :   XdsResourcesDelegateOptRef xds_resources_delegate_;
     132             :   const std::string target_xds_authority_;
     133             :   bool previously_fetched_data_{};
     134             : };
     135             : 
     136             : template <class T> class SubscriptionStateFactory { // Abstract factory; T is the state type it creates.
     137             : public:
     138          14 :   virtual ~SubscriptionStateFactory() = default;
     139             :   // Note that, outside of tests, we expect callbacks to always be a WatchMap.
     140             :   virtual std::unique_ptr<T>
     141             :   makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks,
     142             :                         OpaqueResourceDecoderSharedPtr resource_decoder,
     143             :                         XdsConfigTrackerOptRef xds_config_tracker = absl::nullopt,
     144             :                         XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt,
     145             :                         const std::string& target_xds_authority = "") PURE;
     146             : };
     147             : 
     148             : } // namespace XdsMux
     149             : } // namespace Config
     150             : } // namespace Envoy

Generated by: LCOV version 1.15