Line data Source code
1 : #pragma once 2 : 3 : #include "envoy/config/subscription.h" 4 : #include "envoy/config/xds_config_tracker.h" 5 : #include "envoy/event/dispatcher.h" 6 : #include "envoy/grpc/status.h" 7 : #include "envoy/local_info/local_info.h" 8 : #include "envoy/service/discovery/v3/discovery.pb.h" 9 : 10 : #include "source/common/common/assert.h" 11 : #include "source/common/common/logger.h" 12 : #include "source/common/config/api_version.h" 13 : #include "source/common/config/ttl.h" 14 : #include "source/extensions/config_subscription/grpc/pausable_ack_queue.h" 15 : #include "source/extensions/config_subscription/grpc/watch_map.h" 16 : 17 : #include "absl/container/node_hash_map.h" 18 : 19 : namespace Envoy { 20 : namespace Config { 21 : 22 : // Tracks the xDS protocol state of an individual ongoing delta xDS session, i.e. a single type_url. 23 : // There can be multiple DeltaSubscriptionStates active. They will always all be blissfully 24 : // unaware of each other's existence, even when their messages are being multiplexed together by 25 : // ADS. 26 : // 27 : // There are two scenarios which affect how DeltaSubscriptionState manages the resources. First 28 : // scenario is when we are subscribed to a wildcard resource, and other scenario is when we are not. 29 : // 30 : // Delta subscription state also divides the resources it cached into three categories: requested, 31 : // wildcard and ambiguous. 32 : // 33 : // The "requested" category is for resources that we have explicitly asked for (either through the 34 : // initial set of resources or through the on-demand mechanism). Resources in this category are in 35 : // one of two states: "complete" and "waiting for server". 36 : // 37 : // "Complete" resources are resources about which the server sent us the information we need (for 38 : // now - just resource version). 39 : // 40 : // The "waiting for server" state is either for resources that we have just requested, but we still 41 : // didn't receive any version information from the server, or for the "complete" resources that, 42 : // according to the server, are gone, but we are still interested in them - in such case we strip 43 : // the information from the resource. 44 : // 45 : // The "wildcard" category is for resources that we are not explicitly interested in, but we are 46 : // indirectly interested through the subscription to the wildcard resource. 47 : // 48 : // The "ambiguous" category is for resources that we stopped being interested in, but we may still 49 : // be interested indirectly through the wildcard subscription. This situation happens because of the 50 : // xDS protocol limitation - the server isn't able to tell us that the resource we subscribed to is 51 : // also a part of our wildcard subscription. So when we unsubscribe from the resource, we need to 52 : // receive a confirmation from the server whether to keep the resource (which means that it was a 53 : // part of our wildcard subscription) or to drop it. 54 : // 55 : // Please refer to drawings (non-wildcard-resource-state-machine.png and 56 : // (wildcard-resource-state-machine.png) for visual depictions of the resource state machine. 57 : // 58 : // In the "no wildcard subscription" scenario all the cached resources should be in the "requested" 59 : // category. Resources are added to the category upon the explicit request and dropped when we 60 : // explicitly unsubscribe from it. Transitions between "complete" and "waiting for server" happen 61 : // when we receive messages from the server - if a resource in the message is in "added resources" 62 : // list (thus contains version information), the resource becomes "complete". If the resource in the 63 : // message is in "removed resources" list, it changes into the "waiting for server" state. If a 64 : // server sends us a resource that we didn't request, it's going to be ignored. 65 : // 66 : // In the "wildcard subscription" scenario, "requested" category is the same as in "no wildcard 67 : // subscription" scenario, with one exception - the unsubscribed "complete" resource is not removed 68 : // from the cache, but it's moved to the "ambiguous" resources instead. At this point we are waiting 69 : // for the server to tell us that this resource should be either moved to the "wildcard" resources, 70 : // or dropped. Resources in "wildcard" category are only added there or dropped from there by the 71 : // server. Resources from both "wildcard" and "ambiguous" categories can become "requested" 72 : // "complete" resources if we subscribe to them again. 73 : // 74 : // The delta subscription state transitions between the two scenarios depending on whether we are 75 : // subscribed to wildcard resource or not. Nothing special happens when we transition from "no 76 : // wildcard subscription" to "wildcard subscription" scenario, but when transitioning in the other 77 : // direction, we drop all the resources in "wildcard" and "ambiguous" categories. 78 : class DeltaSubscriptionState : public Logger::Loggable<Logger::Id::config> { 79 : public: 80 : DeltaSubscriptionState(std::string type_url, UntypedConfigUpdateCallbacks& watch_map, 81 : const LocalInfo::LocalInfo& local_info, Event::Dispatcher& dispatcher, 82 : XdsConfigTrackerOptRef xds_config_tracker); 83 : 84 : // Update which resources we're interested in subscribing to. 85 : void updateSubscriptionInterest(const absl::flat_hash_set<std::string>& cur_added, 86 : const absl::flat_hash_set<std::string>& cur_removed); 87 0 : void setMustSendDiscoveryRequest() { must_send_discovery_request_ = true; } 88 : 89 : // Whether there was a change in our subscription interest we have yet to inform the server of. 90 : bool subscriptionUpdatePending() const; 91 : 92 0 : void markStreamFresh() { any_request_sent_yet_in_current_stream_ = false; } 93 : 94 : UpdateAck handleResponse(const envoy::service::discovery::v3::DeltaDiscoveryResponse& message); 95 : 96 : void handleEstablishmentFailure(); 97 : 98 : // Returns the next gRPC request proto to be sent off to the server, based on this object's 99 : // understanding of the current protocol state, and new resources that Envoy wants to request. 100 : envoy::service::discovery::v3::DeltaDiscoveryRequest getNextRequestAckless(); 101 : 102 : // The WithAck version first calls the Ack-less version, then adds in the passed-in ack. 103 : envoy::service::discovery::v3::DeltaDiscoveryRequest getNextRequestWithAck(const UpdateAck& ack); 104 : 105 : DeltaSubscriptionState(const DeltaSubscriptionState&) = delete; 106 : DeltaSubscriptionState& operator=(const DeltaSubscriptionState&) = delete; 107 : 108 : private: 109 : bool isHeartbeatResponse(const envoy::service::discovery::v3::Resource& resource) const; 110 : void handleGoodResponse(const envoy::service::discovery::v3::DeltaDiscoveryResponse& message); 111 : void handleBadResponse(const EnvoyException& e, UpdateAck& ack); 112 : 113 : class ResourceState { 114 : public: 115 : // Builds a ResourceState in the waitingForServer state. 116 17 : ResourceState() = default; 117 : // Builds a ResourceState with a specific version 118 0 : ResourceState(absl::string_view version) : version_(version) {} 119 : // Self-documenting alias of default constructor. 120 17 : static ResourceState waitingForServer() { return {}; } 121 : // Self-documenting alias of constructor with version. 122 0 : static ResourceState withVersion(absl::string_view version) { return {version}; } 123 : 124 : // If true, we currently have no version of this resource - we are waiting for the server to 125 : // provide us with one. 126 15 : bool isWaitingForServer() const { return version_ == absl::nullopt; } 127 : 128 0 : void setAsWaitingForServer() { version_ = absl::nullopt; } 129 12 : void setVersion(absl::string_view version) { version_ = std::string(version); } 130 : 131 : // Must not be called if waitingForServer() == true. 132 0 : std::string version() const { 133 0 : ASSERT(version_.has_value()); 134 0 : return version_.value_or(""); 135 0 : } 136 : 137 : private: 138 : absl::optional<std::string> version_; 139 : }; 140 : 141 : void addResourceStateFromServer(const envoy::service::discovery::v3::Resource& resource); 142 : OptRef<ResourceState> getRequestedResourceState(absl::string_view resource_name); 143 : OptRef<const ResourceState> getRequestedResourceState(absl::string_view resource_name) const; 144 : 145 : bool isInitialRequestForLegacyWildcard(); 146 : 147 : // A map from resource name to per-resource version. The keys of this map are exactly the resource 148 : // names we are currently interested in. Those in the waitingForServer state currently don't have 149 : // any version for that resource: we need to inform the server if we lose interest in them, but we 150 : // also need to *not* include them in the initial_resource_versions map upon a reconnect. 151 : absl::node_hash_map<std::string, ResourceState> requested_resource_state_; 152 : // A map from resource name to per-resource version. The keys of this map are resource names we 153 : // have received as a part of the wildcard subscription. 154 : absl::node_hash_map<std::string, std::string> wildcard_resource_state_; 155 : // Used for storing resources that we lost interest in, but could 156 : // also be a part of wildcard subscription. 157 : absl::node_hash_map<std::string, std::string> ambiguous_resource_state_; 158 : 159 : // Not all xDS resources supports heartbeats due to there being specific information encoded in 160 : // an empty response, which is indistinguishable from a heartbeat in some cases. For now we just 161 : // disable heartbeats for these resources (currently only VHDS). 162 : const bool supports_heartbeats_; 163 : TtlManager ttl_; 164 : 165 : const std::string type_url_; 166 : UntypedConfigUpdateCallbacks& watch_map_; 167 : const LocalInfo::LocalInfo& local_info_; 168 : XdsConfigTrackerOptRef xds_config_tracker_; 169 : 170 : bool in_initial_legacy_wildcard_{true}; 171 : bool any_request_sent_yet_in_current_stream_{}; 172 : bool must_send_discovery_request_{}; 173 : 174 : // Tracks changes in our subscription interest since the previous DeltaDiscoveryRequest we sent. 175 : // TODO: Can't use absl::flat_hash_set due to ordering issues in gTest expectation matching. 176 : // Feel free to change to an unordered container once we figure out how to make it work. 177 : std::set<std::string> names_added_; 178 : std::set<std::string> names_removed_; 179 : }; 180 : 181 : } // namespace Config 182 : } // namespace Envoy