Line data Source code
1 : #pragma once 2 : 3 : #include <memory> 4 : 5 : #include "envoy/config/metrics/v3/metrics_service.pb.h" 6 : #include "envoy/grpc/async_client.h" 7 : #include "envoy/local_info/local_info.h" 8 : #include "envoy/network/connection.h" 9 : #include "envoy/service/metrics/v3/metrics_service.pb.h" 10 : #include "envoy/singleton/instance.h" 11 : #include "envoy/stats/histogram.h" 12 : #include "envoy/stats/sink.h" 13 : #include "envoy/stats/stats.h" 14 : #include "envoy/upstream/cluster_manager.h" 15 : 16 : #include "source/common/buffer/buffer_impl.h" 17 : #include "source/common/grpc/typed_async_client.h" 18 : 19 : namespace Envoy { 20 : namespace Extensions { 21 : namespace StatSinks { 22 : namespace MetricsService { 23 : 24 : using MetricsPtr = 25 : std::unique_ptr<Envoy::Protobuf::RepeatedPtrField<io::prometheus::client::MetricFamily>>; 26 : using HistogramEmitMode = envoy::config::metrics::v3::HistogramEmitMode; 27 : 28 : /** 29 : * Interface for metrics streamer. 30 : */ 31 : template <class RequestProto, class ResponseProto> 32 : class GrpcMetricsStreamer : public Grpc::AsyncStreamCallbacks<ResponseProto> { 33 : public: 34 : explicit GrpcMetricsStreamer(const Grpc::RawAsyncClientSharedPtr& raw_async_client) 35 0 : : client_(raw_async_client) {} 36 0 : ~GrpcMetricsStreamer() override = default; 37 : 38 : /** 39 : * Send Metrics Message. 40 : * @param message supplies the metrics to send. 41 : */ 42 : virtual void send(MetricsPtr&& metrics) PURE; 43 : 44 : // Grpc::AsyncStreamCallbacks 45 0 : void onCreateInitialMetadata(Http::RequestHeaderMap&) override {} 46 0 : void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {} 47 0 : void onReceiveMessage(std::unique_ptr<ResponseProto>&&) override {} 48 0 : void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {} 49 0 : void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override{}; 50 : 51 : protected: 52 : Grpc::AsyncStream<RequestProto> stream_{}; 53 : Grpc::AsyncClient<RequestProto, ResponseProto> client_; 54 : }; 55 : 56 : template <class RequestProto, class ResponseProto> 57 : using GrpcMetricsStreamerSharedPtr = 58 : std::shared_ptr<GrpcMetricsStreamer<RequestProto, ResponseProto>>; 59 : 60 : /** 61 : * Production implementation of GrpcMetricsStreamer 62 : */ 63 : class GrpcMetricsStreamerImpl 64 : : public Singleton::Instance, 65 : public GrpcMetricsStreamer<envoy::service::metrics::v3::StreamMetricsMessage, 66 : envoy::service::metrics::v3::StreamMetricsResponse> { 67 : public: 68 : GrpcMetricsStreamerImpl(Grpc::RawAsyncClientSharedPtr raw_async_client, 69 : const LocalInfo::LocalInfo& local_info); 70 : 71 : // GrpcMetricsStreamer 72 : void send(MetricsPtr&& metrics) override; 73 : 74 : // Grpc::AsyncStreamCallbacks 75 0 : void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override { stream_ = nullptr; } 76 : 77 : private: 78 : const LocalInfo::LocalInfo& local_info_; 79 : const Protobuf::MethodDescriptor& service_method_; 80 : }; 81 : 82 : using GrpcMetricsStreamerImplPtr = std::unique_ptr<GrpcMetricsStreamerImpl>; 83 : 84 : class MetricsFlusher { 85 : public: 86 : MetricsFlusher( 87 : bool report_counters_as_deltas, bool emit_labels, HistogramEmitMode histogram_emit_mode, 88 : std::function<bool(const Stats::Metric&)> predicate = 89 0 : [](const auto& metric) { return metric.used(); }) 90 : : report_counters_as_deltas_(report_counters_as_deltas), emit_labels_(emit_labels), 91 : emit_summary_(histogram_emit_mode == HistogramEmitMode::SUMMARY_AND_HISTOGRAM || 92 : histogram_emit_mode == HistogramEmitMode::SUMMARY), 93 : emit_histogram_(histogram_emit_mode == HistogramEmitMode::SUMMARY_AND_HISTOGRAM || 94 : histogram_emit_mode == HistogramEmitMode::HISTOGRAM), 95 0 : predicate_(predicate) {} 96 : 97 : MetricsPtr flush(Stats::MetricSnapshot& snapshot) const; 98 : 99 : private: 100 : void flushCounter(io::prometheus::client::MetricFamily& metrics_family, 101 : const Stats::MetricSnapshot::CounterSnapshot& counter_snapshot, 102 : int64_t snapshot_time_ms) const; 103 : void flushGauge(io::prometheus::client::MetricFamily& metrics_family, const Stats::Gauge& gauge, 104 : int64_t snapshot_time_ms) const; 105 : void flushHistogram(io::prometheus::client::MetricFamily& metrics_family, 106 : const Stats::ParentHistogram& envoy_histogram, 107 : int64_t snapshot_time_ms) const; 108 : void flushSummary(io::prometheus::client::MetricFamily& metrics_family, 109 : const Stats::ParentHistogram& envoy_histogram, int64_t snapshot_time_ms) const; 110 : 111 : io::prometheus::client::Metric* 112 : populateMetricsFamily(io::prometheus::client::MetricFamily& metrics_family, 113 : io::prometheus::client::MetricType type, int64_t snapshot_time_ms, 114 : const Stats::Metric& metric) const; 115 : 116 : const bool report_counters_as_deltas_; 117 : const bool emit_labels_; 118 : const bool emit_summary_; 119 : const bool emit_histogram_; 120 : const std::function<bool(const Stats::Metric&)> predicate_; 121 : }; 122 : 123 : /** 124 : * Stat Sink that flushes metrics via a gRPC service. 125 : */ 126 : template <class RequestProto, class ResponseProto> class MetricsServiceSink : public Stats::Sink { 127 : public: 128 : MetricsServiceSink( 129 : const GrpcMetricsStreamerSharedPtr<RequestProto, ResponseProto>& grpc_metrics_streamer, 130 : bool report_counters_as_deltas, bool emit_labels, HistogramEmitMode histogram_emit_mode) 131 : : MetricsServiceSink( 132 : grpc_metrics_streamer, 133 0 : MetricsFlusher(report_counters_as_deltas, emit_labels, histogram_emit_mode)) {} 134 : 135 : MetricsServiceSink( 136 : const GrpcMetricsStreamerSharedPtr<RequestProto, ResponseProto>& grpc_metrics_streamer, 137 : MetricsFlusher&& flusher) 138 0 : : flusher_(std::move(flusher)), grpc_metrics_streamer_(std::move(grpc_metrics_streamer)) {} 139 : 140 : // MetricsService::Sink 141 0 : void flush(Stats::MetricSnapshot& snapshot) override { 142 0 : grpc_metrics_streamer_->send(flusher_.flush(snapshot)); 143 0 : } 144 0 : void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} 145 : 146 : private: 147 : const MetricsFlusher flusher_; 148 : GrpcMetricsStreamerSharedPtr<RequestProto, ResponseProto> grpc_metrics_streamer_; 149 : }; 150 : 151 : } // namespace MetricsService 152 : } // namespace StatSinks 153 : } // namespace Extensions 154 : } // namespace Envoy