1
#include "source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h"
2

            
3
#include <chrono>
4

            
5
#include "envoy/config/metrics/v3/metrics_service.pb.h"
6
#include "envoy/event/dispatcher.h"
7
#include "envoy/service/metrics/v3/metrics_service.pb.h"
8
#include "envoy/stats/histogram.h"
9
#include "envoy/stats/stats.h"
10
#include "envoy/upstream/cluster_manager.h"
11

            
12
#include "source/common/common/assert.h"
13
#include "source/common/common/utility.h"
14
#include "source/common/config/utility.h"
15

            
16
namespace Envoy {
17
namespace Extensions {
18
namespace StatSinks {
19
namespace MetricsService {
20

            
21
// Streams stats to the v3 MetricsService over a long-lived gRPC stream that is
// opened lazily on the first send().
//
// @param raw_async_client shared gRPC async client handed to the streamer base class.
// @param local_info provides the node identifier attached to the first message of a stream.
// @param batch_size maximum number of metric families per StreamMetricsMessage;
//        0 means "send everything in one message" (see send()).
// NOTE(review): the result of FindMethodByName() is dereferenced unchecked; presumably the
// descriptor is always present because the service proto is linked into the generated
// pool -- confirm, otherwise this is a null dereference at construction time.
GrpcMetricsStreamerImpl::GrpcMetricsStreamerImpl(Grpc::RawAsyncClientSharedPtr raw_async_client,
                                                 const LocalInfo::LocalInfo& local_info,
                                                 uint32_t batch_size)
    : GrpcMetricsStreamer<envoy::service::metrics::v3::StreamMetricsMessage,
                          envoy::service::metrics::v3::StreamMetricsResponse>(raw_async_client),
      local_info_(local_info),
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "envoy.service.metrics.v3.MetricsService.StreamMetrics")),
      batch_size_(batch_size) {}
30

            
31
12
// Delivers one flush worth of metrics, (re)establishing the gRPC stream on
// demand. When batch_size_ is non-zero and the snapshot exceeds it, the
// metrics are split across several StreamMetricsMessages; the node identifier
// is only attached to the first message on a freshly opened stream.
void GrpcMetricsStreamerImpl::send(MetricsPtr&& metrics) {
  bool with_identifier = false;

  if (stream_ == nullptr) {
    ENVOY_LOG(debug, "Establishing new gRPC metrics service stream");
    stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
    if (stream_ == nullptr) {
      // Stream creation failed; drop this flush and try again next cycle.
      ENVOY_LOG(error,
                "unable to establish metrics service stream. Will retry in the next flush cycle");
      return;
    }
    with_identifier = true;
  }

  const int total = metrics->size();

  // Fast path: batching disabled (batch_size_ == 0) or everything fits in one message.
  if (batch_size_ == 0 || total <= static_cast<int>(batch_size_)) {
    sendBatch(*metrics, 0, total, with_identifier);
    return;
  }

  ENVOY_LOG(debug, "Batching {} metrics into messages of size {}", total, batch_size_);
  for (int cursor = 0; cursor < total;) {
    const int batch_end = std::min(cursor + static_cast<int>(batch_size_), total);
    sendBatch(*metrics, cursor, batch_end, with_identifier);
    with_identifier = false; // Only the first batch carries the identifier.
    cursor = batch_end;
  }
}
63

            
64
void GrpcMetricsStreamerImpl::sendBatch(
65
    const Envoy::Protobuf::RepeatedPtrField<io::prometheus::client::MetricFamily>& metrics,
66
15
    int start_idx, int end_idx, bool send_identifier) {
67
15
  envoy::service::metrics::v3::StreamMetricsMessage message;
68
15
  int batch_size = end_idx - start_idx;
69
15
  message.mutable_envoy_metrics()->Reserve(batch_size);
70

            
71
  // Copy directly from source metrics to message, avoiding intermediate buffer
72
1484
  for (int i = start_idx; i < end_idx; ++i) {
73
1469
    message.mutable_envoy_metrics()->Add()->CopyFrom(metrics[i]);
74
1469
  }
75

            
76
  // For perf reasons, the identifier is only sent with the first batch on a new stream
77
15
  if (send_identifier) {
78
11
    auto* identifier = message.mutable_identifier();
79
11
    *identifier->mutable_node() = local_info_.node();
80
11
  }
81

            
82
15
  if (stream_ != nullptr) {
83
15
    stream_->sendMessage(message, false);
84
15
  }
85
15
}
86

            
87
18
// Converts a stats snapshot into a repeated field of Prometheus MetricFamily
// protos, applying the configured inclusion predicate to every counter, gauge
// and histogram. A histogram that passes the predicate may yield a summary
// family, a histogram family, or both, depending on configuration.
MetricsPtr MetricsFlusher::flush(Stats::MetricSnapshot& snapshot) const {
  auto families =
      std::make_unique<Envoy::Protobuf::RepeatedPtrField<io::prometheus::client::MetricFamily>>();

  // TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we
  // actually preallocate the submessages and then pass ownership to the proto (rather than just
  // preallocating the pointer array).
  families->Reserve(snapshot.counters().size() + snapshot.gauges().size() +
                    snapshot.histograms().size());

  const int64_t snapshot_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                       snapshot.snapshotTime().time_since_epoch())
                                       .count();

  for (const auto& counter : snapshot.counters()) {
    if (predicate_(counter.counter_.get())) {
      flushCounter(*families->Add(), counter, snapshot_time_ms);
    }
  }

