1
#include "source/common/upstream/load_stats_reporter_impl.h"
2

            
3
#include "envoy/service/load_stats/v3/lrs.pb.h"
4
#include "envoy/stats/scope.h"
5

            
6
#include "source/common/network/utility.h"
7
#include "source/common/protobuf/protobuf.h"
8

            
9
namespace Envoy {
10
namespace Upstream {
11

            
12
namespace {
13

            
14
envoy::service::load_stats::v3::LoadStatsRequest
15
39
MakeRequestTemplate(const LocalInfo::LocalInfo& local_info) {
16
39
  envoy::service::load_stats::v3::LoadStatsRequest request;
17
39
  request.mutable_node()->MergeFrom(local_info.node());
18
39
  request.mutable_node()->add_client_features("envoy.lrs.supports_send_all_clusters");
19
39
  return request;
20
39
}
21

            
22
} // namespace
23

            
24
LoadStatsReporterImpl::LoadStatsReporterImpl(const LocalInfo::LocalInfo& local_info,
                                             ClusterManager& cluster_manager, Stats::Scope& scope,
                                             Grpc::RawAsyncClientSharedPtr async_client,
                                             Event::Dispatcher& dispatcher)
    // cm_ is consulted on every report to resolve active clusters by name.
    : cm_(cluster_manager),
      stats_{ALL_LOAD_REPORTER_STATS(POOL_COUNTER_PREFIX(scope, "load_reporter."))},
      async_client_(std::move(async_client)),
      // Resolve the LRS StreamLoadStats method descriptor from the generated
      // descriptor pool; dereferenced immediately, so it must be linked in.
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "envoy.service.load_stats.v3.LoadReportingService.StreamLoadStats")),
      request_template_(MakeRequestTemplate(local_info)), time_source_(dispatcher.timeSource()) {
  // Retry timer: on expiry, count the retry and attempt to re-establish the
  // gRPC stream.
  retry_timer_ = dispatcher.createTimer([this]() -> void {
    stats_.retries_.inc();
    establishNewStream();
  });
  // Response timer drives the periodic load-report send cycle; it is armed in
  // startLoadReportPeriod() with the server-provided interval.
  response_timer_ = dispatcher.createTimer([this]() -> void { sendLoadStatsRequest(); });
  // Kick off the initial stream establishment immediately.
  establishNewStream();
}
41

            
42
39
LoadStatsReporterImpl::~LoadStatsReporterImpl() {
  ENVOY_LOG_MISC(info, "Destroying LoadStatsReporterImpl");
  // Cancel any pending retry or report callbacks before tearing down the
  // stream so neither timer fires into a destroyed object.
  retry_timer_->disableTimer();
  response_timer_->disableTimer();
  // Actively reset the gRPC stream if one is still open.
  if (stream_) {
    stream_->resetStream();
    stream_ = nullptr;
  }
}
52

            
53
33
void LoadStatsReporterImpl::setRetryTimer() {
54
33
  ENVOY_LOG(info, "Load reporter stats stream/connection will retry in {} ms.", RETRY_DELAY_MS);
55
33
  retry_timer_->enableTimer(std::chrono::milliseconds(RETRY_DELAY_MS));
56
33
}
57

            
58
42
void LoadStatsReporterImpl::establishNewStream() {
59
42
  ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString());
60
42
  stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
61
42
  if (stream_ == nullptr) {
62
1
    ENVOY_LOG(warn, "Unable to establish new stream");
63
1
    handleFailure();
64
1
    return;
65
1
  }
66

            
67
41
  sendLoadStatsRequest();
