Line data Source code
1 : #include "source/extensions/config_subscription/rest/http_subscription_impl.h" 2 : 3 : #include <memory> 4 : 5 : #include "envoy/service/discovery/v3/discovery.pb.h" 6 : 7 : #include "source/common/buffer/buffer_impl.h" 8 : #include "source/common/common/assert.h" 9 : #include "source/common/common/macros.h" 10 : #include "source/common/common/utility.h" 11 : #include "source/common/config/decoded_resource_impl.h" 12 : #include "source/common/config/utility.h" 13 : #include "source/common/http/headers.h" 14 : #include "source/common/protobuf/protobuf.h" 15 : #include "source/common/protobuf/utility.h" 16 : 17 : #include "google/api/annotations.pb.h" 18 : 19 : namespace Envoy { 20 : namespace Config { 21 : 22 : HttpSubscriptionImpl::HttpSubscriptionImpl( 23 : const LocalInfo::LocalInfo& local_info, Upstream::ClusterManager& cm, 24 : const std::string& remote_cluster_name, Event::Dispatcher& dispatcher, 25 : Random::RandomGenerator& random, std::chrono::milliseconds refresh_interval, 26 : std::chrono::milliseconds request_timeout, const Protobuf::MethodDescriptor& service_method, 27 : absl::string_view type_url, SubscriptionCallbacks& callbacks, 28 : OpaqueResourceDecoderSharedPtr resource_decoder, SubscriptionStats stats, 29 : std::chrono::milliseconds init_fetch_timeout, 30 : ProtobufMessage::ValidationVisitor& validation_visitor) 31 : : Http::RestApiFetcher(cm, remote_cluster_name, dispatcher, random, refresh_interval, 32 : request_timeout), 33 : callbacks_(callbacks), resource_decoder_(resource_decoder), stats_(stats), 34 : dispatcher_(dispatcher), init_fetch_timeout_(init_fetch_timeout), 35 0 : validation_visitor_(validation_visitor) { 36 0 : request_.mutable_node()->CopyFrom(local_info.node()); 37 0 : request_.set_type_url(std::string(type_url)); 38 0 : ASSERT(service_method.options().HasExtension(google::api::http)); 39 0 : const auto& http_rule = service_method.options().GetExtension(google::api::http); 40 0 : path_ = http_rule.post(); 41 0 : ASSERT(http_rule.body() == "*"); 42 0 : } 43 : 44 : // Config::Subscription 45 0 : void HttpSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) { 46 0 : if (init_fetch_timeout_.count() > 0) { 47 0 : init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void { 48 0 : handleFailure(Config::ConfigUpdateFailureReason::FetchTimedout, nullptr); 49 0 : }); 50 0 : init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_); 51 0 : } 52 : 53 0 : Protobuf::RepeatedPtrField<std::string> resources_vector(resource_names.begin(), 54 0 : resource_names.end()); 55 : // Sort to provide stable wire ordering. 56 0 : std::sort(resources_vector.begin(), resources_vector.end()); 57 0 : request_.mutable_resource_names()->Swap(&resources_vector); 58 0 : initialize(); 59 0 : } 60 : 61 : void HttpSubscriptionImpl::updateResourceInterest( 62 0 : const absl::flat_hash_set<std::string>& update_to_these_names) { 63 0 : Protobuf::RepeatedPtrField<std::string> resources_vector(update_to_these_names.begin(), 64 0 : update_to_these_names.end()); 65 : // Sort to provide stable wire ordering. 66 0 : std::sort(resources_vector.begin(), resources_vector.end()); 67 0 : request_.mutable_resource_names()->Swap(&resources_vector); 68 0 : } 69 : 70 : // Http::RestApiFetcher 71 0 : void HttpSubscriptionImpl::createRequest(Http::RequestMessage& request) { 72 0 : ENVOY_LOG(debug, "Sending REST request for {}", path_); 73 0 : stats_.update_attempt_.inc(); 74 0 : request.headers().setReferenceMethod(Http::Headers::get().MethodValues.Post); 75 0 : request.headers().setPath(path_); 76 0 : request.body().add(MessageUtil::getJsonStringFromMessageOrError(request_)); 77 0 : request.headers().setReferenceContentType(Http::Headers::get().ContentTypeValues.Json); 78 0 : request.headers().setContentLength(request.body().length()); 79 0 : } 80 : 81 0 : void HttpSubscriptionImpl::parseResponse(const Http::ResponseMessage& response) { 82 0 : disableInitFetchTimeoutTimer(); 83 0 : envoy::service::discovery::v3::DiscoveryResponse message; 84 0 : TRY_ASSERT_MAIN_THREAD { 85 0 : MessageUtil::loadFromJson(response.bodyAsString(), message, validation_visitor_); 86 0 : } 87 0 : END_TRY 88 0 : catch (const EnvoyException& e) { 89 0 : handleFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e); 90 0 : return; 91 0 : } 92 0 : TRY_ASSERT_MAIN_THREAD { 93 0 : const auto decoded_resources = 94 0 : DecodedResourcesWrapper(*resource_decoder_, message.resources(), message.version_info()); 95 0 : THROW_IF_NOT_OK(callbacks_.onConfigUpdate(decoded_resources.refvec_, message.version_info())); 96 0 : request_.set_version_info(message.version_info()); 97 0 : stats_.update_time_.set(DateUtil::nowToMilliseconds(dispatcher_.timeSource())); 98 0 : stats_.version_.set(HashUtil::xxHash64(request_.version_info())); 99 0 : stats_.version_text_.set(request_.version_info()); 100 0 : stats_.update_success_.inc(); 101 0 : } 102 0 : END_TRY 103 0 : catch (const EnvoyException& e) { 104 0 : handleFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e); 105 0 : } 106 0 : } 107 : 108 0 : void HttpSubscriptionImpl::onFetchComplete() {} 109 : 110 : void HttpSubscriptionImpl::onFetchFailure(Config::ConfigUpdateFailureReason reason, 111 0 : const EnvoyException* e) { 112 0 : handleFailure(reason, e); 113 0 : } 114 : 115 : void HttpSubscriptionImpl::handleFailure(Config::ConfigUpdateFailureReason reason, 116 0 : const EnvoyException* e) { 117 : 118 0 : switch (reason) { 119 0 : case Config::ConfigUpdateFailureReason::ConnectionFailure: 120 0 : ENVOY_LOG(warn, "REST update for {} failed", path_); 121 0 : stats_.update_failure_.inc(); 122 0 : break; 123 0 : case Config::ConfigUpdateFailureReason::FetchTimedout: 124 0 : ENVOY_LOG(warn, "REST config: initial fetch timeout for {}", path_); 125 0 : stats_.init_fetch_timeout_.inc(); 126 0 : disableInitFetchTimeoutTimer(); 127 0 : break; 128 0 : case Config::ConfigUpdateFailureReason::UpdateRejected: 129 0 : ASSERT(e != nullptr); 130 0 : ENVOY_LOG(warn, "REST config for {} rejected: {}", path_, e->what()); 131 0 : stats_.update_rejected_.inc(); 132 0 : disableInitFetchTimeoutTimer(); 133 0 : break; 134 0 : } 135 : 136 0 : if (reason == Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure) { 137 : // New requests will be sent again. 138 : // If init_fetch_timeout is non-zero, server will continue startup after it timeout 139 0 : return; 140 0 : } 141 : 142 0 : callbacks_.onConfigUpdateFailed(reason, e); 143 0 : } 144 : 145 0 : void HttpSubscriptionImpl::disableInitFetchTimeoutTimer() { 146 0 : if (init_fetch_timeout_timer_) { 147 0 : init_fetch_timeout_timer_->disableTimer(); 148 0 : init_fetch_timeout_timer_.reset(); 149 0 : } 150 0 : } 151 : 152 : REGISTER_FACTORY(HttpSubscriptionFactory, ConfigSubscriptionFactory); 153 : 154 : } // namespace Config 155 : } // namespace Envoy