Line data Source code
1 : #pragma once 2 : 3 : #include "envoy/grpc/status.h" 4 : 5 : #include "source/common/common/assert.h" 6 : #include "source/common/common/logger.h" 7 : #include "source/common/config/api_version.h" 8 : #include "source/extensions/config_subscription/grpc/xds_mux/subscription_state.h" 9 : 10 : #include "absl/container/node_hash_map.h" 11 : #include "absl/types/optional.h" 12 : 13 : namespace Envoy { 14 : namespace Config { 15 : namespace XdsMux { 16 : 17 : // Tracks the state of a delta xDS-over-gRPC protocol session. 18 : class DeltaSubscriptionState 19 : : public BaseSubscriptionState<envoy::service::discovery::v3::DeltaDiscoveryResponse, 20 : envoy::service::discovery::v3::DeltaDiscoveryRequest> { 21 : public: 22 : DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, 23 : Event::Dispatcher& dispatcher, XdsConfigTrackerOptRef xds_config_tracker); 24 : 25 : ~DeltaSubscriptionState() override; 26 : 27 : // Update which resources we're interested in subscribing to. 28 : void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added, 29 : const absl::flat_hash_set<std::string>& cur_removed) override; 30 : 31 : // Whether there was a change in our subscription interest we have yet to inform the server of. 32 : bool subscriptionUpdatePending() const override; 33 : 34 0 : void markStreamFresh() override { any_request_sent_yet_in_current_stream_ = false; } 35 : 36 : void ttlExpiryCallback(const std::vector<std::string>& expired) override; 37 : 38 : DeltaSubscriptionState(const DeltaSubscriptionState&) = delete; 39 : DeltaSubscriptionState& operator=(const DeltaSubscriptionState&) = delete; 40 : 41 : private: 42 : std::unique_ptr<envoy::service::discovery::v3::DeltaDiscoveryRequest> 43 : getNextRequestInternal() override; 44 : 45 : void setResourceTtl(const envoy::service::discovery::v3::Resource& resource); 46 : bool isHeartbeatResource(const envoy::service::discovery::v3::Resource& resource) const; 47 : void 48 : handleGoodResponse(const envoy::service::discovery::v3::DeltaDiscoveryResponse& message) override; 49 : void addResourceStateFromServer(const envoy::service::discovery::v3::Resource& resource); 50 : 51 : class ResourceState { 52 : public: 53 : // Builds a ResourceVersion in the waitingForServer state. 54 17 : ResourceState() = default; 55 : // Builds a ResourceState with a specific version 56 0 : ResourceState(absl::string_view version) : version_(version) {} 57 : // Self-documenting alias of default constructor. 58 17 : static ResourceState waitingForServer() { return {}; } 59 : // Self-documenting alias of constructor with version. 60 0 : static ResourceState withVersion(absl::string_view version) { return {version}; } 61 : 62 : // If true, we currently have no version of this resource - we are waiting for the server to 63 : // provide us with one. 64 15 : bool isWaitingForServer() const { return version_ == absl::nullopt; } 65 : 66 0 : void setAsWaitingForServer() { version_ = absl::nullopt; } 67 12 : void setVersion(absl::string_view version) { version_ = std::string(version); } 68 : 69 : // Must not be called if waitingForServer() == true. 70 0 : std::string version() const { 71 0 : ASSERT(version_.has_value()); 72 0 : return version_.value_or(""); 73 0 : } 74 : 75 : private: 76 : absl::optional<std::string> version_; 77 : }; 78 : 79 : OptRef<ResourceState> getRequestedResourceState(absl::string_view resource_name); 80 : OptRef<const ResourceState> getRequestedResourceState(absl::string_view resource_name) const; 81 : 82 : bool isInitialRequestForLegacyWildcard(); 83 : 84 : // Not all xDS resources support heartbeats due to there being specific information encoded in 85 : // an empty response, which is indistinguishable from a heartbeat in some cases. For now we just 86 : // disable heartbeats for these resources (currently only VHDS). 87 : const bool supports_heartbeats_; 88 : 89 : // A map from resource name to per-resource version. The keys of this map are exactly the resource 90 : // names we are currently interested in. Those in the waitingForServer state currently don't have 91 : // any version for that resource: we need to inform the server if we lose interest in them, but we 92 : // also need to *not* include them in the initial_resource_versions map upon a reconnect. 93 : absl::node_hash_map<std::string, ResourceState> requested_resource_state_; 94 : // A map from resource name to per-resource version. The keys of this map are resource names we 95 : // have received as a part of the wildcard subscription. 96 : absl::node_hash_map<std::string, std::string> wildcard_resource_state_; 97 : // Used for storing resources that we lost interest in, but could 98 : // also be a part of wildcard subscription. 99 : absl::node_hash_map<std::string, std::string> ambiguous_resource_state_; 100 : 101 : bool in_initial_legacy_wildcard_{true}; 102 : bool any_request_sent_yet_in_current_stream_{}; 103 : 104 : // Tracks changes in our subscription interest since the previous DeltaDiscoveryRequest we sent. 105 : // TODO: Can't use absl::flat_hash_set due to ordering issues in gTest expectation matching. 106 : // Feel free to change to an unordered container once we figure out how to make it work. 107 : std::set<std::string> names_added_; 108 : std::set<std::string> names_removed_; 109 : }; 110 : 111 : class DeltaSubscriptionStateFactory : public SubscriptionStateFactory<DeltaSubscriptionState> { 112 : public: 113 4 : DeltaSubscriptionStateFactory(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} 114 4 : ~DeltaSubscriptionStateFactory() override = default; 115 : std::unique_ptr<DeltaSubscriptionState> 116 : makeSubscriptionState(const std::string& type_url, UntypedConfigUpdateCallbacks& callbacks, 117 : OpaqueResourceDecoderSharedPtr, XdsConfigTrackerOptRef xds_config_tracker, 118 : XdsResourcesDelegateOptRef /*xds_resources_delegate*/, 119 15 : const std::string& /*target_xds_authority*/) override { 120 15 : return std::make_unique<DeltaSubscriptionState>(type_url, callbacks, dispatcher_, 121 15 : xds_config_tracker); 122 15 : } 123 : 124 : private: 125 : Event::Dispatcher& dispatcher_; 126 : }; 127 : 128 : } // namespace XdsMux 129 : } // namespace Config 130 : } // namespace Envoy