68
41
}
69

            
70
102
// Builds and sends one LoadStatsRequest covering every cluster currently in
// clusters_, latching (and thereby resetting) the per-host request counters so
// each report covers only the window since the previous one. Assumes stream_
// is non-null when called (callers invoke it only with an open stream or via
// the response timer armed after a response).
void LoadStatsReporterImpl::sendLoadStatsRequest() {
  // TODO(htuch): This sends load reports for only the set of clusters in clusters_, which
  // was initialized in startLoadReportPeriod() the last time we either sent a load report
  // or received a new LRS response (whichever happened more recently). The code in
  // startLoadReportPeriod() adds to clusters_ only those clusters that exist in the
  // ClusterManager at the moment when startLoadReportPeriod() runs. This means that if
  // a cluster is selected by the LRS server (either by being explicitly listed or by using
  // the send_all_clusters field), if that cluster was added to the ClusterManager since the
  // last time startLoadReportPeriod() was invoked, we will not report its load here. In
  // practice, this means that for any newly created cluster, we will always drop the data for
  // the initial load report period. This seems sub-optimal.
  //
  // One possible way to deal with this would be to get a notification whenever a new cluster is
  // added to the cluster manager. When we get the notification, we record the current time in
  // clusters_ as the start time for the load reporting window for that cluster.
  //
  // Arena-allocate the request; it can be large (per-endpoint stats) and is
  // freed wholesale when the arena goes out of scope.
  Envoy::Protobuf::Arena arena;
  auto* request =
      Envoy::Protobuf::Arena::Create<envoy::service::load_stats::v3::LoadStatsRequest>(&arena);
  request->MergeFrom(request_template_);
  for (const auto& cluster_name_and_timestamp : clusters_) {
    const std::string& cluster_name = cluster_name_and_timestamp.first;
    OptRef<const Upstream::Cluster> active_cluster = cm_.getActiveCluster(cluster_name);
    if (!active_cluster.has_value()) {
      // Cluster was removed since startLoadReportPeriod(); skip it this round.
      ENVOY_LOG(debug, "Cluster {} does not exist", cluster_name);
      continue;
    }
    const Upstream::Cluster& cluster = active_cluster.value();
    auto* cluster_stats = request->add_cluster_stats();
    cluster_stats->set_cluster_name(cluster_name);
    if (const auto& name = cluster.info()->edsServiceName(); !name.empty()) {
      cluster_stats->set_cluster_service_name(name);
    }
    // Aggregate stats per (priority, locality) pair.
    for (const HostSetPtr& host_set : cluster.prioritySet().hostSetsPerPriority()) {
      ENVOY_LOG(trace, "Load report locality count {}", host_set->hostsPerLocality().get().size());
      for (const HostVector& hosts : host_set->hostsPerLocality().get()) {
        ASSERT(!hosts.empty());
        uint64_t rq_success = 0;
        uint64_t rq_error = 0;
        uint64_t rq_active = 0;
        uint64_t rq_issued = 0;
        LoadMetricStats::StatMap load_metrics;

        envoy::config::endpoint::v3::UpstreamLocalityStats locality_stats;
        locality_stats.mutable_locality()->MergeFrom(hosts[0]->locality());
        locality_stats.set_priority(host_set->priority());

        for (const HostSharedPtr& host : hosts) {
          // latch() reads and resets the windowed counters; value() is a gauge
          // (in-progress requests) and is not reset.
          uint64_t host_rq_success = host->stats().rq_success_.latch();
          uint64_t host_rq_error = host->stats().rq_error_.latch();
          uint64_t host_rq_active = host->stats().rq_active_.value();
          uint64_t host_rq_issued = host->stats().rq_total_.latch();

          // Check if the host has any load stats updates. If the host has no load stats updates, we
          // skip it.
          bool endpoint_has_updates =
              (host_rq_success + host_rq_error + host_rq_active + host_rq_issued) != 0;

          if (endpoint_has_updates) {
            rq_success += host_rq_success;
            rq_error += host_rq_error;
            rq_active += host_rq_active;
            rq_issued += host_rq_issued;

            envoy::config::endpoint::v3::UpstreamEndpointStats* upstream_endpoint_stats = nullptr;
            // Set the upstream endpoint stats if we are reporting endpoint granularity.
            if (message_ && message_->report_endpoint_granularity()) {
              upstream_endpoint_stats = locality_stats.add_upstream_endpoint_stats();
              Network::Utility::addressToProtobufAddress(
                  *host->address(), *upstream_endpoint_stats->mutable_address());
              upstream_endpoint_stats->set_total_successful_requests(host_rq_success);
              upstream_endpoint_stats->set_total_error_requests(host_rq_error);
              upstream_endpoint_stats->set_total_requests_in_progress(host_rq_active);
              upstream_endpoint_stats->set_total_issued_requests(host_rq_issued);
            }

            // Named load metrics (e.g. ORCA) are latched per host and folded
            // into the locality-level map below.
            const std::unique_ptr<LoadMetricStats::StatMap> latched_stats =
                host->loadMetricStats().latch();
            if (latched_stats != nullptr) {
              for (const auto& metric : *latched_stats) {
                const auto& metric_name = metric.first;
                const auto& metric_value = metric.second;

                // Add the metric to the load metrics map.
                LoadMetricStats::Stat& stat = load_metrics[metric_name];
                stat.num_requests_with_metric += metric_value.num_requests_with_metric;
                stat.total_metric_value += metric_value.total_metric_value;

                // If we are reporting endpoint granularity, add the metric to the upstream endpoint
                // stats.
                if (upstream_endpoint_stats != nullptr) {
                  auto* endpoint_load_metric = upstream_endpoint_stats->add_load_metric_stats();
                  endpoint_load_metric->set_metric_name(metric_name);
                  endpoint_load_metric->set_num_requests_finished_with_metric(
                      metric_value.num_requests_with_metric);
                  endpoint_load_metric->set_total_metric_value(metric_value.total_metric_value);
                }
              }
            }
          }
        }

        bool should_send_locality_stats = rq_issued != 0;
        if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features."
                                           "report_load_when_rq_active_is_non_zero")) {
          // If rq_active is non-zero, we should send the locality stats even if
          // rq_issued is zero (no new requests have been issued in this poll
          // window). This is needed to report long-lived connections/requests (e.g., when
          // web-sockets are used).
          should_send_locality_stats = should_send_locality_stats || (rq_active != 0);
        }

        if (should_send_locality_stats) {
          locality_stats.set_total_successful_requests(rq_success);
          locality_stats.set_total_error_requests(rq_error);
          locality_stats.set_total_requests_in_progress(rq_active);
          locality_stats.set_total_issued_requests(rq_issued);
          for (const auto& metric : load_metrics) {
            auto* load_metric_stats = locality_stats.add_load_metric_stats();
            load_metric_stats->set_metric_name(metric.first);
            load_metric_stats->set_num_requests_finished_with_metric(
                metric.second.num_requests_with_metric);
            load_metric_stats->set_total_metric_value(metric.second.total_metric_value);
          }
          cluster_stats->add_upstream_locality_stats()->MergeFrom(locality_stats);
        }
      }
    }
    // Cluster-level drop counters, also latched (windowed).
    cluster_stats->set_total_dropped_requests(
        cluster.info()->loadReportStats().upstream_rq_dropped_.latch());
    const uint64_t drop_overload_count =
        cluster.info()->loadReportStats().upstream_rq_drop_overload_.latch();
    if (drop_overload_count > 0) {
      auto* dropped_request = cluster_stats->add_dropped_requests();
      dropped_request->set_category(cluster.dropCategory());
      dropped_request->set_dropped_count(drop_overload_count);
    }

    // Report the actual measured interval for this cluster's window, then
    // restart the window at `now`.
    const auto now = time_source_.monotonicTime().time_since_epoch();
    const auto measured_interval = now - cluster_name_and_timestamp.second;
    cluster_stats->mutable_load_report_interval()->MergeFrom(
        Protobuf::util::TimeUtil::MicrosecondsToDuration(
            std::chrono::duration_cast<std::chrono::microseconds>(measured_interval).count()));
    clusters_[cluster_name] = now;
  }

  ENVOY_LOG(trace, "Sending LoadStatsRequest: {}", request->DebugString());
  stream_->sendMessage(*request, false);
  stats_.responses_.inc();
  // When the connection is established, the message has not yet been read so we will not have a
  // load reporting period.
  if (message_) {
    startLoadReportPeriod();
  }
}
224

            
225
32
void LoadStatsReporterImpl::handleFailure() {
226
32
  stats_.errors_.inc();
227
32
  setRetryTimer();
228
32
}
229

            
230
30
void LoadStatsReporterImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
231
30
  UNREFERENCED_PARAMETER(metadata);
