Line data Source code
1 : #include "source/extensions/stat_sinks/open_telemetry/open_telemetry_impl.h"
2 :
3 : #include "source/common/tracing/null_span_impl.h"
4 :
5 : namespace Envoy {
6 : namespace Extensions {
7 : namespace StatSinks {
8 : namespace OpenTelemetry {
9 :
10 : OtlpOptions::OtlpOptions(const SinkConfig& sink_config)
11 : : report_counters_as_deltas_(sink_config.report_counters_as_deltas()),
12 : report_histograms_as_deltas_(sink_config.report_histograms_as_deltas()),
13 : emit_tags_as_attributes_(
14 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, emit_tags_as_attributes, true)),
15 : use_tag_extracted_name_(
16 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(sink_config, use_tag_extracted_name, true)),
17 0 : stat_prefix_(!sink_config.prefix().empty() ? sink_config.prefix() + "." : "") {}
18 :
19 : OpenTelemetryGrpcMetricsExporterImpl::OpenTelemetryGrpcMetricsExporterImpl(
20 : const OtlpOptionsSharedPtr config, Grpc::RawAsyncClientSharedPtr raw_async_client)
21 : : config_(config), client_(raw_async_client),
22 : service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
23 0 : "opentelemetry.proto.collector.metrics.v1.MetricsService.Export")) {}
24 :
25 0 : void OpenTelemetryGrpcMetricsExporterImpl::send(MetricsExportRequestPtr&& export_request) {
26 0 : client_->send(service_method_, *export_request, *this, Tracing::NullSpan::instance(),
27 0 : Http::AsyncClient::RequestOptions());
28 0 : }
29 :
30 : void OpenTelemetryGrpcMetricsExporterImpl::onSuccess(
31 0 : Grpc::ResponsePtr<MetricsExportResponse>&& export_response, Tracing::Span&) {
32 0 : if (export_response->has_partial_success()) {
33 0 : ENVOY_LOG(debug, "export response with partial success; {} rejected, collector message: {}",
34 0 : export_response->partial_success().rejected_data_points(),
35 0 : export_response->partial_success().error_message());
36 0 : }
37 0 : }
38 :
39 : void OpenTelemetryGrpcMetricsExporterImpl::onFailure(Grpc::Status::GrpcStatus response_status,
40 : const std::string& response_message,
41 0 : Tracing::Span&) {
42 0 : ENVOY_LOG(debug, "export failure; status: {}, message: {}", response_status, response_message);
43 0 : }
44 :
45 0 : MetricsExportRequestPtr OtlpMetricsFlusherImpl::flush(Stats::MetricSnapshot& snapshot) const {
46 0 : auto request = std::make_unique<MetricsExportRequest>();
47 0 : auto* resource_metrics = request->add_resource_metrics();
48 0 : auto* scope_metrics = resource_metrics->add_scope_metrics();
49 :
50 0 : int64_t snapshot_time_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
51 0 : snapshot.snapshotTime().time_since_epoch())
52 0 : .count();
53 :
54 0 : for (const auto& gauge : snapshot.gauges()) {
55 0 : if (predicate_(gauge)) {
56 0 : flushGauge(*scope_metrics->add_metrics(), gauge.get(), snapshot_time_ns);
57 0 : }
58 0 : }
59 :
60 0 : for (const auto& gauge : snapshot.hostGauges()) {
61 0 : flushGauge(*scope_metrics->add_metrics(), gauge, snapshot_time_ns);
62 0 : }
63 :
64 0 : for (const auto& counter : snapshot.counters()) {
65 0 : if (predicate_(counter.counter_)) {
66 0 : flushCounter(*scope_metrics->add_metrics(), counter.counter_.get(),
67 0 : counter.counter_.get().value(), counter.delta_, snapshot_time_ns);
68 0 : }
69 0 : }
70 :
71 0 : for (const auto& counter : snapshot.hostCounters()) {
72 0 : flushCounter(*scope_metrics->add_metrics(), counter, counter.value(), counter.delta(),
73 0 : snapshot_time_ns);
74 0 : }
75 :
76 0 : for (const auto& histogram : snapshot.histograms()) {
77 0 : if (predicate_(histogram)) {
78 0 : flushHistogram(*scope_metrics->add_metrics(), histogram, snapshot_time_ns);
79 0 : }
80 0 : }
81 :
82 0 : return request;
83 0 : }
84 :
85 : template <class GaugeType>
86 : void OtlpMetricsFlusherImpl::flushGauge(opentelemetry::proto::metrics::v1::Metric& metric,
87 : const GaugeType& gauge_stat,
88 0 : int64_t snapshot_time_ns) const {
89 0 : auto* data_point = metric.mutable_gauge()->add_data_points();
90 0 : data_point->set_time_unix_nano(snapshot_time_ns);
91 0 : setMetricCommon(metric, *data_point, snapshot_time_ns, gauge_stat);
92 :
93 0 : data_point->set_as_int(gauge_stat.value());
94 0 : }
95 :
96 : template <class CounterType>
97 : void OtlpMetricsFlusherImpl::flushCounter(opentelemetry::proto::metrics::v1::Metric& metric,
98 : const CounterType& counter, uint64_t value,
99 0 : uint64_t delta, int64_t snapshot_time_ns) const {
100 0 : auto* sum = metric.mutable_sum();
101 0 : sum->set_is_monotonic(true);
102 0 : auto* data_point = sum->add_data_points();
103 0 : setMetricCommon(metric, *data_point, snapshot_time_ns, counter);
104 :
105 0 : if (config_->reportCountersAsDeltas()) {
106 0 : sum->set_aggregation_temporality(AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA);
107 0 : data_point->set_as_int(delta);
108 0 : } else {
109 0 : sum->set_aggregation_temporality(AggregationTemporality::AGGREGATION_TEMPORALITY_CUMULATIVE);
110 0 : data_point->set_as_int(value);
111 0 : }
112 0 : }
113 :
114 : void OtlpMetricsFlusherImpl::flushHistogram(opentelemetry::proto::metrics::v1::Metric& metric,
115 : const Stats::ParentHistogram& parent_histogram,
116 0 : int64_t snapshot_time_ns) const {
117 0 : auto* histogram = metric.mutable_histogram();
118 0 : auto* data_point = histogram->add_data_points();
119 0 : setMetricCommon(metric, *data_point, snapshot_time_ns, parent_histogram);
120 :
121 0 : histogram->set_aggregation_temporality(
122 0 : config_->reportHistogramsAsDeltas()
123 0 : ? AggregationTemporality::AGGREGATION_TEMPORALITY_DELTA
124 0 : : AggregationTemporality::AGGREGATION_TEMPORALITY_CUMULATIVE);
125 :
126 0 : const Stats::HistogramStatistics& histogram_stats = config_->reportHistogramsAsDeltas()
127 0 : ? parent_histogram.intervalStatistics()
128 0 : : parent_histogram.cumulativeStatistics();
129 :
130 0 : data_point->set_count(histogram_stats.sampleCount());
131 0 : data_point->set_sum(histogram_stats.sampleSum());
132 : // TODO(ohadvano): support min/max optional fields for ``HistogramDataPoint``
133 :
134 0 : std::vector<uint64_t> bucket_counts = histogram_stats.computeDisjointBuckets();
135 0 : for (size_t i = 0; i < histogram_stats.supportedBuckets().size(); i++) {
136 0 : data_point->add_explicit_bounds(histogram_stats.supportedBuckets()[i]);
137 0 : data_point->add_bucket_counts(bucket_counts[i]);
138 0 : }
139 :
140 : // According to the spec, the number of bucket counts needs to be one element bigger
141 : // than the size of the explicit bounds, and the last bucket should contain the count
142 : // of values which are outside the explicit boundaries (to +infinity).
143 0 : data_point->add_bucket_counts(histogram_stats.outOfBoundCount());
144 0 : }
145 :
146 : template <class StatType>
147 : void OtlpMetricsFlusherImpl::setMetricCommon(
148 : opentelemetry::proto::metrics::v1::Metric& metric,
149 : opentelemetry::proto::metrics::v1::NumberDataPoint& data_point, int64_t snapshot_time_ns,
150 0 : const StatType& stat) const {
151 0 : data_point.set_time_unix_nano(snapshot_time_ns);
152 : // TODO(ohadvano): support ``start_time_unix_nano`` optional field
153 0 : metric.set_name(absl::StrCat(config_->statPrefix(), config_->useTagExtractedName()
154 0 : ? stat.tagExtractedName()
155 0 : : stat.name()));
156 :
157 0 : if (config_->emitTagsAsAttributes()) {
158 0 : for (const auto& tag : stat.tags()) {
159 0 : auto* attribute = data_point.add_attributes();
160 0 : attribute->set_key(tag.name_);
161 0 : attribute->mutable_value()->set_string_value(tag.value_);
162 0 : }
163 0 : }
164 0 : }
165 :
166 : void OtlpMetricsFlusherImpl::setMetricCommon(
167 : opentelemetry::proto::metrics::v1::Metric& metric,
168 : opentelemetry::proto::metrics::v1::HistogramDataPoint& data_point, int64_t snapshot_time_ns,
169 0 : const Stats::Metric& stat) const {
170 0 : data_point.set_time_unix_nano(snapshot_time_ns);
171 : // TODO(ohadvano): support ``start_time_unix_nano optional`` field
172 0 : metric.set_name(absl::StrCat(config_->statPrefix(), config_->useTagExtractedName()
173 0 : ? stat.tagExtractedName()
174 0 : : stat.name()));
175 :
176 0 : if (config_->emitTagsAsAttributes()) {
177 0 : for (const auto& tag : stat.tags()) {
178 0 : auto* attribute = data_point.add_attributes();
179 0 : attribute->set_key(tag.name_);
180 0 : attribute->mutable_value()->set_string_value(tag.value_);
181 0 : }
182 0 : }
183 0 : }
184 :
185 : } // namespace OpenTelemetry
186 : } // namespace StatSinks
187 : } // namespace Extensions
188 : } // namespace Envoy
|