Line data Source code
1 : #include "source/extensions/config_subscription/grpc/xds_mux/sotw_subscription_state.h" 2 : 3 : #include "source/common/config/utility.h" 4 : #include "source/extensions/config_subscription/grpc/xds_source_id.h" 5 : 6 : namespace Envoy { 7 : namespace Config { 8 : namespace XdsMux { 9 : 10 : SotwSubscriptionState::SotwSubscriptionState( 11 : std::string type_url, UntypedConfigUpdateCallbacks& callbacks, Event::Dispatcher& dispatcher, 12 : OpaqueResourceDecoderSharedPtr resource_decoder, XdsConfigTrackerOptRef xds_config_tracker, 13 : XdsResourcesDelegateOptRef xds_resources_delegate, const std::string& target_xds_authority) 14 : : BaseSubscriptionState(std::move(type_url), callbacks, dispatcher, xds_config_tracker, 15 : xds_resources_delegate, target_xds_authority), 16 39 : resource_decoder_(resource_decoder) {} 17 : 18 39 : SotwSubscriptionState::~SotwSubscriptionState() = default; 19 : 20 : void SotwSubscriptionState::updateSubscriptionInterest( 21 : const absl::flat_hash_set<std::string>& cur_added, 22 100 : const absl::flat_hash_set<std::string>& cur_removed) { 23 100 : for (const auto& a : cur_added) { 24 30 : names_tracked_.insert(a); 25 30 : } 26 100 : for (const auto& r : cur_removed) { 27 30 : names_tracked_.erase(r); 28 30 : } 29 100 : if (!cur_added.empty() || !cur_removed.empty()) { 30 60 : update_pending_ = true; 31 60 : } 32 100 : } 33 : 34 : // Not having sent any requests yet counts as an "update pending" since you're supposed to resend 35 : // the entirety of your interest at the start of a stream, even if nothing has changed. 36 1312 : bool SotwSubscriptionState::subscriptionUpdatePending() const { 37 1312 : return update_pending_ || dynamicContextChanged(); 38 1312 : } 39 : 40 0 : void SotwSubscriptionState::markStreamFresh() { 41 0 : last_good_nonce_ = absl::nullopt; 42 0 : update_pending_ = true; 43 0 : clearDynamicContextChanged(); 44 0 : } 45 : 46 : void SotwSubscriptionState::handleGoodResponse( 47 88 : const envoy::service::discovery::v3::DiscoveryResponse& message) { 48 88 : std::vector<DecodedResourcePtr> non_heartbeat_resources; 49 : 50 88 : { 51 88 : const auto scoped_update = ttl_.scopedTtlUpdate(); 52 142 : for (const auto& any : message.resources()) { 53 142 : if (!any.Is<envoy::service::discovery::v3::Resource>() && 54 142 : any.type_url() != message.type_url()) { 55 0 : throw EnvoyException(fmt::format("type URL {} embedded in an individual Any does not match " 56 0 : "the message-wide type URL {} in DiscoveryResponse {}", 57 0 : any.type_url(), message.type_url(), 58 0 : message.DebugString())); 59 0 : } 60 : 61 142 : auto decoded_resource = 62 142 : DecodedResourceImpl::fromResource(*resource_decoder_, any, message.version_info()); 63 142 : setResourceTtl(*decoded_resource); 64 142 : if (isHeartbeatResource(*decoded_resource, message.version_info())) { 65 0 : continue; 66 0 : } 67 142 : non_heartbeat_resources.push_back(std::move(decoded_resource)); 68 142 : } 69 88 : } 70 : 71 : // TODO (dmitri-d) to eliminate decoding of resources twice consider expanding the interface to 72 : // support passing of decoded resources. This would also avoid a resource copy above. 73 88 : callbacks().onConfigUpdate(non_heartbeat_resources, message.version_info()); 74 : // Now that we're passed onConfigUpdate() without an exception thrown, we know we're good. 75 88 : last_good_version_info_ = message.version_info(); 76 88 : last_good_nonce_ = message.nonce(); 77 : 78 : // Processing point when resources are successfully ingested. 79 88 : if (xds_config_tracker_.has_value()) { 80 0 : xds_config_tracker_->onConfigAccepted(message.type_url(), non_heartbeat_resources); 81 0 : } 82 : 83 : // Send the resources to the xDS delegate, if configured. 84 88 : if (xds_resources_delegate_.has_value()) { 85 0 : XdsConfigSourceId source_id{target_xds_authority_, message.type_url()}; 86 0 : std::vector<DecodedResourceRef> resource_refs; 87 0 : resource_refs.reserve(non_heartbeat_resources.size()); 88 0 : for (const DecodedResourcePtr& r : non_heartbeat_resources) { 89 0 : resource_refs.emplace_back(*r); 90 0 : } 91 0 : xds_resources_delegate_->onConfigUpdated(source_id, resource_refs); 92 0 : } 93 : 94 88 : ENVOY_LOG(debug, "Config update for {} (version {}) accepted with {} resources", typeUrl(), 95 88 : message.version_info(), message.resources().size()); 96 88 : } 97 : 98 39 : void SotwSubscriptionState::handleEstablishmentFailure() { 99 39 : BaseSubscriptionState::handleEstablishmentFailure(); 100 : 101 39 : if (previously_fetched_data_ || !xds_resources_delegate_.has_value()) { 102 39 : return; 103 39 : } 104 : 105 0 : const XdsConfigSourceId source_id{target_xds_authority_, type_url_}; 106 0 : TRY_ASSERT_MAIN_THREAD { 107 0 : std::vector<envoy::service::discovery::v3::Resource> resources = 108 0 : xds_resources_delegate_->getResources(source_id, names_tracked_); 109 : 110 0 : std::vector<DecodedResourcePtr> decoded_resources; 111 0 : const auto scoped_update = ttl_.scopedTtlUpdate(); 112 0 : std::string version_info; 113 0 : size_t unaccounted = names_tracked_.size(); 114 0 : if (names_tracked_.size() == 1 && names_tracked_.contains(Envoy::Config::Wildcard)) { 115 : // For wildcard requests, there are no expectations for the number of resources returned. 116 0 : unaccounted = 0; 117 0 : } 118 : 119 0 : for (const auto& resource : resources) { 120 0 : if (version_info.empty()) { 121 0 : version_info = resource.version(); 122 0 : } else { 123 0 : ASSERT(version_info == resource.version()); 124 0 : } 125 : 126 0 : TRY_ASSERT_MAIN_THREAD { 127 0 : auto decoded_resource = DecodedResourceImpl::fromResource(*resource_decoder_, resource); 128 0 : setResourceTtl(*decoded_resource); 129 0 : if (unaccounted > 0) { 130 0 : --unaccounted; 131 0 : } 132 0 : decoded_resources.emplace_back(std::move(decoded_resource)); 133 0 : } 134 0 : END_TRY 135 0 : catch (const EnvoyException& e) { 136 0 : xds_resources_delegate_->onResourceLoadFailed(source_id, resource.name(), e); 137 0 : } 138 0 : } 139 : 140 0 : callbacks().onConfigUpdate(decoded_resources, version_info); 141 0 : previously_fetched_data_ = true; 142 0 : if (unaccounted == 0 && !version_info.empty()) { 143 : // All the requested resources were found and validated from the xDS delegate, so set the last 144 : // known good version. 145 0 : last_good_version_info_ = version_info; 146 0 : } 147 0 : } 148 0 : END_TRY 149 0 : catch (const EnvoyException& e) { 150 : // TODO(abeyad): do something more than just logging the error? 151 0 : ENVOY_LOG(warn, "xDS delegate failed onEstablishmentFailure() for {}: {}", source_id.toKey(), 152 0 : e.what()); 153 0 : } 154 0 : } 155 : 156 : std::unique_ptr<envoy::service::discovery::v3::DiscoveryRequest> 157 140 : SotwSubscriptionState::getNextRequestInternal() { 158 140 : auto request = std::make_unique<envoy::service::discovery::v3::DiscoveryRequest>(); 159 140 : request->set_type_url(typeUrl()); 160 140 : std::copy(names_tracked_.begin(), names_tracked_.end(), 161 140 : Protobuf::RepeatedFieldBackInserter(request->mutable_resource_names())); 162 140 : if (last_good_version_info_.has_value()) { 163 101 : request->set_version_info(last_good_version_info_.value()); 164 101 : } 165 : // Default response_nonce to the last known good one. If we are being called by 166 : // getNextRequestWithAck(), this value will be overwritten. 167 140 : if (last_good_nonce_.has_value()) { 168 101 : request->set_response_nonce(last_good_nonce_.value()); 169 101 : } 170 : 171 140 : update_pending_ = false; 172 140 : return request; 173 140 : } 174 : 175 0 : void SotwSubscriptionState::ttlExpiryCallback(const std::vector<std::string>& expired) { 176 0 : Protobuf::RepeatedPtrField<std::string> removed_resources; 177 0 : for (const auto& resource : expired) { 178 0 : removed_resources.Add(std::string(resource)); 179 0 : } 180 0 : callbacks().onConfigUpdate({}, removed_resources, ""); 181 0 : } 182 : 183 142 : void SotwSubscriptionState::setResourceTtl(const DecodedResourceImpl& decoded_resource) { 184 142 : if (decoded_resource.ttl()) { 185 0 : ttl_.add(std::chrono::milliseconds(*decoded_resource.ttl()), decoded_resource.name()); 186 142 : } else { 187 142 : ttl_.clear(decoded_resource.name()); 188 142 : } 189 142 : } 190 : 191 : bool SotwSubscriptionState::isHeartbeatResource(const DecodedResource& resource, 192 142 : const std::string& version) { 193 142 : return !resource.hasResource() && last_good_version_info_.has_value() && 194 142 : version == last_good_version_info_.value(); 195 142 : } 196 : 197 : } // namespace XdsMux 198 : } // namespace Config 199 : } // namespace Envoy