  for (const auto& gauge : snapshot.gauges()) {
    if (predicate_(gauge)) {
      flushGauge(*families->Add(), gauge.get(), snapshot_time_ms);
    }
  }

  for (const auto& histogram : snapshot.histograms()) {
    if (!predicate_(histogram.get())) {
      continue;
    }
    if (emit_summary_) {
      flushSummary(*families->Add(), histogram.get(), snapshot_time_ms);
    }
    if (emit_histogram_) {
      flushHistogram(*families->Add(), histogram.get(), snapshot_time_ms);
    }
  }

  return families;
}
124

            
125
void MetricsFlusher::flushCounter(io::prometheus::client::MetricFamily& metrics_family,
126
                                  const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot,
127
225
                                  int64_t snapshot_time_ms) const {
128
225
  auto* metric = populateMetricsFamily(metrics_family, io::prometheus::client::MetricType::COUNTER,
129
225
                                       snapshot_time_ms, counter_snapshot.counter_.get());
130
225
  auto* counter_metric = metric->mutable_counter();
131
225
  if (report_counters_as_deltas_) {
132
5
    counter_metric->set_value(counter_snapshot.delta_);
133
220
  } else {
134
220
    counter_metric->set_value(counter_snapshot.counter_.get().value());
135
220
  }
136
225
}
137

            
138
void MetricsFlusher::flushGauge(io::prometheus::client::MetricFamily& metrics_family,
139
215
                                const Stats::Gauge& gauge, int64_t snapshot_time_ms) const {
140
215
  auto* metric = populateMetricsFamily(metrics_family, io::prometheus::client::MetricType::GAUGE,
141
215
                                       snapshot_time_ms, gauge);
142
215
  auto* gauge_metric = metric->mutable_gauge();
143
215
  gauge_metric->set_value(gauge.value());
144
215
}
145

            
146
void MetricsFlusher::flushHistogram(io::prometheus::client::MetricFamily& metrics_family,
147
                                    const Stats::ParentHistogram& envoy_histogram,
148
19
                                    int64_t snapshot_time_ms) const {
149

            
150
19
  const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics();
151
19
  auto* histogram_metric =
152
19
      populateMetricsFamily(metrics_family, io::prometheus::client::MetricType::HISTOGRAM,
153
19
                            snapshot_time_ms, envoy_histogram);
154
19
  auto* histogram = histogram_metric->mutable_histogram();
155
19
  histogram->set_sample_count(hist_stats.sampleCount());
156
19
  histogram->set_sample_sum(hist_stats.sampleSum());
157
285
  for (size_t i = 0; i < hist_stats.supportedBuckets().size(); i++) {
158
266
    auto* bucket = histogram->add_bucket();
159
266
    bucket->set_upper_bound(hist_stats.supportedBuckets()[i]);
160
266
    bucket->set_cumulative_count(hist_stats.computedBuckets()[i]);
161
266
  }
162
19
}
163

            
164
void MetricsFlusher::flushSummary(io::prometheus::client::MetricFamily& metrics_family,
165
                                  const Stats::ParentHistogram& envoy_histogram,
166
19
                                  int64_t snapshot_time_ms) const {
167

            
168
19
  const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics();
169
19
  auto* summary_metric =
170
19
      populateMetricsFamily(metrics_family, io::prometheus::client::MetricType::SUMMARY,
171
19
                            snapshot_time_ms, envoy_histogram);
172
19
  auto* summary = summary_metric->mutable_summary();
173
209
  for (size_t i = 0; i < hist_stats.supportedQuantiles().size(); i++) {
174
190
    auto* quantile = summary->add_quantile();
175
190
    quantile->set_quantile(hist_stats.supportedQuantiles()[i]);
176
190
    quantile->set_value(hist_stats.computedQuantiles()[i]);
177
190
  }
178
19
  summary->set_sample_count(hist_stats.sampleCount());
179
19
  summary->set_sample_sum(hist_stats.sampleSum());
180
19
}
181

            
182
// Shared setup for every metric kind: stamps the family's type, appends one
// Metric entry with the snapshot timestamp, and names the family. With label
// emission enabled the tag-extracted name plus one label per tag is used;
// otherwise the fully-qualified stat name becomes the family name.
// Returns the newly added Metric for the caller to populate.
io::prometheus::client::Metric*
MetricsFlusher::populateMetricsFamily(io::prometheus::client::MetricFamily& metrics_family,
                                      io::prometheus::client::MetricType type,
                                      int64_t snapshot_time_ms, const Stats::Metric& metric) const {
  metrics_family.set_type(type);
  auto* prometheus_metric = metrics_family.add_metric();
  prometheus_metric->set_timestamp_ms(snapshot_time_ms);

  if (!emit_labels_) {
    metrics_family.set_name(metric.name());
    return prometheus_metric;
  }

  // TODO(snowp): Look into the perf implication of this. We need to take a lock on the symbol
  // table to stringify the StatNames, which could result in some lock contention. Consider
  // caching the conversion between stat handle to extracted tags.
  metrics_family.set_name(metric.tagExtractedName());
  for (const auto& tag : metric.tags()) {
    auto* label = prometheus_metric->add_label();
    label->set_name(tag.name_);
    label->set_value(tag.value_);
  }

  return prometheus_metric;
}
206
} // namespace MetricsService
207
} // namespace StatSinks
208
} // namespace Extensions
209
} // namespace Envoy