232
30
}
233

            
234
30
void LoadStatsReporterImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) {
235
30
  UNREFERENCED_PARAMETER(metadata);
236
30
}
237

            
238
void LoadStatsReporterImpl::onReceiveMessage(
239
61
    std::unique_ptr<envoy::service::load_stats::v3::LoadStatsResponse>&& message) {
240
61
  ENVOY_LOG(debug, "New load report epoch: {}", message->DebugString());
241
61
  message_ = std::move(message);
242
61
  startLoadReportPeriod();
243
61
  stats_.requests_.inc();
244
61
}
245

            
246
122
// Begins a new load-reporting window based on the most recent LRS response in
// message_: rebuilds clusters_ from the server's cluster selection, preserves
// window-start timestamps for clusters that were already tracked, discards
// stale counters for newly tracked clusters, and arms the response timer with
// the server-provided interval. Requires message_ to be non-null.
void LoadStatsReporterImpl::startLoadReportPeriod() {
  // Once a cluster is tracked, we don't want to reset its stats between reports
  // to avoid racing between request/response.
  // TODO(htuch): They key here could be absl::string_view, but this causes
  // problems due to referencing of temporaries in the below loop with Google's
  // internal string type. Consider this optimization when the string types
  // converge.
  const ClusterManager::ClusterInfoMaps all_clusters = cm_.clusters();
  // Remember the window-start timestamps of clusters that remain selected so
  // their measured intervals are not reset by this new epoch.
  absl::node_hash_map<std::string, std::chrono::steady_clock::duration> existing_clusters;
  if (message_->send_all_clusters()) {
    for (const auto& p : all_clusters.active_clusters_) {
      const std::string& cluster_name = p.first;
      auto it = clusters_.find(cluster_name);
      if (it != clusters_.end()) {
        existing_clusters.emplace(cluster_name, it->second);
      }
    }
  } else {
    for (const std::string& cluster_name : message_->clusters()) {
      auto it = clusters_.find(cluster_name);
      if (it != clusters_.end()) {
        existing_clusters.emplace(cluster_name, it->second);
      }
    }
  }
  clusters_.clear();
  // Reset stats for all hosts in clusters we are tracking.
  auto handle_cluster_func = [this, &existing_clusters,
                              &all_clusters](const std::string& cluster_name) {
    // Track the cluster; previously tracked clusters keep their old window
    // start, new ones start their window now.
    auto existing_cluster_it = existing_clusters.find(cluster_name);
    clusters_.emplace(cluster_name, existing_cluster_it != existing_clusters.end()
                                        ? existing_cluster_it->second
                                        : time_source_.monotonicTime().time_since_epoch());
    auto it = all_clusters.active_clusters_.find(cluster_name);
    if (it == all_clusters.active_clusters_.end()) {
      // Selected by the server but not (yet) present in the cluster manager.
      return;
    }
    // Don't reset stats for existing tracked clusters.
    if (existing_cluster_it != existing_clusters.end()) {
      return;
    }
    // Latch (and thereby discard) counts accumulated before this cluster
    // became tracked, so its first report covers only the new window.
    auto& cluster = it->second.get();
    for (auto& host_set : cluster.prioritySet().hostSetsPerPriority()) {
      for (const auto& host : host_set->hosts()) {
        host->stats().rq_success_.latch();
        host->stats().rq_error_.latch();
        host->stats().rq_total_.latch();
      }
    }
    cluster.info()->loadReportStats().upstream_rq_dropped_.latch();
    cluster.info()->loadReportStats().upstream_rq_drop_overload_.latch();
  };
  if (message_->send_all_clusters()) {
    for (const auto& p : all_clusters.active_clusters_) {
      const std::string& cluster_name = p.first;
      handle_cluster_func(cluster_name);
    }
  } else {
    for (const std::string& cluster_name : message_->clusters()) {
      handle_cluster_func(cluster_name);
    }
  }
  // Schedule the next report using the server-provided interval.
  response_timer_->enableTimer(std::chrono::milliseconds(
      DurationUtil::durationToMilliseconds(message_->load_reporting_interval())));
}
311

            
312
30
void LoadStatsReporterImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) {
313
30
  UNREFERENCED_PARAMETER(metadata);
314
30
}
315

            
316
void LoadStatsReporterImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
317
32
                                          const std::string& message) {
318
32
  response_timer_->disableTimer();
319
32
  stream_ = nullptr;
320
32
  if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
321
31
    ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {}", service_method_.name(), status,
322
31
              message);
323
31
    handleFailure();
324
31
  } else {
325
1
    ENVOY_LOG(debug, "{} gRPC config stream closed gracefully, {}", service_method_.name(),
326
1
              message);
327
1
    setRetryTimer();
328
1
  }
329
32
}
330
} // namespace Upstream
331
} // namespace Envoy