1
#include "source/extensions/stat_sinks/open_telemetry/open_telemetry_impl.h"
2

            
3
#include "source/common/tracing/null_span_impl.h"
4
#include "source/extensions/stat_sinks/open_telemetry/stat_match_action.h"
5

            
6
namespace Envoy {
7
namespace Extensions {
8
namespace StatSinks {
9
namespace OpenTelemetry {
10

            
11
using ::opentelemetry::proto::metrics::v1::AggregationTemporality;
12
using ::opentelemetry::proto::metrics::v1::HistogramDataPoint;
13
using ::opentelemetry::proto::metrics::v1::Metric;
14
using ::opentelemetry::proto::metrics::v1::NumberDataPoint;
15
using ::opentelemetry::proto::metrics::v1::ResourceMetrics;
16

            
17
// Flattens a repeated KeyValue field into an AttributesMap keyed by attribute name.
// Only the string_value() of each attribute value is stored. When the same key occurs
// more than once, the later entry overwrites the earlier one.
MetricAggregator::AttributesMap MetricAggregator::GetAttributesMap(
    const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attrs) {
  AttributesMap flattened;
  for (const auto& key_value : attrs) {
    const auto& text = key_value.value().string_value();
    flattened[key_value.key()] = text;
  }
  return flattened;
}
25

            
26
461
// Returns the aggregation entry stored in `metrics_` under `metric_name`, creating it via
// operator[] when it is not there yet. The entry's Metric proto gets `metric_name` assigned
// as its name whenever that name is still empty.
MetricAggregator::MetricData& MetricAggregator::getOrCreateMetric(absl::string_view metric_name) {
  auto& entry = metrics_[metric_name];
  auto& metric_proto = entry.metric;
  if (metric_proto.name().empty()) {
    metric_proto.set_name(metric_name);
  }
  return entry;
}
33

            
34
void MetricAggregator::addGauge(
35
    absl::string_view metric_name, int64_t value,
36
1100
    const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attributes) {
37
1100
  if (!enable_metric_aggregation_) {
38
881
    Metric metric;
39
881
    metric.set_name(metric_name);
40
881
    NumberDataPoint* data_point = metric.mutable_gauge()->add_data_points();
41
881
    setCommonDataPoint(*data_point, attributes,
42
881
                       AggregationTemporality::AGGREGATION_TEMPORALITY_UNSPECIFIED);
43
881
    data_point->set_as_int(value);
44
881
    non_aggregated_metrics_.push_back(std::move(metric));
45
881
    return;
46
881
  }
47

            
48
219
  MetricData& metric_data = getOrCreateMetric(metric_name);
49
219
  DataPointKey key{GetAttributesMap(attributes)};
50

            
51
219
  auto it = metric_data.gauge_points.find(key);
52
219
  if (it != metric_data.gauge_points.end()) {
53
    // If the data point exists, update it and return.
54
1
    NumberDataPoint* data_point = it->second;
55

            
56
    // Multiple stats are mapped to the same metric and we
57
    // aggregate by summing the new value to the existing one.
58
1
    data_point->set_as_int(data_point->as_int() + value);
59
1
    return;
60
1
  }
61

            
62
  // If the data point does not exist, create a new one.
63
218
  NumberDataPoint* data_point = metric_data.metric.mutable_gauge()->add_data_points();
64
218
  metric_data.gauge_points[key] = data_point;
65
218
  setCommonDataPoint(*data_point, attributes,
66
218
                     AggregationTemporality::AGGREGATION_TEMPORALITY_UNSPECIFIED);
67
218
  data_point->set_as_int(value);
68
218
}
69

            
70
void MetricAggregator::addCounter(
71
    absl::string_view metric_name, uint64_t value, uint64_t delta,
72
    AggregationTemporality temporality,
73
1115
    const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attributes) {
74
1115
  const uint64_t point_value =
75
1115
      (temporality == AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA) ? delta : value;
76
1115
  if (point_value == 0 && temporality == AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA) {
77
1
    return;
78
1
  }
79
1114
  if (!enable_metric_aggregation_) {
80
893
    Metric metric;
81
893
    metric.set_name(metric_name);
82
893
    metric.mutable_sum()->set_is_monotonic(true);
83
893
    metric.mutable_sum()->set_aggregation_temporality(temporality);
84
893
    NumberDataPoint* data_point = metric.mutable_sum()->add_data_points();
85
893
    setCommonDataPoint(*data_point, attributes, temporality);
86
893
    data_point->set_as_int(point_value);
87
893
    non_aggregated_metrics_.push_back(std::move(metric));
88
893
    return;
89
893
  }
90
221
  MetricData& metric_data = getOrCreateMetric(metric_name);
91

            
92
221
  DataPointKey key{GetAttributesMap(attributes)};
93
221
  auto it = metric_data.counter_points.find(key);
94
221
  if (it != metric_data.counter_points.end()) {
95
    // If the data point exists, update it and return.
96
1
    NumberDataPoint* data_point = it->second;
97
    // For DELTA, add the change since the last export. For CUMULATIVE, add the
98
    // total value.
99
1
    data_point->set_as_int(data_point->as_int() + point_value);
100
1
    return;
101
1
  }
102

            
103
  // If the data point does not exist, create a new one.
104
220
  NumberDataPoint* data_point = metric_data.metric.mutable_sum()->add_data_points();
105
220
  metric_data.metric.mutable_sum()->set_is_monotonic(true);
106
220
  metric_data.metric.mutable_sum()->set_aggregation_temporality(temporality);
107
220
  metric_data.counter_points[key] = data_point;
108
220
  setCommonDataPoint(*data_point, attributes, temporality);
109
220
  data_point->set_as_int(point_value);
110
220
}
111

            
112
void MetricAggregator::addHistogram(
113
    absl::string_view stat_name, absl::string_view metric_name,
114
    const Stats::HistogramStatistics& stats, AggregationTemporality temporality,
115
87
    const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>& attributes) {
116
87
  if (stats.sampleCount() == 0 &&
117
87
      temporality == AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA) {
118
1
    return;
119
1
  }
120
86
  if (!enable_metric_aggregation_) {
121
65
    Metric metric;
122
65
    metric.set_name(metric_name);
123
65
    metric.mutable_histogram()->set_aggregation_temporality(temporality);
124
65
    HistogramDataPoint* data_point = metric.mutable_histogram()->add_data_points();
125
65
    setCommonDataPoint(*data_point, attributes, temporality);
126

            
127
65
    data_point->set_count(stats.sampleCount());
128
65
    data_point->set_sum(stats.sampleSum());
129

            
130
65
    std::vector<uint64_t> bucket_counts = stats.computeDisjointBuckets();
131
1300
    for (size_t i = 0; i < stats.supportedBuckets().size(); i++) {
132
1235
      data_point->add_explicit_bounds(stats.supportedBuckets()[i]);
133
1235
      data_point->add_bucket_counts(bucket_counts[i]);
134
1235
    }
135
65
    data_point->add_bucket_counts(stats.outOfBoundCount());
136
65
    non_aggregated_metrics_.push_back(std::move(metric));
137
65
    return;
138
65
  }
139
21
  MetricData& metric_data = getOrCreateMetric(metric_name);
140

            
141
21
  DataPointKey key{GetAttributesMap(attributes)};
142
21
  auto it = metric_data.histogram_points.find(key);
143
21
  if (it != metric_data.histogram_points.end()) {
144
    // If the data point exists, update it and return.
145
5
    HistogramDataPoint* data_point = it->second;
146
5
    std::vector<uint64_t> new_bucket_counts = stats.computeDisjointBuckets();
147
5
    if (static_cast<size_t>(data_point->explicit_bounds_size()) ==
148
5
            stats.supportedBuckets().size() &&
149
5
        static_cast<size_t>(data_point->bucket_counts_size()) == new_bucket_counts.size() + 1) {
150
      // Aggregate count and sum.
151
5
      data_point->set_count(data_point->count() + stats.sampleCount());
152
5
      data_point->set_sum(data_point->sum() + stats.sampleSum());
153

            
154
      // Aggregate bucket_counts.
155
100
      for (size_t i = 0; i < new_bucket_counts.size(); ++i) {
156
95
        data_point->set_bucket_counts(i, data_point->bucket_counts(i) + new_bucket_counts[i]);
157
95
      }
158
5
      data_point->set_bucket_counts(new_bucket_counts.size(),
159
5
                                    data_point->bucket_counts(new_bucket_counts.size()) +
160
5
                                        stats.outOfBoundCount());
161
5
    } else {
162
      ENVOY_LOG(error, "Histogram bounds mismatch for metric {} aggregated from stat {}",
163
                metric_name, stat_name);
164
    }
165
5
    return;
166
5
  }
167

            
168
  // If the data point does not exist, create a new one.
169
16
  HistogramDataPoint* data_point = metric_data.metric.mutable_histogram()->add_data_points();
170
16
  metric_data.metric.mutable_histogram()->set_aggregation_temporality(temporality);
171
16
  metric_data.histogram_points[key] = data_point;
172
  // Set common fields directly here
173
16
  setCommonDataPoint(*data_point, attributes, temporality);
174

            
175
16
  data_point->set_count(stats.sampleCount());
176
16
  data_point->set_sum(stats.sampleSum());
177
  // TODO(ohadvano): support min/max optional fields for
178
  // ``HistogramDataPoint``
179

            
180
16
  std::vector<uint64_t> bucket_counts = stats.computeDisjointBuckets();
181
320
  for (size_t i = 0; i < stats.supportedBuckets().size(); i++) {
182
304
    data_point->add_explicit_bounds(stats.supportedBuckets()[i]);
183
304
    data_point->add_bucket_counts(bucket_counts[i]);
184
304
  }
185
16
  data_point->add_bucket_counts(stats.outOfBoundCount());
186
16
}
187

            
188
// Packages everything recorded so far into a list of ResourceMetrics.
//
// @param resource_attributes attributes copied onto the emitted resource.
// @return an empty list when nothing was recorded; otherwise a single ResourceMetrics with
//         one scope that holds copies of the aggregated metrics followed by copies of the
//         non-aggregated ones.
Protobuf::RepeatedPtrField<ResourceMetrics> MetricAggregator::getResourceMetrics(
    const Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>&
        resource_attributes) const {
  Protobuf::RepeatedPtrField<ResourceMetrics> result;
  const bool nothing_recorded = metrics_.empty() && non_aggregated_metrics_.empty();
  if (nothing_recorded) {
    return result;
  }

  auto* resource_entry = result.Add();
  resource_entry->mutable_resource()->mutable_attributes()->CopyFrom(resource_attributes);
  auto* scope = resource_entry->add_scope_metrics();

  for (const auto& named_metric : metrics_) {
    scope->add_metrics()->CopyFrom(named_metric.second.metric);
  }
  for (const auto& standalone : non_aggregated_metrics_) {
    scope->add_metrics()->CopyFrom(standalone);
  }
  return result;
}
208

            
209
// Converts the attributes of `resource` into a repeated KeyValue field; every attribute
// value is stored as a string value.
Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>
generateResourceAttributes(const Tracers::OpenTelemetry::Resource& resource) {
  Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue> converted;
  for (const auto& [name, value] : resource.attributes_) {
    auto& key_value = *converted.Add();
    key_value.set_key(name);
    key_value.mutable_value()->set_string_value(value);
  }
  return converted;
}
219

            
220
// Builds the stat match tree described by `matcher_config`, using a default-constructed
// ActionContext and ActionValidationVisitor together with `server_factory_context`.
Matcher::MatchTreePtr<Stats::StatMatchingData>
createMatcher(const xds::type::matcher::v3::Matcher& matcher_config,
              Server::Configuration::ServerFactoryContext& server_factory_context) {
  ActionContext context;
  ActionValidationVisitor visitor;
  Matcher::MatchTreeFactory<Stats::StatMatchingData, ActionContext> tree_factory{
      context, server_factory_context, visitor};
  // create() hands back a callback; invoking it produces the match tree itself.
  auto build_tree = tree_factory.create(matcher_config);
  return build_tree();
}
229

            
230
// Captures the sink configuration in immutable option fields.
//
// - emit_tags_as_attributes and use_tag_extracted_name fall back to true when unset.
// - A non-empty prefix is stored with a trailing "."; an empty prefix stays empty.
// - Metric aggregation is enabled exactly when custom_metric_conversions is present.
// - The resource attributes and the stat matcher are built once, here.
OtlpOptions::OtlpOptions(const SinkConfig& sink_config,
                         const Tracers::OpenTelemetry::Resource& resource,
                         Server::Configuration::ServerFactoryContext& server)
    : report_counters_as_deltas_(sink_config.report_counters_as_deltas()),
      report_histograms_as_deltas_(sink_config.report_histograms_as_deltas()),
      emit_tags_as_attributes_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, emit_tags_as_attributes, true)),
      use_tag_extracted_name_(
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, use_tag_extracted_name, true)),
      stat_prefix_(sink_config.prefix().empty() ? "" : sink_config.prefix() + "."),
      enable_metric_aggregation_(sink_config.has_custom_metric_conversions()),
      resource_attributes_(generateResourceAttributes(resource)),
      matcher_(createMatcher(sink_config.custom_metric_conversions(), server)) {}
243

            
244
// Stores the sink options and the gRPC client, and resolves the descriptor of the
// MetricsService Export method from the generated descriptor pool.
// NOTE(review): the FindMethodByName() result is dereferenced without a null check, so the
// method is presumably expected to always be registered — confirm.
OpenTelemetryGrpcMetricsExporterImpl::OpenTelemetryGrpcMetricsExporterImpl(
    const OtlpOptionsSharedPtr config, Grpc::RawAsyncClientSharedPtr raw_async_client)
    : config_(config),
      client_(raw_async_client),
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "opentelemetry.proto.collector.metrics.v1.MetricsService."
          "Export")) {
}
250

            
251
13
// Logs the export request at debug level and sends it through the gRPC client, passing
// this object as the callbacks target, the null span, and default request options.
void OpenTelemetryGrpcMetricsExporterImpl::send(MetricsExportRequestPtr&& export_request) {
  auto& request = *export_request;
  ENVOY_LOG(debug, "sending a OTLP metric request: {}", request.DebugString());
  client_->send(service_method_, request, *this, Tracing::NullSpan::instance(),
                Http::AsyncClient::RequestOptions());
}
256

            
257
void OpenTelemetryGrpcMetricsExporterImpl::onSuccess(
258
7
    Grpc::ResponsePtr<MetricsExportResponse>&& export_response, Tracing::Span&) {
259
7
  if (export_response->has_partial_success()) {
260
1
    ENVOY_LOG(debug,
261
1
              "export response with partial success; {} rejected, collector "
262
1
              "message: {}",
263
1
              export_response->partial_success().rejected_data_points(),
264
1
              export_response->partial_success().error_message());
265
1
  }
266
7
}
267

            
268
void OpenTelemetryGrpcMetricsExporterImpl::onFailure(Grpc::Status::GrpcStatus response_status,
269
                                                     const std::string& response_message,
270
3
                                                     Tracing::Span&) {
271
3
  ENVOY_LOG(debug, "export failure; status: {}, message: {}", response_status, response_message);
272
3
}
273

            
274
template <class StatType>
275
OtlpMetricsFlusherImpl::MetricConfig
276
2308
OtlpMetricsFlusherImpl::getMetricConfig(const StatType& stat) const {
277
2308
  Stats::StatMatchingDataImpl<StatType> data(stat);
278
2308
  const ::Envoy::Matcher::ActionMatchResult result =
279
2308
      Envoy::Matcher::evaluateMatch<Stats::StatMatchingData>(*config_->matcher(), data);
280
2308
  ASSERT(result.isComplete());
281
2308
  if (result.isMatch()) {
282
41
    if (dynamic_cast<const DropAction*>(result.action().get())) {
283
6
      return {true, {}};
284
6
    }
285

            
286
35
    if (const auto* match_action = dynamic_cast<const ConversionAction*>(result.action().get())) {
287
35
      return {false, *match_action->config()};
288
35
    }
289

            
290
    ENVOY_LOG(error, "Unknown action type for custom metric conversion: {}",
291
              result.action()->typeUrl());
292
  }
293

            
294
  // By default, this stat will be converted to the metric without any
295
  // customization.
296
2267
  return {false, {}};
297
2308
}
298

            
299
template <class StatType>
300
std::string OtlpMetricsFlusherImpl::getMetricName(
301
2302
    const StatType& stat, OptRef<const SinkConfig::ConversionAction> conversion_config) const {
302
2302
  if (conversion_config.has_value()) {
303
35
    return conversion_config->metric_name();
304
35
  }
305
2267
  return absl::StrCat(config_->statPrefix(),
306
2267
                      config_->useTagExtractedName() ? stat.tagExtractedName() : stat.name());
307
2302
}
308

            
309
template <class StatType>
310
Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue>
311
OtlpMetricsFlusherImpl::getCombinedAttributes(
312
2302
    const StatType& stat, OptRef<const SinkConfig::ConversionAction> conversion_config) const {
313
2302
  Protobuf::RepeatedPtrField<opentelemetry::proto::common::v1::KeyValue> attributes;
314
2302
  if (config_->emitTagsAsAttributes()) {
315
2299
    for (const auto& tag : stat.tags()) {
316
1850
      auto* attribute = attributes.Add();
317
1850
      attribute->set_key(tag.name_);
318
1850
      attribute->mutable_value()->set_string_value(tag.value_);
319
1850
    }
320
2299
  }
321
2302
  if (conversion_config.has_value()) {
322
35
    for (const auto& attr : conversion_config->static_metric_labels()) {
323
3
      *attributes.Add() = attr;
324
3
    }
325
35
  }
326
2302
  return attributes;
327
2302
}
328

            
329
// Converts a stats snapshot into an OTLP metrics export request.
//
// @param snapshot snapshot providing gauges, counters, histograms and their host variants.
// @param delta_start_time_ns forwarded to the MetricAggregator.
// @param cumulative_start_time_ns forwarded to the MetricAggregator.
// @return a request whose resource_metrics holds everything the aggregator collected.
//
// Gauges, counters and histograms are filtered through `predicate_`; host gauges and host
// counters are not. Every remaining stat is run through the matcher and skipped when the
// result says to drop it.
MetricsExportRequestPtr OtlpMetricsFlusherImpl::flush(Stats::MetricSnapshot& snapshot,
                                                      int64_t delta_start_time_ns,
                                                      int64_t cumulative_start_time_ns) const {
  auto request = std::make_unique<MetricsExportRequest>();

  const auto snapshot_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                    snapshot.snapshotTime().time_since_epoch())
                                    .count();
  MetricAggregator aggregator(config_->enableMetricAggregation(), snapshot_time_ns,
                              delta_start_time_ns, cumulative_start_time_ns);

  // Shared per-stat pipeline: resolve the match result, bail out on a drop, then compute
  // the metric name and attributes and pass them to `emit`.
  const auto convert = [this](const auto& stat, auto&& emit) {
    const auto metric_config = this->getMetricConfig(stat);
    if (metric_config.drop_stat) {
      return;
    }
    const std::string metric_name = this->getMetricName(stat, metric_config.conversion_action);
    const auto attributes = this->getCombinedAttributes(stat, metric_config.conversion_action);
    emit(metric_name, attributes);
  };

  // Process Gauges
  for (const auto& gauge : snapshot.gauges()) {
    if (!predicate_(gauge)) {
      continue;
    }
    convert(gauge.get(), [&](const std::string& metric_name, const auto& attributes) {
      aggregator.addGauge(metric_name, gauge.get().value(), attributes);
    });
  }
  for (const auto& gauge : snapshot.hostGauges()) {
    convert(gauge, [&](const std::string& metric_name, const auto& attributes) {
      aggregator.addGauge(metric_name, gauge.value(), attributes);
    });
  }

  // Process Counters
  const AggregationTemporality counter_temporality =
      config_->reportCountersAsDeltas()
          ? AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA
          : AggregationTemporality::AGGREGATION_TEMPORALITY_CUMULATIVE;
  for (const auto& counter : snapshot.counters()) {
    if (!predicate_(counter.counter_)) {
      continue;
    }
    convert(counter.counter_.get(),
            [&](const std::string& metric_name, const auto& attributes) {
              aggregator.addCounter(metric_name, counter.counter_.get().value(), counter.delta_,
                                    counter_temporality, attributes);
            });
  }
  for (const auto& counter : snapshot.hostCounters()) {
    convert(counter, [&](const std::string& metric_name, const auto& attributes) {
      aggregator.addCounter(metric_name, counter.value(), counter.delta(), counter_temporality,
                            attributes);
    });
  }

  // Process Histograms
  const bool histograms_as_deltas = config_->reportHistogramsAsDeltas();
  const AggregationTemporality histogram_temporality =
      histograms_as_deltas ? AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA
                           : AggregationTemporality::AGGREGATION_TEMPORALITY_CUMULATIVE;
  for (const auto& histogram : snapshot.histograms()) {
    if (!predicate_(histogram)) {
      continue;
    }
    convert(histogram.get(), [&](const std::string& metric_name, const auto& attributes) {
      // Interval statistics back delta reporting; cumulative statistics back the rest.
      const Stats::HistogramStatistics& histogram_stats =
          histograms_as_deltas ? histogram.get().intervalStatistics()
                               : histogram.get().cumulativeStatistics();
      aggregator.addHistogram(histogram.get().name(), metric_name, histogram_stats,
                              histogram_temporality, attributes);
    });
  }

  // Add all aggregated metrics to the request.
  *request->mutable_resource_metrics() =
      aggregator.getResourceMetrics(config_->resource_attributes());
  return request;
}
423
} // namespace OpenTelemetry
424
} // namespace StatSinks
425
} // namespace Extensions
426
} // namespace Envoy