Line data Source code
1 : #include "source/extensions/tracers/opentelemetry/tracer.h"
2 :
3 : #include <cstdint>
4 : #include <string>
5 :
6 : #include "envoy/config/trace/v3/opentelemetry.pb.h"
7 :
8 : #include "source/common/common/empty_string.h"
9 : #include "source/common/common/hex.h"
10 : #include "source/common/tracing/trace_context_impl.h"
11 :
12 : #include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h"
13 : #include "opentelemetry/proto/trace/v1/trace.pb.h"
14 :
15 : namespace Envoy {
16 : namespace Extensions {
17 : namespace Tracers {
18 : namespace OpenTelemetry {
19 :
// Version string used as the first component of the traceparent value built in
// Span::injectContext and as the version argument when Span::spawnChild builds a SpanContext.
20 : constexpr absl::string_view kDefaultVersion = "00";
21 :
// Short name for the collector trace export request message assembled in Tracer::flushSpans.
22 : using opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
23 :
24 : namespace {
25 :
// Returns the handler used to write the "traceparent" key into a trace context
// (see Span::injectContext).
// NOTE(review): CONSTRUCT_ON_FIRST_USE is defined outside this file; presumably it builds the
// handler once on the first call and returns a reference to that instance -- confirm against the
// macro definition.
26 0 : const Tracing::TraceContextHandler& traceParentHeader() {
27 0 : CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "traceparent");
28 0 : }
29 :
// Returns the handler used to write the "tracestate" key into a trace context
// (see Span::injectContext).
// NOTE(review): CONSTRUCT_ON_FIRST_USE is defined outside this file; presumably it builds the
// handler once on the first call and returns a reference to that instance -- confirm against the
// macro definition.
30 0 : const Tracing::TraceContextHandler& traceStateHeader() {
31 0 : CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "tracestate");
32 0 : }
33 :
34 : void callSampler(SamplerSharedPtr sampler, const absl::optional<SpanContext> span_context,
35 : Span& new_span, const std::string& operation_name,
36 0 : OptRef<const Tracing::TraceContext> trace_context) {
37 0 : if (!sampler) {
38 0 : return;
39 0 : }
40 0 : const auto sampling_result =
41 0 : sampler->shouldSample(span_context, operation_name, new_span.getTraceIdAsHex(),
42 0 : new_span.spankind(), trace_context, {});
43 0 : new_span.setSampled(sampling_result.isSampled());
44 :
45 0 : if (sampling_result.attributes) {
46 0 : for (auto const& attribute : *sampling_result.attributes) {
47 0 : new_span.setTag(attribute.first, attribute.second);
48 0 : }
49 0 : }
50 0 : if (!sampling_result.tracestate.empty()) {
51 0 : new_span.setTracestate(sampling_result.tracestate);
52 0 : }
53 0 : }
54 :
55 : } // namespace
56 :
57 : Span::Span(const std::string& name, SystemTime start_time, Envoy::TimeSource& time_source,
58 : Tracer& parent_tracer, OTelSpanKind span_kind)
59 0 : : parent_tracer_(parent_tracer), time_source_(time_source) {
60 0 : span_ = ::opentelemetry::proto::trace::v1::Span();
61 :
62 0 : span_.set_kind(span_kind);
63 :
64 0 : span_.set_name(name);
65 0 : span_.set_start_time_unix_nano(std::chrono::nanoseconds(start_time.time_since_epoch()).count());
66 0 : }
67 :
68 : Tracing::SpanPtr Span::spawnChild(const Tracing::Config&, const std::string& name,
69 0 : SystemTime start_time) {
70 : // Build span_context from the current span, then generate the child span from that context.
71 0 : SpanContext span_context(kDefaultVersion, getTraceIdAsHex(), spanId(), sampled(), tracestate());
72 0 : return parent_tracer_.startSpan(name, start_time, span_context, {},
73 0 : ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_CLIENT);
74 0 : }
75 :
76 0 : void Span::finishSpan() {
77 : // Call into the parent tracer so we can access the shared exporter.
78 0 : span_.set_end_time_unix_nano(
79 0 : std::chrono::nanoseconds(time_source_.systemTime().time_since_epoch()).count());
80 0 : if (sampled()) {
81 0 : parent_tracer_.sendSpan(span_);
82 0 : }
83 0 : }
84 :
85 : void Span::injectContext(Tracing::TraceContext& trace_context,
86 0 : const Upstream::HostDescriptionConstSharedPtr&) {
87 0 : std::string trace_id_hex = absl::BytesToHexString(span_.trace_id());
88 0 : std::string span_id_hex = absl::BytesToHexString(span_.span_id());
89 0 : std::vector<uint8_t> trace_flags_vec{sampled()};
90 0 : std::string trace_flags_hex = Hex::encode(trace_flags_vec);
91 0 : std::string traceparent_header_value =
92 0 : absl::StrCat(kDefaultVersion, "-", trace_id_hex, "-", span_id_hex, "-", trace_flags_hex);
93 : // Set the traceparent in the trace_context.
94 0 : traceParentHeader().setRefKey(trace_context, traceparent_header_value);
95 : // Also set the tracestate.
96 0 : traceStateHeader().setRefKey(trace_context, span_.trace_state());
97 0 : }
98 :
99 0 : void Span::setTag(absl::string_view name, absl::string_view value) {
100 : // The attribute key MUST be a non-null and non-empty string.
101 0 : if (name.empty()) {
102 0 : return;
103 0 : }
104 : // Attribute keys MUST be unique.
105 : // If a value already exists for this key, overwrite it.
106 0 : for (auto& key_value : *span_.mutable_attributes()) {
107 0 : if (key_value.key() == name) {
108 0 : key_value.mutable_value()->set_string_value(std::string{value});
109 0 : return;
110 0 : }
111 0 : }
112 : // If we haven't found an existing match already, we can add a new key/value.
113 0 : opentelemetry::proto::common::v1::KeyValue key_value =
114 0 : opentelemetry::proto::common::v1::KeyValue();
115 0 : opentelemetry::proto::common::v1::AnyValue value_proto =
116 0 : opentelemetry::proto::common::v1::AnyValue();
117 0 : value_proto.set_string_value(std::string{value});
118 0 : key_value.set_key(std::string{name});
119 0 : *key_value.mutable_value() = value_proto;
120 0 : *span_.add_attributes() = key_value;
121 0 : }
122 :
123 : Tracer::Tracer(OpenTelemetryTraceExporterPtr exporter, Envoy::TimeSource& time_source,
124 : Random::RandomGenerator& random, Runtime::Loader& runtime,
125 : Event::Dispatcher& dispatcher, OpenTelemetryTracerStats tracing_stats,
126 : const ResourceConstSharedPtr resource, SamplerSharedPtr sampler)
127 : : exporter_(std::move(exporter)), time_source_(time_source), random_(random), runtime_(runtime),
128 0 : tracing_stats_(tracing_stats), resource_(resource), sampler_(sampler) {
129 0 : flush_timer_ = dispatcher.createTimer([this]() -> void {
130 0 : tracing_stats_.timer_flushed_.inc();
131 0 : flushSpans();
132 0 : enableTimer();
133 0 : });
134 0 : enableTimer();
135 0 : }
136 :
137 0 : void Tracer::enableTimer() {
138 0 : const uint64_t flush_interval =
139 0 : runtime_.snapshot().getInteger("tracing.opentelemetry.flush_interval_ms", 5000U);
140 0 : flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval));
141 0 : }
142 :
143 0 : void Tracer::flushSpans() {
144 0 : ExportTraceServiceRequest request;
145 : // A request consists of ResourceSpans.
146 0 : ::opentelemetry::proto::trace::v1::ResourceSpans* resource_span = request.add_resource_spans();
147 0 : resource_span->set_schema_url(resource_->schema_url_);
148 :
149 : // add resource attributes
150 0 : for (auto const& att : resource_->attributes_) {
151 0 : opentelemetry::proto::common::v1::KeyValue key_value =
152 0 : opentelemetry::proto::common::v1::KeyValue();
153 0 : opentelemetry::proto::common::v1::AnyValue value_proto =
154 0 : opentelemetry::proto::common::v1::AnyValue();
155 0 : value_proto.set_string_value(std::string{att.second});
156 0 : key_value.set_key(std::string{att.first});
157 0 : *key_value.mutable_value() = value_proto;
158 0 : (*resource_span->mutable_resource()->add_attributes()) = key_value;
159 0 : }
160 :
161 0 : ::opentelemetry::proto::trace::v1::ScopeSpans* scope_span = resource_span->add_scope_spans();
162 0 : for (const auto& pending_span : span_buffer_) {
163 0 : (*scope_span->add_spans()) = pending_span;
164 0 : }
165 0 : if (exporter_) {
166 0 : tracing_stats_.spans_sent_.add(span_buffer_.size());
167 0 : if (!exporter_->log(request)) {
168 : // TODO: should there be any sort of retry or reporting here?
169 0 : ENVOY_LOG(trace, "Unsuccessful log request to OpenTelemetry trace collector.");
170 0 : }
171 0 : } else {
172 0 : ENVOY_LOG(info, "Skipping log request to OpenTelemetry: no exporter configured");
173 0 : }
174 0 : span_buffer_.clear();
175 0 : }
176 :
177 0 : void Tracer::sendSpan(::opentelemetry::proto::trace::v1::Span& span) {
178 0 : span_buffer_.push_back(span);
179 0 : const uint64_t min_flush_spans =
180 0 : runtime_.snapshot().getInteger("tracing.opentelemetry.min_flush_spans", 5U);
181 0 : if (span_buffer_.size() >= min_flush_spans) {
182 0 : flushSpans();
183 0 : }
184 0 : }
185 :
186 : Tracing::SpanPtr Tracer::startSpan(const std::string& operation_name, SystemTime start_time,
187 : Tracing::Decision tracing_decision,
188 : OptRef<const Tracing::TraceContext> trace_context,
189 0 : OTelSpanKind span_kind) {
190 : // Create an Tracers::OpenTelemetry::Span class that will contain the OTel span.
191 0 : Span new_span(operation_name, start_time, time_source_, *this, span_kind);
192 0 : uint64_t trace_id_high = random_.random();
193 0 : uint64_t trace_id = random_.random();
194 0 : new_span.setTraceId(absl::StrCat(Hex::uint64ToHex(trace_id_high), Hex::uint64ToHex(trace_id)));
195 0 : uint64_t span_id = random_.random();
196 0 : new_span.setId(Hex::uint64ToHex(span_id));
197 0 : if (sampler_) {
198 0 : callSampler(sampler_, absl::nullopt, new_span, operation_name, trace_context);
199 0 : } else {
200 0 : new_span.setSampled(tracing_decision.traced);
201 0 : }
202 0 : return std::make_unique<Span>(new_span);
203 0 : }
204 :
205 : Tracing::SpanPtr Tracer::startSpan(const std::string& operation_name, SystemTime start_time,
206 : const SpanContext& previous_span_context,
207 : OptRef<const Tracing::TraceContext> trace_context,
208 0 : OTelSpanKind span_kind) {
209 : // Create a new span and populate details from the span context.
210 0 : Span new_span(operation_name, start_time, time_source_, *this, span_kind);
211 0 : new_span.setTraceId(previous_span_context.traceId());
212 0 : if (!previous_span_context.parentId().empty()) {
213 0 : new_span.setParentId(previous_span_context.parentId());
214 0 : }
215 : // Generate a new identifier for the span id.
216 0 : uint64_t span_id = random_.random();
217 0 : new_span.setId(Hex::uint64ToHex(span_id));
218 0 : if (sampler_) {
219 : // Sampler should make a sampling decision and set tracestate
220 0 : callSampler(sampler_, previous_span_context, new_span, operation_name, trace_context);
221 0 : } else {
222 : // Respect the previous span's sampled flag.
223 0 : new_span.setSampled(previous_span_context.sampled());
224 0 : if (!previous_span_context.tracestate().empty()) {
225 0 : new_span.setTracestate(std::string{previous_span_context.tracestate()});
226 0 : }
227 0 : }
228 0 : return std::make_unique<Span>(new_span);
229 0 : }
230 :
231 : } // namespace OpenTelemetry
232 : } // namespace Tracers
233 : } // namespace Extensions
234 : } // namespace Envoy
|