Line data Source code
1 : #pragma once 2 : 3 : #include <memory> 4 : 5 : #include "envoy/extensions/stat_sinks/open_telemetry/v3/open_telemetry.pb.h" 6 : #include "envoy/extensions/stat_sinks/open_telemetry/v3/open_telemetry.pb.validate.h" 7 : #include "envoy/grpc/async_client.h" 8 : #include "envoy/local_info/local_info.h" 9 : #include "envoy/singleton/instance.h" 10 : #include "envoy/stats/histogram.h" 11 : #include "envoy/stats/sink.h" 12 : #include "envoy/stats/stats.h" 13 : 14 : #include "source/common/grpc/typed_async_client.h" 15 : 16 : #include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h" 17 : #include "opentelemetry/proto/common/v1/common.pb.h" 18 : #include "opentelemetry/proto/metrics/v1/metrics.pb.h" 19 : #include "opentelemetry/proto/resource/v1/resource.pb.h" 20 : 21 : namespace Envoy { 22 : namespace Extensions { 23 : namespace StatSinks { 24 : namespace OpenTelemetry { 25 : 26 : using AggregationTemporality = opentelemetry::proto::metrics::v1::AggregationTemporality; 27 : using MetricsExportRequest = 28 : opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest; 29 : using MetricsExportResponse = 30 : opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse; 31 : using KeyValue = opentelemetry::proto::common::v1::KeyValue; 32 : using MetricsExportRequestPtr = std::unique_ptr<MetricsExportRequest>; 33 : using MetricsExportRequestSharedPtr = std::shared_ptr<MetricsExportRequest>; 34 : using SinkConfig = envoy::extensions::stat_sinks::open_telemetry::v3::SinkConfig; 35 : 36 : class OtlpOptions { 37 : public: 38 : OtlpOptions(const SinkConfig& sink_config); 39 : 40 0 : bool reportCountersAsDeltas() { return report_counters_as_deltas_; } 41 0 : bool reportHistogramsAsDeltas() { return report_histograms_as_deltas_; } 42 0 : bool emitTagsAsAttributes() { return emit_tags_as_attributes_; } 43 0 : bool useTagExtractedName() { return use_tag_extracted_name_; } 44 0 : const std::string& statPrefix() { return stat_prefix_; } 45 : 46 : private: 47 : const bool report_counters_as_deltas_; 48 : const bool report_histograms_as_deltas_; 49 : const bool emit_tags_as_attributes_; 50 : const bool use_tag_extracted_name_; 51 : const std::string stat_prefix_; 52 : }; 53 : 54 : using OtlpOptionsSharedPtr = std::shared_ptr<OtlpOptions>; 55 : 56 : class OtlpMetricsFlusher { 57 : public: 58 0 : virtual ~OtlpMetricsFlusher() = default; 59 : 60 : /** 61 : * Creates an OTLP export request from metric snapshot. 62 : * @param snapshot supplies the metrics snapshot to send. 63 : */ 64 : virtual MetricsExportRequestPtr flush(Stats::MetricSnapshot& snapshot) const PURE; 65 : }; 66 : 67 : using OtlpMetricsFlusherSharedPtr = std::shared_ptr<OtlpMetricsFlusher>; 68 : 69 : /** 70 : * Production implementation of OtlpMetricsFlusher 71 : */ 72 : class OtlpMetricsFlusherImpl : public OtlpMetricsFlusher { 73 : public: 74 : OtlpMetricsFlusherImpl( 75 : const OtlpOptionsSharedPtr config, std::function<bool(const Stats::Metric&)> predicate = 76 0 : [](const auto& metric) { return metric.used(); }) 77 0 : : config_(config), predicate_(predicate) {} 78 : 79 : MetricsExportRequestPtr flush(Stats::MetricSnapshot& snapshot) const override; 80 : 81 : private: 82 : template <class GaugeType> 83 : void flushGauge(opentelemetry::proto::metrics::v1::Metric& metric, const GaugeType& gauge, 84 : int64_t snapshot_time_ns) const; 85 : 86 : template <class CounterType> 87 : void flushCounter(opentelemetry::proto::metrics::v1::Metric& metric, const CounterType& counter, 88 : uint64_t value, uint64_t delta, int64_t snapshot_time_ns) const; 89 : 90 : void flushHistogram(opentelemetry::proto::metrics::v1::Metric& metric, 91 : const Stats::ParentHistogram& parent_histogram, 92 : int64_t snapshot_time_ns) const; 93 : 94 : template <class StatType> 95 : void setMetricCommon(opentelemetry::proto::metrics::v1::Metric& metric, 96 : opentelemetry::proto::metrics::v1::NumberDataPoint& data_point, 97 : int64_t snapshot_time_ns, const StatType& stat) const; 98 : 99 : void setMetricCommon(opentelemetry::proto::metrics::v1::Metric& metric, 100 : opentelemetry::proto::metrics::v1::HistogramDataPoint& data_point, 101 : int64_t snapshot_time_ns, const Stats::Metric& stat) const; 102 : 103 : const OtlpOptionsSharedPtr config_; 104 : const std::function<bool(const Stats::Metric&)> predicate_; 105 : }; 106 : 107 : class OpenTelemetryGrpcMetricsExporter : public Grpc::AsyncRequestCallbacks<MetricsExportResponse> { 108 : public: 109 0 : ~OpenTelemetryGrpcMetricsExporter() override = default; 110 : 111 : /** 112 : * Send Metrics Message. 113 : * @param message supplies the metrics to send. 114 : */ 115 : virtual void send(MetricsExportRequestPtr&& metrics) PURE; 116 : 117 : // Grpc::AsyncRequestCallbacks 118 0 : void onCreateInitialMetadata(Http::RequestHeaderMap&) override {} 119 : }; 120 : 121 : using OpenTelemetryGrpcMetricsExporterSharedPtr = std::shared_ptr<OpenTelemetryGrpcMetricsExporter>; 122 : 123 : /** 124 : * Production implementation of OpenTelemetryGrpcMetricsExporter 125 : */ 126 : class OpenTelemetryGrpcMetricsExporterImpl : public Singleton::Instance, 127 : public OpenTelemetryGrpcMetricsExporter, 128 : public Logger::Loggable<Logger::Id::stats> { 129 : public: 130 : OpenTelemetryGrpcMetricsExporterImpl(const OtlpOptionsSharedPtr config, 131 : Grpc::RawAsyncClientSharedPtr raw_async_client); 132 : 133 : // OpenTelemetryGrpcMetricsExporter 134 : void send(MetricsExportRequestPtr&& metrics) override; 135 : 136 : // Grpc::AsyncRequestCallbacks 137 : void onSuccess(Grpc::ResponsePtr<MetricsExportResponse>&&, Tracing::Span&) override; 138 : void onFailure(Grpc::Status::GrpcStatus, const std::string&, Tracing::Span&) override; 139 : 140 : private: 141 : const OtlpOptionsSharedPtr config_; 142 : Grpc::AsyncClient<MetricsExportRequest, MetricsExportResponse> client_; 143 : const Protobuf::MethodDescriptor& service_method_; 144 : }; 145 : 146 : using OpenTelemetryGrpcMetricsExporterImplPtr = 147 : std::unique_ptr<OpenTelemetryGrpcMetricsExporterImpl>; 148 : 149 : class OpenTelemetryGrpcSink : public Stats::Sink { 150 : public: 151 : OpenTelemetryGrpcSink(const OtlpMetricsFlusherSharedPtr& otlp_metrics_flusher, 152 : const OpenTelemetryGrpcMetricsExporterSharedPtr& grpc_metrics_exporter) 153 0 : : metrics_flusher_(otlp_metrics_flusher), metrics_exporter_(grpc_metrics_exporter) {} 154 : 155 : // Stats::Sink 156 0 : void flush(Stats::MetricSnapshot& snapshot) override { 157 0 : metrics_exporter_->send(metrics_flusher_->flush(snapshot)); 158 0 : } 159 : 160 0 : void onHistogramComplete(const Stats::Histogram&, uint64_t) override {} 161 : 162 : private: 163 : const OtlpMetricsFlusherSharedPtr metrics_flusher_; 164 : const OpenTelemetryGrpcMetricsExporterSharedPtr metrics_exporter_; 165 : }; 166 : 167 : } // namespace OpenTelemetry 168 : } // namespace StatSinks 169 : } // namespace Extensions 170 : } // namespace Envoy