/proc/self/cwd/source/extensions/tracers/opentelemetry/tracer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/tracers/opentelemetry/tracer.h" |
2 | | |
3 | | #include <cstdint> |
4 | | #include <string> |
5 | | |
6 | | #include "envoy/config/trace/v3/opentelemetry.pb.h" |
7 | | |
8 | | #include "source/common/common/empty_string.h" |
9 | | #include "source/common/common/hex.h" |
10 | | |
11 | | #include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h" |
12 | | #include "opentelemetry/proto/trace/v1/trace.pb.h" |
13 | | |
14 | | namespace Envoy { |
15 | | namespace Extensions { |
16 | | namespace Tracers { |
17 | | namespace OpenTelemetry { |
18 | | |
19 | | constexpr absl::string_view kTraceParent = "traceparent"; |
20 | | constexpr absl::string_view kTraceState = "tracestate"; |
21 | | constexpr absl::string_view kDefaultVersion = "00"; |
22 | | |
23 | | using opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest; |
24 | | |
25 | | namespace { |
26 | | |
27 | | void callSampler(SamplerSharedPtr sampler, const absl::optional<SpanContext> span_context, |
28 | 0 | Span& new_span, const std::string& operation_name) { |
29 | 0 | if (!sampler) { |
30 | 0 | return; |
31 | 0 | } |
32 | 0 | const auto sampling_result = sampler->shouldSample( |
33 | 0 | span_context, operation_name, new_span.getTraceIdAsHex(), new_span.spankind(), {}, {}); |
34 | 0 | new_span.setSampled(sampling_result.isSampled()); |
35 | |
|
36 | 0 | if (sampling_result.attributes) { |
37 | 0 | for (auto const& attribute : *sampling_result.attributes) { |
38 | 0 | new_span.setTag(attribute.first, attribute.second); |
39 | 0 | } |
40 | 0 | } |
41 | 0 | if (!sampling_result.tracestate.empty()) { |
42 | 0 | new_span.setTracestate(sampling_result.tracestate); |
43 | 0 | } |
44 | 0 | } |
45 | | |
46 | | } // namespace |
47 | | |
48 | | Span::Span(const Tracing::Config& config, const std::string& name, SystemTime start_time, |
49 | | Envoy::TimeSource& time_source, Tracer& parent_tracer, bool downstream_span) |
50 | 0 | : parent_tracer_(parent_tracer), time_source_(time_source) { |
51 | 0 | span_ = ::opentelemetry::proto::trace::v1::Span(); |
52 | |
|
53 | 0 | if (downstream_span) { |
54 | | // If this is downstream span that be created by 'startSpan' for downstream request, then |
55 | | // set the span type based on the spawnUpstreamSpan flag and traffic direction: |
56 | | // * If separate tracing span will be created for upstream request, then set span type to |
57 | | // SERVER because the downstream span should be server span in trace chain. |
58 | | // * If separate tracing span will not be created for upstream request, that means the |
59 | | // Envoy will not be treated as independent hop in trace chain and then set span type |
60 | | // based on the traffic direction. |
61 | 0 | span_.set_kind(config.spawnUpstreamSpan() |
62 | 0 | ? ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER |
63 | 0 | : config.operationName() == Tracing::OperationName::Egress |
64 | 0 | ? ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_CLIENT |
65 | 0 | : ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_SERVER); |
66 | 0 | } else { |
67 | | // If this is an non-downstream span that be created for upstream request or async HTTP/gRPC |
68 | | // request, then set the span type to client always. |
69 | 0 | span_.set_kind(::opentelemetry::proto::trace::v1::Span::SPAN_KIND_CLIENT); |
70 | 0 | } |
71 | |
|
72 | 0 | span_.set_name(name); |
73 | 0 | span_.set_start_time_unix_nano(std::chrono::nanoseconds(start_time.time_since_epoch()).count()); |
74 | 0 | } |
75 | | |
76 | | Tracing::SpanPtr Span::spawnChild(const Tracing::Config& config, const std::string& name, |
77 | 0 | SystemTime start_time) { |
78 | | // Build span_context from the current span, then generate the child span from that context. |
79 | 0 | SpanContext span_context(kDefaultVersion, getTraceIdAsHex(), spanId(), sampled(), tracestate()); |
80 | 0 | return parent_tracer_.startSpan(config, name, start_time, span_context, |
81 | 0 | /*downstream_span*/ false); |
82 | 0 | } |
83 | | |
84 | 0 | void Span::finishSpan() { |
85 | | // Call into the parent tracer so we can access the shared exporter. |
86 | 0 | span_.set_end_time_unix_nano( |
87 | 0 | std::chrono::nanoseconds(time_source_.systemTime().time_since_epoch()).count()); |
88 | 0 | if (sampled()) { |
89 | 0 | parent_tracer_.sendSpan(span_); |
90 | 0 | } |
91 | 0 | } |
92 | | |
93 | | void Span::injectContext(Tracing::TraceContext& trace_context, |
94 | 0 | const Upstream::HostDescriptionConstSharedPtr&) { |
95 | 0 | std::string trace_id_hex = absl::BytesToHexString(span_.trace_id()); |
96 | 0 | std::string span_id_hex = absl::BytesToHexString(span_.span_id()); |
97 | 0 | std::vector<uint8_t> trace_flags_vec{sampled()}; |
98 | 0 | std::string trace_flags_hex = Hex::encode(trace_flags_vec); |
99 | 0 | std::string traceparent_header_value = |
100 | 0 | absl::StrCat(kDefaultVersion, "-", trace_id_hex, "-", span_id_hex, "-", trace_flags_hex); |
101 | | // Set the traceparent in the trace_context. |
102 | 0 | trace_context.setByReferenceKey(kTraceParent, traceparent_header_value); |
103 | | // Also set the tracestate. |
104 | 0 | trace_context.setByReferenceKey(kTraceState, span_.trace_state()); |
105 | 0 | } |
106 | | |
107 | 0 | void Span::setTag(absl::string_view name, absl::string_view value) { |
108 | | // The attribute key MUST be a non-null and non-empty string. |
109 | 0 | if (name.empty()) { |
110 | 0 | return; |
111 | 0 | } |
112 | | // Attribute keys MUST be unique. |
113 | | // If a value already exists for this key, overwrite it. |
114 | 0 | for (auto& key_value : *span_.mutable_attributes()) { |
115 | 0 | if (key_value.key() == name) { |
116 | 0 | key_value.mutable_value()->set_string_value(std::string{value}); |
117 | 0 | return; |
118 | 0 | } |
119 | 0 | } |
120 | | // If we haven't found an existing match already, we can add a new key/value. |
121 | 0 | opentelemetry::proto::common::v1::KeyValue key_value = |
122 | 0 | opentelemetry::proto::common::v1::KeyValue(); |
123 | 0 | opentelemetry::proto::common::v1::AnyValue value_proto = |
124 | 0 | opentelemetry::proto::common::v1::AnyValue(); |
125 | 0 | value_proto.set_string_value(std::string{value}); |
126 | 0 | key_value.set_key(std::string{name}); |
127 | 0 | *key_value.mutable_value() = value_proto; |
128 | 0 | *span_.add_attributes() = key_value; |
129 | 0 | } |
130 | | |
131 | | Tracer::Tracer(OpenTelemetryTraceExporterPtr exporter, Envoy::TimeSource& time_source, |
132 | | Random::RandomGenerator& random, Runtime::Loader& runtime, |
133 | | Event::Dispatcher& dispatcher, OpenTelemetryTracerStats tracing_stats, |
134 | | const ResourceConstSharedPtr resource, SamplerSharedPtr sampler) |
135 | | : exporter_(std::move(exporter)), time_source_(time_source), random_(random), runtime_(runtime), |
136 | 0 | tracing_stats_(tracing_stats), resource_(resource), sampler_(sampler) { |
137 | 0 | flush_timer_ = dispatcher.createTimer([this]() -> void { |
138 | 0 | tracing_stats_.timer_flushed_.inc(); |
139 | 0 | flushSpans(); |
140 | 0 | enableTimer(); |
141 | 0 | }); |
142 | 0 | enableTimer(); |
143 | 0 | } |
144 | | |
145 | 0 | void Tracer::enableTimer() { |
146 | 0 | const uint64_t flush_interval = |
147 | 0 | runtime_.snapshot().getInteger("tracing.opentelemetry.flush_interval_ms", 5000U); |
148 | 0 | flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval)); |
149 | 0 | } |
150 | | |
151 | 0 | void Tracer::flushSpans() { |
152 | 0 | ExportTraceServiceRequest request; |
153 | | // A request consists of ResourceSpans. |
154 | 0 | ::opentelemetry::proto::trace::v1::ResourceSpans* resource_span = request.add_resource_spans(); |
155 | 0 | resource_span->set_schema_url(resource_->schemaUrl_); |
156 | | |
157 | | // add resource attributes |
158 | 0 | for (auto const& att : resource_->attributes_) { |
159 | 0 | opentelemetry::proto::common::v1::KeyValue key_value = |
160 | 0 | opentelemetry::proto::common::v1::KeyValue(); |
161 | 0 | opentelemetry::proto::common::v1::AnyValue value_proto = |
162 | 0 | opentelemetry::proto::common::v1::AnyValue(); |
163 | 0 | value_proto.set_string_value(std::string{att.second}); |
164 | 0 | key_value.set_key(std::string{att.first}); |
165 | 0 | *key_value.mutable_value() = value_proto; |
166 | 0 | (*resource_span->mutable_resource()->add_attributes()) = key_value; |
167 | 0 | } |
168 | |
|
169 | 0 | ::opentelemetry::proto::trace::v1::ScopeSpans* scope_span = resource_span->add_scope_spans(); |
170 | 0 | for (const auto& pending_span : span_buffer_) { |
171 | 0 | (*scope_span->add_spans()) = pending_span; |
172 | 0 | } |
173 | 0 | if (exporter_) { |
174 | 0 | tracing_stats_.spans_sent_.add(span_buffer_.size()); |
175 | 0 | if (!exporter_->log(request)) { |
176 | | // TODO: should there be any sort of retry or reporting here? |
177 | 0 | ENVOY_LOG(trace, "Unsuccessful log request to OpenTelemetry trace collector."); |
178 | 0 | } |
179 | 0 | } else { |
180 | 0 | ENVOY_LOG(info, "Skipping log request to OpenTelemetry: no exporter configured"); |
181 | 0 | } |
182 | 0 | span_buffer_.clear(); |
183 | 0 | } |
184 | | |
185 | 0 | void Tracer::sendSpan(::opentelemetry::proto::trace::v1::Span& span) { |
186 | 0 | span_buffer_.push_back(span); |
187 | 0 | const uint64_t min_flush_spans = |
188 | 0 | runtime_.snapshot().getInteger("tracing.opentelemetry.min_flush_spans", 5U); |
189 | 0 | if (span_buffer_.size() >= min_flush_spans) { |
190 | 0 | flushSpans(); |
191 | 0 | } |
192 | 0 | } |
193 | | |
194 | | Tracing::SpanPtr Tracer::startSpan(const Tracing::Config& config, const std::string& operation_name, |
195 | | SystemTime start_time, Tracing::Decision tracing_decision, |
196 | 0 | bool downstream_span) { |
197 | | // Create an Tracers::OpenTelemetry::Span class that will contain the OTel span. |
198 | 0 | Span new_span(config, operation_name, start_time, time_source_, *this, downstream_span); |
199 | 0 | uint64_t trace_id_high = random_.random(); |
200 | 0 | uint64_t trace_id = random_.random(); |
201 | 0 | new_span.setTraceId(absl::StrCat(Hex::uint64ToHex(trace_id_high), Hex::uint64ToHex(trace_id))); |
202 | 0 | uint64_t span_id = random_.random(); |
203 | 0 | new_span.setId(Hex::uint64ToHex(span_id)); |
204 | 0 | if (sampler_) { |
205 | 0 | callSampler(sampler_, absl::nullopt, new_span, operation_name); |
206 | 0 | } else { |
207 | 0 | new_span.setSampled(tracing_decision.traced); |
208 | 0 | } |
209 | 0 | return std::make_unique<Span>(new_span); |
210 | 0 | } |
211 | | |
212 | | Tracing::SpanPtr Tracer::startSpan(const Tracing::Config& config, const std::string& operation_name, |
213 | | SystemTime start_time, const SpanContext& previous_span_context, |
214 | 0 | bool downstream_span) { |
215 | | // Create a new span and populate details from the span context. |
216 | 0 | Span new_span(config, operation_name, start_time, time_source_, *this, downstream_span); |
217 | 0 | new_span.setTraceId(previous_span_context.traceId()); |
218 | 0 | if (!previous_span_context.parentId().empty()) { |
219 | 0 | new_span.setParentId(previous_span_context.parentId()); |
220 | 0 | } |
221 | | // Generate a new identifier for the span id. |
222 | 0 | uint64_t span_id = random_.random(); |
223 | 0 | new_span.setId(Hex::uint64ToHex(span_id)); |
224 | 0 | if (sampler_) { |
225 | | // Sampler should make a sampling decision and set tracestate |
226 | 0 | callSampler(sampler_, previous_span_context, new_span, operation_name); |
227 | 0 | } else { |
228 | | // Respect the previous span's sampled flag. |
229 | 0 | new_span.setSampled(previous_span_context.sampled()); |
230 | 0 | if (!previous_span_context.tracestate().empty()) { |
231 | 0 | new_span.setTracestate(std::string{previous_span_context.tracestate()}); |
232 | 0 | } |
233 | 0 | } |
234 | 0 | return std::make_unique<Span>(new_span); |
235 | 0 | } |
236 | | |
237 | | } // namespace OpenTelemetry |
238 | | } // namespace Tracers |
239 | | } // namespace Extensions |
240 | | } // namespace Envoy |