Line data Source code
1 : #pragma once 2 : 3 : #include "envoy/grpc/status.h" 4 : #include "envoy/service/discovery/v3/discovery.pb.h" 5 : 6 : #include "source/common/common/assert.h" 7 : #include "source/common/common/hash.h" 8 : #include "source/common/config/decoded_resource_impl.h" 9 : #include "source/extensions/config_subscription/grpc/xds_mux/subscription_state.h" 10 : 11 : #include "absl/types/optional.h" 12 : 13 : namespace Envoy { 14 : namespace Config { 15 : namespace XdsMux { 16 : 17 : // Tracks the state of a "state-of-the-world" (i.e. not delta) xDS-over-gRPC protocol session. 18 : class SotwSubscriptionState 19 : : public BaseSubscriptionState<envoy::service::discovery::v3::DiscoveryResponse, 20 : envoy::service::discovery::v3::DiscoveryRequest> { 21 : public: 22 : // Note that, outside of tests, we expect callbacks to always be a WatchMap. 23 : SotwSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& callbacks, 24 : Event::Dispatcher& dispatcher, 25 : OpaqueResourceDecoderSharedPtr resource_decoder, 26 : XdsConfigTrackerOptRef xds_config_tracker, 27 : XdsResourcesDelegateOptRef xds_resources_delegate, 28 : const std::string& target_xds_authority); 29 : ~SotwSubscriptionState() override; 30 : 31 : // Update which resources we're interested in subscribing to. 32 : void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added, 33 : const absl::flat_hash_set<std::string>& cur_removed) override; 34 : 35 : // Whether there was a change in our subscription interest we have yet to inform the server of. 36 : bool subscriptionUpdatePending() const override; 37 : 38 : void markStreamFresh() override; 39 : 40 : void ttlExpiryCallback(const std::vector<std::string>& expired) override; 41 : 42 : void handleEstablishmentFailure() override; 43 : 44 : SotwSubscriptionState(const SotwSubscriptionState&) = delete; 45 : SotwSubscriptionState& operator=(const SotwSubscriptionState&) = delete; 46 : 47 : private: 48 : std::unique_ptr<envoy::service::discovery::v3::DiscoveryRequest> 49 : getNextRequestInternal() override; 50 : 51 : void handleGoodResponse(const envoy::service::discovery::v3::DiscoveryResponse& message) override; 52 : void setResourceTtl(const DecodedResourceImpl& decoded_resource); 53 : bool isHeartbeatResource(const DecodedResource& resource, const std::string& version); 54 : 55 : OpaqueResourceDecoderSharedPtr resource_decoder_; 56 : 57 : // The version_info carried by the last accepted DiscoveryResponse. 58 : // Remains empty until one is accepted. 59 : absl::optional<std::string> last_good_version_info_; 60 : // The nonce carried by the last accepted DiscoveryResponse. 61 : // Remains empty until one is accepted. 62 : // Used when it's time to make a spontaneous (i.e. not primarily meant as an ACK) request. 63 : absl::optional<std::string> last_good_nonce_; 64 : 65 : // Starts true because we should send a request upon subscription start. 66 : bool update_pending_{true}; 67 : 68 : absl::flat_hash_set<std::string> names_tracked_; 69 : }; 70 : 71 : class SotwSubscriptionStateFactory : public SubscriptionStateFactory<SotwSubscriptionState> { 72 : public: 73 10 : SotwSubscriptionStateFactory(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} 74 10 : ~SotwSubscriptionStateFactory() override = default; 75 : std::unique_ptr<SotwSubscriptionState> 76 : makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks, 77 : OpaqueResourceDecoderSharedPtr resource_decoder, 78 : XdsConfigTrackerOptRef xds_config_tracker, 79 : XdsResourcesDelegateOptRef xds_resources_delegate, 80 39 : const std::string& target_xds_authority) override { 81 39 : return std::make_unique<SotwSubscriptionState>(type_url, callbacks, dispatcher_, 82 39 : resource_decoder, xds_config_tracker, 83 39 : xds_resources_delegate, target_xds_authority); 84 39 : } 85 : 86 : private: 87 : Event::Dispatcher& dispatcher_; 88 : }; 89 : 90 : } // namespace XdsMux 91 : } // namespace Config 92 : } // namespace Envoy