1 : #include "source/common/upstream/load_stats_reporter.h" 2 : 3 : #include "envoy/service/load_stats/v3/lrs.pb.h" 4 : #include "envoy/stats/scope.h" 5 : 6 : #include "source/common/protobuf/protobuf.h" 7 : 8 : namespace Envoy { 9 : namespace Upstream { 10 : 11 : LoadStatsReporter::LoadStatsReporter(const LocalInfo::LocalInfo& local_info, 12 : ClusterManager& cluster_manager, Stats::Scope& scope, 13 : Grpc::RawAsyncClientPtr async_client, 14 : Event::Dispatcher& dispatcher) 15 : : cm_(cluster_manager), stats_{ALL_LOAD_REPORTER_STATS( 16 : POOL_COUNTER_PREFIX(scope, "load_reporter."))}, 17 : async_client_(std::move(async_client)), 18 : service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName( 19 : "envoy.service.load_stats.v3.LoadReportingService.StreamLoadStats")), 20 1 : time_source_(dispatcher.timeSource()) { 21 1 : request_.mutable_node()->MergeFrom(local_info.node()); 22 1 : request_.mutable_node()->add_client_features("envoy.lrs.supports_send_all_clusters"); 23 1 : retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); }); 24 1 : response_timer_ = dispatcher.createTimer([this]() -> void { sendLoadStatsRequest(); }); 25 1 : establishNewStream(); 26 1 : } 27 : 28 1 : void LoadStatsReporter::setRetryTimer() { 29 1 : retry_timer_->enableTimer(std::chrono::milliseconds(RETRY_DELAY_MS)); 30 1 : } 31 : 32 1 : void LoadStatsReporter::establishNewStream() { 33 1 : ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString()); 34 1 : stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions()); 35 1 : if (stream_ == nullptr) { 36 0 : ENVOY_LOG(warn, "Unable to establish new stream"); 37 0 : handleFailure(); 38 0 : return; 39 0 : } 40 : 41 1 : request_.mutable_cluster_stats()->Clear(); 42 1 : sendLoadStatsRequest(); 43 1 : } 44 : 45 1 : void LoadStatsReporter::sendLoadStatsRequest() { 46 : // TODO(htuch): This sends load reports for only the set of clusters in clusters_, which 47 : // was initialized in startLoadReportPeriod() the last time we either sent a load report 48 : // or received a new LRS response (whichever happened more recently). The code in 49 : // startLoadReportPeriod() adds to clusters_ only those clusters that exist in the 50 : // ClusterManager at the moment when startLoadReportPeriod() runs. This means that if 51 : // a cluster is selected by the LRS server (either by being explicitly listed or by using 52 : // the send_all_clusters field), if that cluster was added to the ClusterManager since the 53 : // last time startLoadReportPeriod() was invoked, we will not report its load here. In 54 : // practice, this means that for any newly created cluster, we will always drop the data for 55 : // the initial load report period. This seems sub-optimal. 56 : // 57 : // One possible way to deal with this would be to get a notification whenever a new cluster is 58 : // added to the cluster manager. When we get the notification, we record the current time in 59 : // clusters_ as the start time for the load reporting window for that cluster. 
  request_.mutable_cluster_stats()->Clear();
  auto all_clusters = cm_.clusters();
  for (const auto& cluster_name_and_timestamp : clusters_) {
    const std::string& cluster_name = cluster_name_and_timestamp.first;
    auto it = all_clusters.active_clusters_.find(cluster_name);
    if (it == all_clusters.active_clusters_.end()) {
      ENVOY_LOG(debug, "Cluster {} does not exist", cluster_name);
      continue;
    }
    auto& cluster = it->second.get();
    auto* cluster_stats = request_.add_cluster_stats();
    cluster_stats->set_cluster_name(cluster_name);
    if (const auto& name = cluster.info()->edsServiceName(); !name.empty()) {
      cluster_stats->set_cluster_service_name(name);
    }
    for (const HostSetPtr& host_set : cluster.prioritySet().hostSetsPerPriority()) {
      ENVOY_LOG(trace, "Load report locality count {}", host_set->hostsPerLocality().get().size());
      for (const HostVector& hosts : host_set->hostsPerLocality().get()) {
        ASSERT(!hosts.empty());
        uint64_t rq_success = 0;
        uint64_t rq_error = 0;
        uint64_t rq_active = 0;
        uint64_t rq_issued = 0;
        LoadMetricStats::StatMap load_metrics;
        for (const HostSharedPtr& host : hosts) {
          uint64_t host_rq_success = host->stats().rq_success_.latch();
          uint64_t host_rq_error = host->stats().rq_error_.latch();
          uint64_t host_rq_active = host->stats().rq_active_.value();
          uint64_t host_rq_issued = host->stats().rq_total_.latch();
          rq_success += host_rq_success;
          rq_error += host_rq_error;
          rq_active += host_rq_active;
          rq_issued += host_rq_issued;
          if (host_rq_success + host_rq_error + host_rq_active != 0) {
            const std::unique_ptr<LoadMetricStats::StatMap> latched_stats =
                host->loadMetricStats().latch();
            if (latched_stats != nullptr) {
              for (const auto& metric : *latched_stats) {
                const std::string& name = metric.first;
                LoadMetricStats::Stat& stat = load_metrics[name];
                stat.num_requests_with_metric += metric.second.num_requests_with_metric;
                stat.total_metric_value += metric.second.total_metric_value;
              }
            }
          }
        }
        if (rq_success + rq_error + rq_active != 0) {
          auto* locality_stats = cluster_stats->add_upstream_locality_stats();
          locality_stats->mutable_locality()->MergeFrom(hosts[0]->locality());
          locality_stats->set_priority(host_set->priority());
          locality_stats->set_total_successful_requests(rq_success);
          locality_stats->set_total_error_requests(rq_error);
          locality_stats->set_total_requests_in_progress(rq_active);
          locality_stats->set_total_issued_requests(rq_issued);
          for (const auto& metric : load_metrics) {
            auto* load_metric_stats = locality_stats->add_load_metric_stats();
            load_metric_stats->set_metric_name(metric.first);
            load_metric_stats->set_num_requests_finished_with_metric(
                metric.second.num_requests_with_metric);
            load_metric_stats->set_total_metric_value(metric.second.total_metric_value);
          }
        }
      }
    }
    cluster_stats->set_total_dropped_requests(
        cluster.info()->loadReportStats().upstream_rq_dropped_.latch());
    const uint64_t drop_overload_count =
        cluster.info()->loadReportStats().upstream_rq_drop_overload_.latch();
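    // The aggregate dropped-request counter is latched into total_dropped_requests; drops
    // attributed to drop_overload are additionally reported under their own "drop_overload"
    // category when nonzero.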
    if (drop_overload_count > 0) {
      auto* dropped_request = cluster_stats->add_dropped_requests();
      dropped_request->set_category("drop_overload");
      dropped_request->set_dropped_count(drop_overload_count);
    }

    const auto now = time_source_.monotonicTime().time_since_epoch();
    const auto measured_interval = now - cluster_name_and_timestamp.second;
    cluster_stats->mutable_load_report_interval()->MergeFrom(
        Protobuf::util::TimeUtil::MicrosecondsToDuration(
            std::chrono::duration_cast<std::chrono::microseconds>(measured_interval).count()));
    clusters_[cluster_name] = now;
  }

  ENVOY_LOG(trace, "Sending LoadStatsRequest: {}", request_.DebugString());
  stream_->sendMessage(request_, false);
  stats_.responses_.inc();
  // When the connection is established, the message has not yet been read so we
  // will not have a load reporting period.
  if (message_) {
    startLoadReportPeriod();
  }
}

void LoadStatsReporter::handleFailure() {
  ENVOY_LOG(warn, "Load reporter stats stream/connection failure, will retry in {} ms.",
            RETRY_DELAY_MS);
  stats_.errors_.inc();
  setRetryTimer();
}

void LoadStatsReporter::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
  UNREFERENCED_PARAMETER(metadata);
}

void LoadStatsReporter::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) {
  UNREFERENCED_PARAMETER(metadata);
}

void LoadStatsReporter::onReceiveMessage(
    std::unique_ptr<envoy::service::load_stats::v3::LoadStatsResponse>&& message) {
  ENVOY_LOG(debug, "New load report epoch: {}", message->DebugString());
  message_ = std::move(message);
  startLoadReportPeriod();
  stats_.requests_.inc();
}

void LoadStatsReporter::startLoadReportPeriod() {
  // Once a cluster is tracked, we don't want to reset its stats between reports
  // to avoid racing between request/response.
  // TODO(htuch): The key here could be absl::string_view, but this causes
  // problems due to referencing of temporaries in the below loop with Google's
  // internal string type. Consider this optimization when the string types
  // converge.
  const ClusterManager::ClusterInfoMaps all_clusters = cm_.clusters();
  absl::node_hash_map<std::string, std::chrono::steady_clock::duration> existing_clusters;
  if (message_->send_all_clusters()) {
    for (const auto& p : all_clusters.active_clusters_) {
      const std::string& cluster_name = p.first;
      auto it = clusters_.find(cluster_name);
      if (it != clusters_.end()) {
        existing_clusters.emplace(cluster_name, it->second);
      }
    }
  } else {
    for (const std::string& cluster_name : message_->clusters()) {
      auto it = clusters_.find(cluster_name);
      if (it != clusters_.end()) {
        existing_clusters.emplace(cluster_name, it->second);
      }
    }
  }
  clusters_.clear();
  // Reset stats for all hosts in clusters we are tracking.
  auto handle_cluster_func = [this, &existing_clusters,
                              &all_clusters](const std::string& cluster_name) {
    auto existing_cluster_it = existing_clusters.find(cluster_name);
    clusters_.emplace(cluster_name, existing_cluster_it != existing_clusters.end()
                                        ? existing_cluster_it->second
                                        : time_source_.monotonicTime().time_since_epoch());
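    // A cluster named by the LRS server but not (or no longer) present in the cluster manager
    // is still tracked above so its reporting-window start time is recorded; there are simply
    // no host stats to latch for it yet.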
    auto it = all_clusters.active_clusters_.find(cluster_name);
    if (it == all_clusters.active_clusters_.end()) {
      return;
    }
    // Don't reset stats for existing tracked clusters.
    if (existing_cluster_it != existing_clusters.end()) {
      return;
    }
    auto& cluster = it->second.get();
    for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
      for (const auto& host : host_set->hosts()) {
        host->stats().rq_success_.latch();
        host->stats().rq_error_.latch();
        host->stats().rq_total_.latch();
      }
    }
    cluster.info()->loadReportStats().upstream_rq_dropped_.latch();
    cluster.info()->loadReportStats().upstream_rq_drop_overload_.latch();
  };
  if (message_->send_all_clusters()) {
    for (const auto& p : all_clusters.active_clusters_) {
      const std::string& cluster_name = p.first;
      handle_cluster_func(cluster_name);
    }
  } else {
    for (const std::string& cluster_name : message_->clusters()) {
      handle_cluster_func(cluster_name);
    }
  }
  response_timer_->enableTimer(std::chrono::milliseconds(
      DurationUtil::durationToMilliseconds(message_->load_reporting_interval())));
}

void LoadStatsReporter::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) {
  UNREFERENCED_PARAMETER(metadata);
}

void LoadStatsReporter::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
  ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {}", service_method_.name(), status, message);
  response_timer_->disableTimer();
  stream_ = nullptr;
  handleFailure();
}

} // namespace Upstream
} // namespace Envoy