LCOV - code coverage report
Current view: top level - source/common/upstream - load_stats_reporter.cc
Test: coverage.dat
Date: 2024-01-05 06:35:25

                  Hit    Total    Coverage
Lines:             44      193      22.8 %
Functions:          8       14      57.1 %

          Line data    Source code
       1             : #include "source/common/upstream/load_stats_reporter.h"
       2             : 
       3             : #include "envoy/service/load_stats/v3/lrs.pb.h"
       4             : #include "envoy/stats/scope.h"
       5             : 
       6             : #include "source/common/protobuf/protobuf.h"
       7             : 
       8             : namespace Envoy {
       9             : namespace Upstream {
      10             : 
      11             : LoadStatsReporter::LoadStatsReporter(const LocalInfo::LocalInfo& local_info,
      12             :                                      ClusterManager& cluster_manager, Stats::Scope& scope,
      13             :                                      Grpc::RawAsyncClientPtr async_client,
      14             :                                      Event::Dispatcher& dispatcher)
      15             :     : cm_(cluster_manager), stats_{ALL_LOAD_REPORTER_STATS(
      16             :                                 POOL_COUNTER_PREFIX(scope, "load_reporter."))},
      17             :       async_client_(std::move(async_client)),
      18             :       service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
      19             :           "envoy.service.load_stats.v3.LoadReportingService.StreamLoadStats")),
      20           1 :       time_source_(dispatcher.timeSource()) {
      21           1 :   request_.mutable_node()->MergeFrom(local_info.node());
      22           1 :   request_.mutable_node()->add_client_features("envoy.lrs.supports_send_all_clusters");
      23           1 :   retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
      24           1 :   response_timer_ = dispatcher.createTimer([this]() -> void { sendLoadStatsRequest(); });
      25           1 :   establishNewStream();
      26           1 : }
      27             : 
      28           1 : void LoadStatsReporter::setRetryTimer() {
      29           1 :   retry_timer_->enableTimer(std::chrono::milliseconds(RETRY_DELAY_MS));
      30           1 : }
      31             : 
      32           1 : void LoadStatsReporter::establishNewStream() {
      33           1 :   ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString());
      34           1 :   stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
      35           1 :   if (stream_ == nullptr) {
      36           0 :     ENVOY_LOG(warn, "Unable to establish new stream");
      37           0 :     handleFailure();
      38           0 :     return;
      39           0 :   }
      40             : 
      41           1 :   request_.mutable_cluster_stats()->Clear();
      42           1 :   sendLoadStatsRequest();
      43           1 : }
      44             : 
      45           1 : void LoadStatsReporter::sendLoadStatsRequest() {
      46             :   // TODO(htuch): This sends load reports for only the set of clusters in clusters_, which
      47             :   // was initialized in startLoadReportPeriod() the last time we either sent a load report
      48             :   // or received a new LRS response (whichever happened more recently). The code in
      49             :   // startLoadReportPeriod() adds to clusters_ only those clusters that exist in the
      50             :   // ClusterManager at the moment when startLoadReportPeriod() runs. This means that if
       51             :   // a cluster is selected by the LRS server (either by being explicitly listed or by using
       52             :   // the send_all_clusters field) but was added to the ClusterManager after the last time
       53             :   // startLoadReportPeriod() was invoked, we will not report its load here. In
      54             :   // practice, this means that for any newly created cluster, we will always drop the data for
      55             :   // the initial load report period. This seems sub-optimal.
      56             :   //
      57             :   // One possible way to deal with this would be to get a notification whenever a new cluster is
      58             :   // added to the cluster manager. When we get the notification, we record the current time in
      59             :   // clusters_ as the start time for the load reporting window for that cluster.
      60           1 :   request_.mutable_cluster_stats()->Clear();
      61           1 :   auto all_clusters = cm_.clusters();
      62           1 :   for (const auto& cluster_name_and_timestamp : clusters_) {
      63           0 :     const std::string& cluster_name = cluster_name_and_timestamp.first;
      64           0 :     auto it = all_clusters.active_clusters_.find(cluster_name);
      65           0 :     if (it == all_clusters.active_clusters_.end()) {
      66           0 :       ENVOY_LOG(debug, "Cluster {} does not exist", cluster_name);
      67           0 :       continue;
      68           0 :     }
      69           0 :     auto& cluster = it->second.get();
      70           0 :     auto* cluster_stats = request_.add_cluster_stats();
      71           0 :     cluster_stats->set_cluster_name(cluster_name);
      72           0 :     if (const auto& name = cluster.info()->edsServiceName(); !name.empty()) {
      73           0 :       cluster_stats->set_cluster_service_name(name);
      74           0 :     }
      75           0 :     for (const HostSetPtr& host_set : cluster.prioritySet().hostSetsPerPriority()) {
      76           0 :       ENVOY_LOG(trace, "Load report locality count {}", host_set->hostsPerLocality().get().size());
      77           0 :       for (const HostVector& hosts : host_set->hostsPerLocality().get()) {
      78           0 :         ASSERT(!hosts.empty());
      79           0 :         uint64_t rq_success = 0;
      80           0 :         uint64_t rq_error = 0;
      81           0 :         uint64_t rq_active = 0;
      82           0 :         uint64_t rq_issued = 0;
      83           0 :         LoadMetricStats::StatMap load_metrics;
      84           0 :         for (const HostSharedPtr& host : hosts) {
      85           0 :           uint64_t host_rq_success = host->stats().rq_success_.latch();
      86           0 :           uint64_t host_rq_error = host->stats().rq_error_.latch();
      87           0 :           uint64_t host_rq_active = host->stats().rq_active_.value();
      88           0 :           uint64_t host_rq_issued = host->stats().rq_total_.latch();
      89           0 :           rq_success += host_rq_success;
      90           0 :           rq_error += host_rq_error;
      91           0 :           rq_active += host_rq_active;
      92           0 :           rq_issued += host_rq_issued;
      93           0 :           if (host_rq_success + host_rq_error + host_rq_active != 0) {
      94           0 :             const std::unique_ptr<LoadMetricStats::StatMap> latched_stats =
      95           0 :                 host->loadMetricStats().latch();
      96           0 :             if (latched_stats != nullptr) {
      97           0 :               for (const auto& metric : *latched_stats) {
      98           0 :                 const std::string& name = metric.first;
      99           0 :                 LoadMetricStats::Stat& stat = load_metrics[name];
     100           0 :                 stat.num_requests_with_metric += metric.second.num_requests_with_metric;
     101           0 :                 stat.total_metric_value += metric.second.total_metric_value;
     102           0 :               }
     103           0 :             }
     104           0 :           }
     105           0 :         }
     106           0 :         if (rq_success + rq_error + rq_active != 0) {
     107           0 :           auto* locality_stats = cluster_stats->add_upstream_locality_stats();
     108           0 :           locality_stats->mutable_locality()->MergeFrom(hosts[0]->locality());
     109           0 :           locality_stats->set_priority(host_set->priority());
     110           0 :           locality_stats->set_total_successful_requests(rq_success);
     111           0 :           locality_stats->set_total_error_requests(rq_error);
     112           0 :           locality_stats->set_total_requests_in_progress(rq_active);
     113           0 :           locality_stats->set_total_issued_requests(rq_issued);
     114           0 :           for (const auto& metric : load_metrics) {
     115           0 :             auto* load_metric_stats = locality_stats->add_load_metric_stats();
     116           0 :             load_metric_stats->set_metric_name(metric.first);
     117           0 :             load_metric_stats->set_num_requests_finished_with_metric(
     118           0 :                 metric.second.num_requests_with_metric);
     119           0 :             load_metric_stats->set_total_metric_value(metric.second.total_metric_value);
     120           0 :           }
     121           0 :         }
     122           0 :       }
     123           0 :     }
     124           0 :     cluster_stats->set_total_dropped_requests(
     125           0 :         cluster.info()->loadReportStats().upstream_rq_dropped_.latch());
     126           0 :     const uint64_t drop_overload_count =
     127           0 :         cluster.info()->loadReportStats().upstream_rq_drop_overload_.latch();
     128           0 :     if (drop_overload_count > 0) {
     129           0 :       auto* dropped_request = cluster_stats->add_dropped_requests();
     130           0 :       dropped_request->set_category("drop_overload");
     131           0 :       dropped_request->set_dropped_count(drop_overload_count);
     132           0 :     }
     133             : 
     134           0 :     const auto now = time_source_.monotonicTime().time_since_epoch();
     135           0 :     const auto measured_interval = now - cluster_name_and_timestamp.second;
     136           0 :     cluster_stats->mutable_load_report_interval()->MergeFrom(
     137           0 :         Protobuf::util::TimeUtil::MicrosecondsToDuration(
     138           0 :             std::chrono::duration_cast<std::chrono::microseconds>(measured_interval).count()));
     139           0 :     clusters_[cluster_name] = now;
     140           0 :   }
     141             : 
     142           1 :   ENVOY_LOG(trace, "Sending LoadStatsRequest: {}", request_.DebugString());
     143           1 :   stream_->sendMessage(request_, false);
     144           1 :   stats_.responses_.inc();
     145             :   // When the connection is established, the message has not yet been read so we
     146             :   // will not have a load reporting period.
     147           1 :   if (message_) {
     148           0 :     startLoadReportPeriod();
     149           0 :   }
     150           1 : }
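
The per-host accounting above relies on latch()-style counters: each latch() call returns the value accumulated since the previous call and then resets it, so every LoadStatsRequest carries only the delta for the current reporting window, summed per locality (rq_active is read with value() rather than latched because it is a point-in-time count of in-progress requests). A standalone sketch of that read-and-reset pattern, in plain C++ with illustrative names (HostStats and latchSuccess are not Envoy APIs):

    // Sketch of the latch-and-aggregate pattern used in sendLoadStatsRequest().
    #include <atomic>
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct HostStats {
      std::atomic<uint64_t> rq_success{0};
      // Return the count accumulated since the last latch and reset it to zero.
      uint64_t latchSuccess() { return rq_success.exchange(0); }
    };

    int main() {
      std::vector<HostStats> locality_hosts(3);
      locality_hosts[0].rq_success += 5;
      locality_hosts[1].rq_success += 7;

      uint64_t rq_success = 0;
      for (auto& host : locality_hosts) {
        rq_success += host.latchSuccess(); // delta for this reporting window only
      }
      std::cout << "total_successful_requests=" << rq_success << "\n"; // prints 12

      // The counters were reset by latching, so the next window starts from zero.
      return 0;
    }
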
     151             : 
     152           1 : void LoadStatsReporter::handleFailure() {
     153           1 :   ENVOY_LOG(warn, "Load reporter stats stream/connection failure, will retry in {} ms.",
     154           1 :             RETRY_DELAY_MS);
     155           1 :   stats_.errors_.inc();
     156           1 :   setRetryTimer();
     157           1 : }
     158             : 
     159           1 : void LoadStatsReporter::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
     160           1 :   UNREFERENCED_PARAMETER(metadata);
     161           1 : }
     162             : 
     163           0 : void LoadStatsReporter::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) {
     164           0 :   UNREFERENCED_PARAMETER(metadata);
     165           0 : }
     166             : 
     167             : void LoadStatsReporter::onReceiveMessage(
     168           0 :     std::unique_ptr<envoy::service::load_stats::v3::LoadStatsResponse>&& message) {
     169           0 :   ENVOY_LOG(debug, "New load report epoch: {}", message->DebugString());
     170           0 :   message_ = std::move(message);
     171           0 :   startLoadReportPeriod();
     172           0 :   stats_.requests_.inc();
     173           0 : }
     174             : 
     175           0 : void LoadStatsReporter::startLoadReportPeriod() {
     176             :   // Once a cluster is tracked, we don't want to reset its stats between reports
     177             :   // to avoid racing between request/response.
      178             :   // TODO(htuch): The key here could be absl::string_view, but this causes
     179             :   // problems due to referencing of temporaries in the below loop with Google's
     180             :   // internal string type. Consider this optimization when the string types
     181             :   // converge.
     182           0 :   const ClusterManager::ClusterInfoMaps all_clusters = cm_.clusters();
     183           0 :   absl::node_hash_map<std::string, std::chrono::steady_clock::duration> existing_clusters;
     184           0 :   if (message_->send_all_clusters()) {
     185           0 :     for (const auto& p : all_clusters.active_clusters_) {
     186           0 :       const std::string& cluster_name = p.first;
     187           0 :       auto it = clusters_.find(cluster_name);
     188           0 :       if (it != clusters_.end()) {
     189           0 :         existing_clusters.emplace(cluster_name, it->second);
     190           0 :       }
     191           0 :     }
     192           0 :   } else {
     193           0 :     for (const std::string& cluster_name : message_->clusters()) {
     194           0 :       auto it = clusters_.find(cluster_name);
     195           0 :       if (it != clusters_.end()) {
     196           0 :         existing_clusters.emplace(cluster_name, it->second);
     197           0 :       }
     198           0 :     }
     199           0 :   }
     200           0 :   clusters_.clear();
     201             :   // Reset stats for all hosts in clusters we are tracking.
     202           0 :   auto handle_cluster_func = [this, &existing_clusters,
     203           0 :                               &all_clusters](const std::string& cluster_name) {
     204           0 :     auto existing_cluster_it = existing_clusters.find(cluster_name);
     205           0 :     clusters_.emplace(cluster_name, existing_cluster_it != existing_clusters.end()
     206           0 :                                         ? existing_cluster_it->second
     207           0 :                                         : time_source_.monotonicTime().time_since_epoch());
     208           0 :     auto it = all_clusters.active_clusters_.find(cluster_name);
     209           0 :     if (it == all_clusters.active_clusters_.end()) {
     210           0 :       return;
     211           0 :     }
     212             :     // Don't reset stats for existing tracked clusters.
     213           0 :     if (existing_cluster_it != existing_clusters.end()) {
     214           0 :       return;
     215           0 :     }
     216           0 :     auto& cluster = it->second.get();
     217           0 :     for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
     218           0 :       for (const auto& host : host_set->hosts()) {
     219           0 :         host->stats().rq_success_.latch();
     220           0 :         host->stats().rq_error_.latch();
     221           0 :         host->stats().rq_total_.latch();
     222           0 :       }
     223           0 :     }
     224           0 :     cluster.info()->loadReportStats().upstream_rq_dropped_.latch();
     225           0 :     cluster.info()->loadReportStats().upstream_rq_drop_overload_.latch();
     226           0 :   };
     227           0 :   if (message_->send_all_clusters()) {
     228           0 :     for (const auto& p : all_clusters.active_clusters_) {
     229           0 :       const std::string& cluster_name = p.first;
     230           0 :       handle_cluster_func(cluster_name);
     231           0 :     }
     232           0 :   } else {
     233           0 :     for (const std::string& cluster_name : message_->clusters()) {
     234           0 :       handle_cluster_func(cluster_name);
     235           0 :     }
     236           0 :   }
     237           0 :   response_timer_->enableTimer(std::chrono::milliseconds(
     238           0 :       DurationUtil::durationToMilliseconds(message_->load_reporting_interval())));
     239           0 : }
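
startLoadReportPeriod() rebuilds clusters_ from the clusters named in the LRS response (or from every active cluster when send_all_clusters is set): a cluster that was already being tracked keeps its window-start timestamp, while a newly selected cluster gets a window starting at the current monotonic time and has its host counters latched once, so its next report only covers traffic from that point on. A standalone sketch of that merge step, in plain C++ with illustrative names (rebuildTrackedClusters is not an Envoy API):

    // Sketch of the window bookkeeping in startLoadReportPeriod(): preserve
    // in-flight windows for clusters already tracked, start fresh windows for
    // clusters newly selected by the LRS server.
    #include <chrono>
    #include <string>
    #include <unordered_map>
    #include <vector>

    using Clock = std::chrono::steady_clock;
    using WindowMap = std::unordered_map<std::string, Clock::duration>;

    WindowMap rebuildTrackedClusters(const WindowMap& previously_tracked,
                                     const std::vector<std::string>& selected_by_lrs) {
      WindowMap tracked;
      const auto now = Clock::now().time_since_epoch();
      for (const std::string& name : selected_by_lrs) {
        const auto it = previously_tracked.find(name);
        // Preserve an in-flight reporting window; otherwise the window begins now.
        tracked.emplace(name, it != previously_tracked.end() ? it->second : now);
      }
      return tracked;
    }

    int main() {
      const WindowMap previous{{"existing_cluster", Clock::duration(1234)}};
      const WindowMap rebuilt =
          rebuildTrackedClusters(previous, {"existing_cluster", "new_cluster"});
      // "existing_cluster" keeps its old window start; "new_cluster" starts now.
      return rebuilt.size() == 2 ? 0 : 1;
    }
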
     240             : 
     241           1 : void LoadStatsReporter::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) {
     242           1 :   UNREFERENCED_PARAMETER(metadata);
     243           1 : }
     244             : 
     245           1 : void LoadStatsReporter::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
     246           1 :   ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {}", service_method_.name(), status, message);
     247           1 :   response_timer_->disableTimer();
     248           1 :   stream_ = nullptr;
     249           1 :   handleFailure();
     250           1 : }
     251             : 
     252             : } // namespace Upstream
     253             : } // namespace Envoy

Generated by: LCOV version 1.15