Line data Source code
1 : #pragma once 2 : 3 : #include <memory> 4 : #include <string> 5 : 6 : #include "envoy/common/pure.h" 7 : #include "envoy/config/subscription.h" 8 : #include "envoy/config/xds_config_tracker.h" 9 : #include "envoy/config/xds_resources_delegate.h" 10 : #include "envoy/event/dispatcher.h" 11 : #include "envoy/service/discovery/v3/discovery.pb.h" 12 : 13 : #include "source/common/config/ttl.h" 14 : #include "source/common/config/utility.h" 15 : #include "source/common/protobuf/protobuf.h" 16 : #include "source/extensions/config_subscription/grpc/update_ack.h" 17 : 18 : #include "absl/strings/string_view.h" 19 : 20 : namespace Envoy { 21 : namespace Config { 22 : namespace XdsMux { 23 : 24 : class SubscriptionState {}; 25 : 26 : // Tracks the protocol state of an individual ongoing xDS-over-gRPC session, for a single type_url. 27 : // There can be multiple SubscriptionStates active, one per type_url. They will all be 28 : // blissfully unaware of each other's existence, even when their messages are being multiplexed 29 : // together by ADS. 30 : // This is the abstract parent class for both the delta and state-of-the-world xDS variants. 31 : template <class RS, class RQ> 32 : class BaseSubscriptionState : public SubscriptionState, 33 : public Logger::Loggable<Logger::Id::config> { 34 : public: 35 : // Note that, outside of tests, we expect callbacks to always be a WatchMap. 36 : BaseSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& callbacks, 37 : Event::Dispatcher& dispatcher, 38 : XdsConfigTrackerOptRef xds_config_tracker = absl::nullopt, 39 : XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt, 40 : const std::string& target_xds_authority = "") 41 0 : : ttl_([this](const std::vector<std::string>& expired) { ttlExpiryCallback(expired); }, 42 : dispatcher, dispatcher.timeSource()), 43 : type_url_(std::move(type_url)), callbacks_(callbacks), dispatcher_(dispatcher), 44 : xds_config_tracker_(xds_config_tracker), xds_resources_delegate_(xds_resources_delegate), 45 54 : target_xds_authority_(target_xds_authority) {} 46 : 47 54 : virtual ~BaseSubscriptionState() = default; 48 : 49 : // Update which resources we're interested in subscribing to. 50 : virtual void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added, 51 : const absl::flat_hash_set<std::string>& cur_removed) PURE; 52 : 53 0 : void setDynamicContextChanged() { dynamic_context_changed_ = true; } 54 180 : void clearDynamicContextChanged() { dynamic_context_changed_ = false; } 55 1622 : bool dynamicContextChanged() const { return dynamic_context_changed_; } 56 : 57 0 : void setControlPlaneIdentifier(const std::string& id) { control_plane_identifier_ = id; } 58 0 : std::string& controlPlaneIdentifier() { return control_plane_identifier_; } 59 : 60 : // Whether there was a change in our subscription interest we have yet to inform the server of. 61 : virtual bool subscriptionUpdatePending() const PURE; 62 : 63 : virtual void markStreamFresh() PURE; 64 : 65 111 : UpdateAck handleResponse(const RS& response) { 66 : // We *always* copy the response's nonce into the next request, even if we're going to make that 67 : // request a NACK by setting error_detail. 68 111 : UpdateAck ack(response.nonce(), typeUrl()); 69 111 : ENVOY_LOG(debug, "Handling response for {}", typeUrl()); 70 111 : TRY_ASSERT_MAIN_THREAD { handleGoodResponse(response); } 71 111 : END_TRY 72 111 : catch (const EnvoyException& e) { 73 0 : if (xds_config_tracker_.has_value()) { 74 0 : xds_config_tracker_->onConfigRejected(response, 75 0 : Config::Utility::truncateGrpcStatusMessage(e.what())); 76 0 : } 77 0 : handleBadResponse(e, ack); 78 0 : } 79 111 : previously_fetched_data_ = true; 80 111 : return ack; 81 111 : } 82 : 83 54 : virtual void handleEstablishmentFailure() { 84 54 : ENVOY_LOG(debug, "SubscriptionState establishment failed for {}", typeUrl()); 85 54 : callbacks().onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure, 86 54 : nullptr); 87 54 : } 88 : 89 : // Returns the next gRPC request proto to be sent off to the server, based on this object's 90 : // understanding of the current protocol state, and new resources that Envoy wants to request. 91 69 : std::unique_ptr<RQ> getNextRequestAckless() { return getNextRequestInternal(); } 92 : 93 : // The WithAck version first calls the ack-less version, then adds in the passed-in ack. 94 : // Returns a new'd pointer, meant to be owned by the caller, who is expected to know what type the 95 : // pointer actually is. 96 111 : std::unique_ptr<RQ> getNextRequestWithAck(const UpdateAck& ack) { 97 111 : auto request = getNextRequestInternal(); 98 111 : request->set_response_nonce(ack.nonce_); 99 111 : ENVOY_LOG(debug, "ACK for {} will have nonce {}", typeUrl(), ack.nonce_); 100 111 : if (ack.error_detail_.code() != Grpc::Status::WellKnownGrpcStatus::Ok) { 101 : // Don't needlessly make the field present-but-empty if status is ok. 102 0 : request->mutable_error_detail()->CopyFrom(ack.error_detail_); 103 0 : } 104 111 : return request; 105 111 : } 106 : 107 : virtual void ttlExpiryCallback(const std::vector<std::string>& type_url) PURE; 108 : 109 : protected: 110 : virtual std::unique_ptr<RQ> getNextRequestInternal() PURE; 111 : virtual void handleGoodResponse(const RS& message) PURE; 112 0 : void handleBadResponse(const EnvoyException& e, UpdateAck& ack) { 113 : // Note that error_detail being set is what indicates that a (Delta)DiscoveryRequest is a NACK. 114 0 : ack.error_detail_.set_code(Grpc::Status::WellKnownGrpcStatus::Internal); 115 0 : ack.error_detail_.set_message(Config::Utility::truncateGrpcStatusMessage(e.what())); 116 0 : ENVOY_LOG(warn, "Config for {} rejected: {}", typeUrl(), e.what()); 117 0 : callbacks().onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::UpdateRejected, &e); 118 0 : } 119 : 120 291 : std::string typeUrl() const { return type_url_; } 121 165 : UntypedConfigUpdateCallbacks& callbacks() const { return callbacks_; } 122 : 123 : TtlManager ttl_; 124 : const std::string type_url_; 125 : // callbacks_ is expected (outside of tests) to be a WatchMap. 126 : UntypedConfigUpdateCallbacks& callbacks_; 127 : Event::Dispatcher& dispatcher_; 128 : bool dynamic_context_changed_{}; 129 : std::string control_plane_identifier_{}; 130 : XdsConfigTrackerOptRef xds_config_tracker_; 131 : XdsResourcesDelegateOptRef xds_resources_delegate_; 132 : const std::string target_xds_authority_; 133 : bool previously_fetched_data_{}; 134 : }; 135 : 136 : template <class T> class SubscriptionStateFactory { 137 : public: 138 14 : virtual ~SubscriptionStateFactory() = default; 139 : // Note that, outside of tests, we expect callbacks to always be a WatchMap. 140 : virtual std::unique_ptr<T> 141 : makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks, 142 : OpaqueResourceDecoderSharedPtr resource_decoder, 143 : XdsConfigTrackerOptRef xds_config_tracker = absl::nullopt, 144 : XdsResourcesDelegateOptRef xds_resources_delegate = absl::nullopt, 145 : const std::string& target_xds_authority = "") PURE; 146 : }; 147 : 148 : } // namespace XdsMux 149 : } // namespace Config 150 : } // namespace Envoy