#include "source/extensions/stat_sinks/metrics_service/grpc_metrics_service_impl.h"

#include <chrono>

#include "envoy/common/exception.h"
#include "envoy/config/metrics/v3/metrics_service.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/service/metrics/v3/metrics_service.pb.h"
#include "envoy/stats/histogram.h"
#include "envoy/stats/stats.h"
#include "envoy/upstream/cluster_manager.h"

#include "source/common/common/assert.h"
#include "source/common/common/utility.h"
#include "source/common/config/utility.h"

namespace Envoy {
namespace Extensions {
namespace StatSinks {
namespace MetricsService {

GrpcMetricsStreamerImpl::GrpcMetricsStreamerImpl(Grpc::RawAsyncClientSharedPtr raw_async_client,
                                                 const LocalInfo::LocalInfo& local_info)
    : GrpcMetricsStreamer<envoy::service::metrics::v3::StreamMetricsMessage,
                          envoy::service::metrics::v3::StreamMetricsResponse>(raw_async_client),
      local_info_(local_info),
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "envoy.service.metrics.v3.MetricsService.StreamMetrics")) {}

void GrpcMetricsStreamerImpl::send(MetricsPtr&& metrics) {
  envoy::service::metrics::v3::StreamMetricsMessage message;
  message.mutable_envoy_metrics()->Reserve(metrics->size());
  message.mutable_envoy_metrics()->MergeFrom(*metrics);

  if (stream_ == nullptr) {
    stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
    // For perf reasons, the identifier is only sent on establishing the stream.
    auto* identifier = message.mutable_identifier();
    *identifier->mutable_node() = local_info_.node();
  }
  if (stream_ != nullptr) {
    stream_->sendMessage(message, false);
  }
}

MetricsPtr MetricsFlusher::flush(Stats::MetricSnapshot& snapshot) const {
  auto metrics =
      std::make_unique<Envoy::Protobuf::RepeatedPtrField<io::prometheus::client::MetricFamily>>();

  // TODO(mrice32): there's probably some more sophisticated preallocation we can do here where we
  // actually preallocate the submessages and then pass ownership to the proto (rather than just
  // preallocating the pointer array).
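  // Reserve room up front for one metric family per counter, gauge, and histogram in the
  // snapshot.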
  metrics->Reserve(snapshot.counters().size() + snapshot.gauges().size() +
                   snapshot.histograms().size());
  int64_t snapshot_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                 snapshot.snapshotTime().time_since_epoch())
                                 .count();
  for (const auto& counter : snapshot.counters()) {
    if (predicate_(counter.counter_.get())) {
      flushCounter(*metrics->Add(), counter, snapshot_time_ms);
    }
  }

  for (const auto& gauge : snapshot.gauges()) {
    if (predicate_(gauge)) {
      flushGauge(*metrics->Add(), gauge.get(), snapshot_time_ms);
    }
  }

  for (const auto& histogram : snapshot.histograms()) {
    if (predicate_(histogram.get())) {
      if (emit_summary_) {
        flushSummary(*metrics->Add(), histogram.get(), snapshot_time_ms);
      }
      if (emit_histogram_) {
        flushHistogram(*metrics->Add(), histogram.get(), snapshot_time_ms);
      }
    }
  }

  return metrics;
}

void MetricsFlusher::flushCounter(io::prometheus::client::MetricFamily& metrics_family,
                                  const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot,
                                  int64_t snapshot_time_ms) const {
  auto* metric = populateMetricsFamily(metrics_family, io::prometheus::client::MetricType::COUNTER,
                                       snapshot_time_ms, counter_snapshot.counter_.get());
  auto* counter_metric = metric->mutable_counter();
  if (report_counters_as_deltas_) {
    counter_metric->set_value(counter_snapshot.delta_);
  } else {
    counter_metric->set_value(counter_snapshot.counter_.get().value());
  }
}

void MetricsFlusher::flushGauge(io::prometheus::client::MetricFamily& metrics_family,
                                const Stats::Gauge& gauge, int64_t snapshot_time_ms) const {
  auto* metric = populateMetricsFamily(metrics_family, io::prometheus::client::MetricType::GAUGE,
                                       snapshot_time_ms, gauge);
  auto* gauge_metric = metric->mutable_gauge();
  gauge_metric->set_value(gauge.value());
}

void MetricsFlusher::flushHistogram(io::prometheus::client::MetricFamily& metrics_family,
                                    const Stats::ParentHistogram& envoy_histogram,
                                    int64_t snapshot_time_ms) const {

  const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics();
  auto* histogram_metric =
      populateMetricsFamily(metrics_family, io::prometheus::client::MetricType::HISTOGRAM,
                            snapshot_time_ms, envoy_histogram);
  auto* histogram = histogram_metric->mutable_histogram();
  histogram->set_sample_count(hist_stats.sampleCount());
  histogram->set_sample_sum(hist_stats.sampleSum());
  for (size_t i = 0; i < hist_stats.supportedBuckets().size(); i++) {
    auto* bucket = histogram->add_bucket();
    bucket->set_upper_bound(hist_stats.supportedBuckets()[i]);
    bucket->set_cumulative_count(hist_stats.computedBuckets()[i]);
  }
}

void MetricsFlusher::flushSummary(io::prometheus::client::MetricFamily& metrics_family,
                                  const Stats::ParentHistogram& envoy_histogram,
                                  int64_t snapshot_time_ms) const {

  const Stats::HistogramStatistics& hist_stats = envoy_histogram.intervalStatistics();
  auto* summary_metric =
      populateMetricsFamily(metrics_family, io::prometheus::client::MetricType::SUMMARY,
                            snapshot_time_ms, envoy_histogram);
  auto* summary = summary_metric->mutable_summary();
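  // Emit one quantile entry per supported quantile, taken from the interval statistics computed
  // above.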
  for (size_t i = 0; i < hist_stats.supportedQuantiles().size(); i++) {
    auto* quantile = summary->add_quantile();
    quantile->set_quantile(hist_stats.supportedQuantiles()[i]);
    quantile->set_value(hist_stats.computedQuantiles()[i]);
  }
  summary->set_sample_count(hist_stats.sampleCount());
}

io::prometheus::client::Metric*
MetricsFlusher::populateMetricsFamily(io::prometheus::client::MetricFamily& metrics_family,
                                      io::prometheus::client::MetricType type,
                                      int64_t snapshot_time_ms,
                                      const Stats::Metric& metric) const {
  metrics_family.set_type(type);
  auto* prometheus_metric = metrics_family.add_metric();
  prometheus_metric->set_timestamp_ms(snapshot_time_ms);

  if (emit_labels_) {
    // TODO(snowp): Look into the perf implication of this. We need to take a lock on the symbol
    // table to stringify the StatNames, which could result in some lock contention. Consider
    // caching the conversion from stat handle to extracted tags.
    metrics_family.set_name(metric.tagExtractedName());
    for (const auto& tag : metric.tags()) {
      auto* label = prometheus_metric->add_label();
      label->set_name(tag.name_);
      label->set_value(tag.value_);
    }
  } else {
    metrics_family.set_name(metric.name());
  }

  return prometheus_metric;
}
} // namespace MetricsService
} // namespace StatSinks
} // namespace Extensions
} // namespace Envoy