1
#pragma once
2

            
3
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "envoy/extensions/stat_sinks/open_telemetry/v3/open_telemetry.pb.h"
#include "envoy/extensions/stat_sinks/open_telemetry/v3/open_telemetry.pb.validate.h"
#include "envoy/grpc/async_client.h"
#include "envoy/local_info/local_info.h"
#include "envoy/singleton/instance.h"
#include "envoy/stats/histogram.h"
#include "envoy/stats/sink.h"
#include "envoy/stats/stats.h"

#include "source/common/common/matchers.h"
#include "source/common/grpc/typed_async_client.h"
#include "source/extensions/tracers/opentelemetry/resource_detectors/resource_detector.h"

#include "absl/container/flat_hash_map.h"
#include "absl/strings/string_view.h"
#include "opentelemetry/proto/collector/metrics/v1/metrics_service.pb.h"
#include "opentelemetry/proto/common/v1/common.pb.h"
#include "opentelemetry/proto/metrics/v1/metrics.pb.h"
#include "opentelemetry/proto/resource/v1/resource.pb.h"
22

            
23
namespace Envoy {
24
namespace Extensions {
25
namespace StatSinks {
26
namespace OpenTelemetry {
27

            
28
using AggregationTemporality = opentelemetry::proto::metrics::v1::AggregationTemporality;
29
using MetricsExportRequest =
30
    opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest;
31
using MetricsExportResponse =
32
    opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceResponse;
33
using KeyValue = opentelemetry::proto::common::v1::KeyValue;
34
using MetricsExportRequestPtr = std::unique_ptr<MetricsExportRequest>;
35
using MetricsExportRequestSharedPtr = std::shared_ptr<MetricsExportRequest>;
36
using SinkConfig = envoy::extensions::stat_sinks::open_telemetry::v3::SinkConfig;
37

            
38
/**
39
 * Aggregates individual metric data points into OTLP Metric protos.
40
 * This class helps to group data points by metric name and attributes,
41
 * which is necessary for creating a valid OTLP request.
42
 */
43
class MetricAggregator : public Logger::Loggable<Logger::Id::stats> {
44
public:
45
  using AttributesMap = absl::flat_hash_map<std::string, std::string>;
46

            
47
  explicit MetricAggregator(bool enable_metric_aggregation, int64_t snapshot_time_ns,
48
                            int64_t delta_start_time_ns, int64_t cumulative_start_time_ns)
49
37
      : enable_metric_aggregation_(enable_metric_aggregation), snapshot_time_ns_(snapshot_time_ns),
50
37
        delta_start_time_ns_(delta_start_time_ns),
51
37
        cumulative_start_time_ns_(cumulative_start_time_ns) {}
52

            
53
  // Key used to group data points by their attributes.
54
  struct DataPointKey {
55
    AttributesMap attributes;
56

            
57
114
    template <typename H> friend H AbslHashValue(H h, const DataPointKey& k) {
58
114
      return H::combine(std::move(h), k.attributes);
59
114
    }
60

            
61
121
    bool operator==(const DataPointKey& other) const { return attributes == other.attributes; }
62
  };
63

            
64
  // Holds the Metric proto and maps for quick lookups of data points.
65
  struct MetricData {
66
    ::opentelemetry::proto::metrics::v1::Metric metric;
67
    absl::flat_hash_map<DataPointKey, ::opentelemetry::proto::metrics::v1::NumberDataPoint*>
68
        gauge_points;
69
    absl::flat_hash_map<DataPointKey, ::opentelemetry::proto::metrics::v1::NumberDataPoint*>
70
        counter_points;
71
    absl::flat_hash_map<DataPointKey, ::opentelemetry::proto::metrics::v1::HistogramDataPoint*>
72
        histogram_points;
73
  };
74

            
75
  // Adds a gauge metric data point. Aggregates by summing if a point with the
76
  // same attributes exists.
77
  void addGauge(
78
      absl::string_view metric_name, int64_t value,
79
      const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attributes);
80

            
81
  // Adds a counter metric data point. Aggregates by summing the delta or value
82
  // based on temporality if a point with the same attributes exists.
83
  void addCounter(
84
      absl::string_view metric_name, uint64_t value, uint64_t delta,
85
      ::opentelemetry::proto::metrics::v1::AggregationTemporality temporality,
86
      const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attributes);
87

            
88
  // Adds a histogram metric data point. Aggregates counts and sums if a point
89
  // with the same attributes and compatible bounds exists.
90
  void addHistogram(
91
      absl::string_view stat_name, absl::string_view metric_name,
92
      const Stats::HistogramStatistics& stats,
93
      ::opentelemetry::proto::metrics::v1::AggregationTemporality temporality,
94
      const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attributes);
95

            
96
  // Returns a RepeatedPtrField of ResourceMetrics containing all aggregated
97
  // metrics.
98
  Protobuf::RepeatedPtrField<::opentelemetry::proto::metrics::v1::ResourceMetrics>
99
  getResourceMetrics(const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>&
100
                         resource_attributes) const;
101

            
102
private:
103
  // Converts a RepeatedPtrField of KeyValue to an AttributesMap.
104
  static AttributesMap GetAttributesMap(
105
      const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attrs);
106

            
107
  // Gets or creates a MetricData object for a given metric name.
108
  MetricData& getOrCreateMetric(absl::string_view metric_name);
109

            
110
  // Sets common fields for a data point.
111
  // For gauge metrics,
112
  // temporality should be AGGREGATION_TEMPORALITY_UNSPECIFIED.
113
  template <typename DataPoint>
114
  void setCommonDataPoint(
115
      DataPoint& data_point,
116
      const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attributes,
117
2293
      ::opentelemetry::proto::metrics::v1::AggregationTemporality temporality) {
118
2293
    data_point.set_time_unix_nano(snapshot_time_ns_);
119
2293
    data_point.mutable_attributes()->CopyFrom(attributes);
120
2293
    switch (temporality) {
121
7
    case AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA:
122
7
      data_point.set_start_time_unix_nano(delta_start_time_ns_);
123
7
      break;
124
1187
    case AggregationTemporality::AGGREGATION_TEMPORALITY_CUMULATIVE:
125
1187
      data_point.set_start_time_unix_nano(cumulative_start_time_ns_);
126
1187
      break;
127
1099
    default:
128
      // Do not set start time for UNSPECIFIED.
129
1099
      break;
130
2293
    }
131
2293
  }
132

            
133
  const bool enable_metric_aggregation_;
134
  const int64_t snapshot_time_ns_;
135
  const int64_t delta_start_time_ns_;
136
  const int64_t cumulative_start_time_ns_;
137
  absl::flat_hash_map<std::string, MetricData> metrics_;
138

            
139
  // Currently, the metrics without defined in `custom_metric_conversions` won't be aggregated and
140
  // will be directly stored in this list.
141
  std::vector<::opentelemetry::proto::metrics::v1::Metric> non_aggregated_metrics_;
142
};
143

            
144
class OtlpOptions {
145
public:
146
  OtlpOptions(const SinkConfig& sink_config, const Tracers::OpenTelemetry::Resource& resource,
147
              Server::Configuration::ServerFactoryContext& server);
148

            
149
39
  bool reportCountersAsDeltas() { return report_counters_as_deltas_; }
150
126
  bool reportHistogramsAsDeltas() { return report_histograms_as_deltas_; }
151
2304
  bool emitTagsAsAttributes() { return emit_tags_as_attributes_; }
152
2269
  bool useTagExtractedName() { return use_tag_extracted_name_; }
153
2269
  absl::string_view statPrefix() { return stat_prefix_; }
154
  const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>&
155
41
  resource_attributes() const {
156
41
    return resource_attributes_;
157
41
  }
158

            
159
2308
  const Envoy::Matcher::MatchTreeSharedPtr<Stats::StatMatchingData> matcher() const {
160
2308
    return matcher_;
161
2308
  }
162
37
  bool enableMetricAggregation() const { return enable_metric_aggregation_; }
163

            
164
private:
165
  const bool report_counters_as_deltas_;
166
  const bool report_histograms_as_deltas_;
167
  const bool emit_tags_as_attributes_;
168
  const bool use_tag_extracted_name_;
169
  const std::string stat_prefix_;
170
  bool enable_metric_aggregation_;
171
  const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue> resource_attributes_;
172
  const Envoy::Matcher::MatchTreeSharedPtr<Stats::StatMatchingData> matcher_;
173
};
174

            
175
using OtlpOptionsSharedPtr = std::shared_ptr<OtlpOptions>;
176

            
177
class OtlpMetricsFlusher {
178
public:
179
31
  virtual ~OtlpMetricsFlusher() = default;
180

            
181
  /**
182
   * Creates an OTLP export request from metric snapshot.
183
   * @param snapshot supplies the metrics snapshot to send.
184
   */
185
  virtual MetricsExportRequestPtr flush(Stats::MetricSnapshot& snapshot,
186
                                        int64_t delta_start_time_ns,
187
                                        int64_t cumulative_start_time_ns) const PURE;
188
};
189

            
190
using OtlpMetricsFlusherSharedPtr = std::shared_ptr<OtlpMetricsFlusher>;
191

            
192
/**
193
 * Production implementation of OtlpMetricsFlusher
194
 */
195
class OtlpMetricsFlusherImpl : public OtlpMetricsFlusher,
196
                               public Logger::Loggable<Logger::Id::stats> {
197
public:
198
  OtlpMetricsFlusherImpl(
199
      const OtlpOptionsSharedPtr config, std::function<bool(const Stats::Metric&)> predicate =
200
10206
                                             [](const auto& metric) { return metric.used(); })
201
30
      : config_(config), predicate_(predicate) {}
202

            
203
  MetricsExportRequestPtr flush(Stats::MetricSnapshot& snapshot, int64_t delta_start_time_ns,
204
                                int64_t cumulative_start_time_ns) const override;
205

            
206
private:
207
  struct MetricConfig {
208
    bool drop_stat{false};
209
    OptRef<const SinkConfig::ConversionAction> conversion_action;
210
  };
211

            
212
private:
213
  template <class StatType> MetricConfig getMetricConfig(const StatType& stat) const;
214

            
215
  template <class StatType>
216
  std::string getMetricName(const StatType& stat,
217
                            OptRef<const SinkConfig::ConversionAction> conversion_config) const;
218

            
219
  template <class StatType>
220
  Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>
221
  getCombinedAttributes(const StatType& stat,
222
                        OptRef<const SinkConfig::ConversionAction> conversion_config) const;
223
  template <class GaugeType>
224
  void addGaugeDataPoint(opentelemetry::proto::metrics::v1::Metric& metric,
225
                         const GaugeType& gauge_stat, int64_t snapshot_time_ns) const;
226

            
227
  template <class CounterType>
228
  void addCounterDataPoint(opentelemetry::proto::metrics::v1::Metric& metric,
229
                           const CounterType& counter, uint64_t value, uint64_t delta,
230
                           int64_t snapshot_time_ns) const;
231

            
232
  void addHistogramDataPoint(opentelemetry::proto::metrics::v1::Metric& metric,
233
                             const Stats::ParentHistogram& parent_histogram,
234
                             int64_t snapshot_time_ns) const;
235

            
236
  template <class StatType>
237
  void setMetricCommon(opentelemetry::proto::metrics::v1::NumberDataPoint& data_point,
238
                       int64_t snapshot_time_ns, const StatType& stat) const;
239

            
240
  void setMetricCommon(opentelemetry::proto::metrics::v1::HistogramDataPoint& data_point,
241
                       int64_t snapshot_time_ns, const Stats::Metric& stat) const;
242

            
243
  const OtlpOptionsSharedPtr config_;
244
  const std::function<bool(const Stats::Metric&)> predicate_;
245
};
246

            
247
/**
248
 * Abstract base class for OTLP metrics exporters.
249
 */
250
class OtlpMetricsExporter {
251
public:
252
22
  virtual ~OtlpMetricsExporter() = default;
253

            
254
  /**
255
   * Send metrics to the configured OTLP service.
256
   * @param metrics the OTLP metrics export request.
257
   */
258
  virtual void send(MetricsExportRequestPtr&& metrics) PURE;
259
};
260

            
261
using OtlpMetricsExporterSharedPtr = std::shared_ptr<OtlpMetricsExporter>;
262

            
263
/**
264
 * gRPC implementation of OtlpMetricsExporter.
265
 */
266
class OpenTelemetryGrpcMetricsExporter : public OtlpMetricsExporter,
267
                                         public Grpc::AsyncRequestCallbacks<MetricsExportResponse> {
268
public:
269
10
  ~OpenTelemetryGrpcMetricsExporter() override = default;
270

            
271
  // Grpc::AsyncRequestCallbacks
272
12
  void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
273
};
274

            
275
using OpenTelemetryGrpcMetricsExporterSharedPtr = std::shared_ptr<OpenTelemetryGrpcMetricsExporter>;
276

            
277
/**
278
 * Production implementation of OpenTelemetryGrpcMetricsExporter
279
 */
280
class OpenTelemetryGrpcMetricsExporterImpl : public Singleton::Instance,
281
                                             public OpenTelemetryGrpcMetricsExporter,
282
                                             public Logger::Loggable<Logger::Id::stats> {
283
public:
284
  OpenTelemetryGrpcMetricsExporterImpl(const OtlpOptionsSharedPtr config,
285
                                       Grpc::RawAsyncClientSharedPtr raw_async_client);
286

            
287
  // OpenTelemetryGrpcMetricsExporter
288
  void send(MetricsExportRequestPtr&& metrics) override;
289

            
290
  // Grpc::AsyncRequestCallbacks
291
  void onSuccess(Grpc::ResponsePtr<MetricsExportResponse>&&, Tracing::Span&) override;
292
  void onFailure(Grpc::Status::GrpcStatus, const std::string&, Tracing::Span&) override;
293

            
294
private:
295
  const OtlpOptionsSharedPtr config_;
296
  Grpc::AsyncClient<MetricsExportRequest, MetricsExportResponse> client_;
297
  const Protobuf::MethodDescriptor& service_method_;
298
};
299

            
300
using OpenTelemetryGrpcMetricsExporterImplPtr =
301
    std::unique_ptr<OpenTelemetryGrpcMetricsExporterImpl>;
302

            
303
/**
304
 * Stats sink that exports metrics via OTLP (gRPC or HTTP).
305
 */
306
class OpenTelemetrySink : public Stats::Sink {
307
public:
308
  OpenTelemetrySink(const OtlpMetricsFlusherSharedPtr& otlp_metrics_flusher,
309
                    const OtlpMetricsExporterSharedPtr& metrics_exporter, int64_t create_time_ns)
310
15
      : metrics_flusher_(otlp_metrics_flusher), metrics_exporter_(metrics_exporter),
311
        // Use the time when the sink is created as the last flush time for the first flush.
312
15
        last_flush_time_ns_(create_time_ns), proxy_start_time_ns_(create_time_ns) {}
313

            
314
  // Stats::Sink
315
22
  void flush(Stats::MetricSnapshot& snapshot) override {
316
22
    const int64_t current_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
317
22
                                        snapshot.snapshotTime().time_since_epoch())
318
22
                                        .count();
319
22
    metrics_exporter_->send(
320
22
        metrics_flusher_->flush(snapshot, last_flush_time_ns_, proxy_start_time_ns_));
321
22
    last_flush_time_ns_ = current_time_ns;
322
22
  }
323

            
324
152
  void onHistogramComplete(const Stats::Histogram&, uint64_t) override {}
325

            
326
private:
327
  const OtlpMetricsFlusherSharedPtr metrics_flusher_;
328
  const OtlpMetricsExporterSharedPtr metrics_exporter_;
329
  int64_t last_flush_time_ns_;
330
  int64_t proxy_start_time_ns_;
331
};
332
} // namespace OpenTelemetry
333
} // namespace StatSinks
334
} // namespace Extensions
335
} // namespace Envoy