Line data Source code
1 : #include "source/extensions/config_subscription/rest/rest_api_fetcher.h" 2 : 3 : #include <chrono> 4 : #include <cstdint> 5 : #include <string> 6 : 7 : #include "source/common/common/enum_to_int.h" 8 : #include "source/common/config/utility.h" 9 : #include "source/common/http/message_impl.h" 10 : #include "source/common/http/utility.h" 11 : 12 : namespace Envoy { 13 : namespace Http { 14 : 15 : RestApiFetcher::RestApiFetcher(Upstream::ClusterManager& cm, const std::string& remote_cluster_name, 16 : Event::Dispatcher& dispatcher, Random::RandomGenerator& random, 17 : std::chrono::milliseconds refresh_interval, 18 : std::chrono::milliseconds request_timeout) 19 : : remote_cluster_name_(remote_cluster_name), cm_(cm), random_(random), 20 : refresh_interval_(refresh_interval), request_timeout_(request_timeout), 21 0 : refresh_timer_(dispatcher.createTimer([this]() -> void { refresh(); })) {} 22 : 23 0 : RestApiFetcher::~RestApiFetcher() { 24 0 : if (active_request_) { 25 0 : active_request_->cancel(); 26 0 : } 27 0 : } 28 : 29 0 : void RestApiFetcher::initialize() { refresh(); } 30 : 31 : void RestApiFetcher::onSuccess(const Http::AsyncClient::Request& request, 32 0 : Http::ResponseMessagePtr&& response) { 33 0 : uint64_t response_code = Http::Utility::getResponseStatus(response->headers()); 34 0 : if (response_code == enumToInt(Http::Code::NotModified)) { 35 0 : requestComplete(); 36 0 : return; 37 0 : } else if (response_code != enumToInt(Http::Code::OK)) { 38 0 : onFailure(request, Http::AsyncClient::FailureReason::Reset); 39 0 : return; 40 0 : } 41 : 42 0 : TRY_ASSERT_MAIN_THREAD { parseResponse(*response); } 43 0 : END_TRY 44 0 : catch (EnvoyException& e) { 45 0 : onFetchFailure(Config::ConfigUpdateFailureReason::UpdateRejected, &e); 46 0 : } 47 : 48 0 : requestComplete(); 49 0 : } 50 : 51 : void RestApiFetcher::onFailure(const Http::AsyncClient::Request&, 52 0 : Http::AsyncClient::FailureReason reason) { 53 : // Currently Http::AsyncClient::FailureReason only has one value: "Reset". 54 0 : ASSERT(reason == Http::AsyncClient::FailureReason::Reset); 55 0 : onFetchFailure(Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr); 56 0 : requestComplete(); 57 0 : } 58 : 59 0 : void RestApiFetcher::refresh() { 60 0 : RequestMessagePtr message(new RequestMessageImpl()); 61 0 : createRequest(*message); 62 0 : message->headers().setHost(remote_cluster_name_); 63 0 : const auto thread_local_cluster = cm_.getThreadLocalCluster(remote_cluster_name_); 64 0 : if (thread_local_cluster != nullptr) { 65 0 : active_request_ = thread_local_cluster->httpAsyncClient().send( 66 0 : std::move(message), *this, AsyncClient::RequestOptions().setTimeout(request_timeout_)); 67 0 : } else { 68 0 : onFetchFailure(Config::ConfigUpdateFailureReason::ConnectionFailure, nullptr); 69 0 : requestComplete(); 70 0 : } 71 0 : } 72 : 73 0 : void RestApiFetcher::requestComplete() { 74 0 : onFetchComplete(); 75 0 : active_request_ = nullptr; 76 : 77 : // Add refresh jitter based on the configured interval. 78 0 : std::chrono::milliseconds final_delay = 79 0 : refresh_interval_ + std::chrono::milliseconds(random_.random() % refresh_interval_.count()); 80 : 81 0 : refresh_timer_->enableTimer(final_delay); 82 0 : } 83 : 84 : } // namespace Http 85 : } // namespace Envoy