1
#include "source/extensions/tracers/opentelemetry/tracer.h"
2

            
3
#include <cstdint>
4
#include <string>
5

            
6
#include "envoy/config/trace/v3/opentelemetry.pb.h"
7

            
8
#include "source/common/common/empty_string.h"
9
#include "source/common/common/hex.h"
10
#include "source/common/tracing/common_values.h"
11
#include "source/common/tracing/trace_context_impl.h"
12
#include "source/common/version/version.h"
13
#include "source/extensions/tracers/opentelemetry/otlp_utils.h"
14

            
15
#include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h"
16
#include "opentelemetry/proto/trace/v1/trace.pb.h"
17

            
18
namespace Envoy {
19
namespace Extensions {
20
namespace Tracers {
21
namespace OpenTelemetry {
22

            
23
// Version field used when building span contexts for child spans and as the first component of
// the "traceparent" value written by Span::injectContext().
constexpr absl::string_view kDefaultVersion = "00";
24

            
25
using opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
26

            
27
namespace {
28

            
29
35
// Returns the shared handler for the "traceparent" trace context key. The handler is built by
// CONSTRUCT_ON_FIRST_USE, i.e. on the first call, and the same instance is returned afterwards.
const Tracing::TraceContextHandler& traceParentHeader() {
  CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "traceparent");
}
32

            
33
35
// Returns the shared handler for the "tracestate" trace context key. The handler is built by
// CONSTRUCT_ON_FIRST_USE, i.e. on the first call, and the same instance is returned afterwards.
const Tracing::TraceContextHandler& traceStateHeader() {
  CONSTRUCT_ON_FIRST_USE(Tracing::TraceContextHandler, "tracestate");
}
36

            
37
void callSampler(SamplerSharedPtr sampler, const StreamInfo::StreamInfo& stream_info,
38
                 const absl::optional<SpanContext> span_context, Span& new_span,
39
                 const std::string& operation_name,
40
19
                 OptRef<const Tracing::TraceContext> trace_context) {
41
19
  if (!sampler) {
42
    return;
43
  }
44
19
  const auto sampling_result =
45
19
      sampler->shouldSample(stream_info, span_context, new_span.getTraceId(), operation_name,
46
19
                            new_span.spankind(), trace_context, {});
47
19
  new_span.setSampled(sampling_result.isSampled());
48

            
49
19
  if (sampling_result.attributes) {
50
17
    for (auto const& attribute : *sampling_result.attributes) {
51
17
      new_span.setAttribute(attribute.first, attribute.second);
52
17
    }
53
5
  }
54
19
  if (!sampling_result.tracestate.empty()) {
55
10
    new_span.setTracestate(sampling_result.tracestate);
56
10
  }
57
19
}
58

            
59
} // namespace
60

            
61
// Creates a span wrapper around a new OTLP span proto and fills in the fields that are known at
// construction time: kind, name and start timestamp (nanoseconds since the epoch).
Span::Span(const std::string& name, const StreamInfo::StreamInfo& stream_info,
           SystemTime start_time, Envoy::TimeSource& time_source, Tracer& parent_tracer,
           OTelSpanKind span_kind, bool use_local_decision)
    : stream_info_(stream_info), parent_tracer_(parent_tracer), time_source_(time_source),
      use_local_decision_(use_local_decision) {
  // Start from an empty proto.
  span_ = ::opentelemetry::proto::trace::v1::Span();

  const auto start_since_epoch = std::chrono::nanoseconds(start_time.time_since_epoch());
  span_.set_kind(span_kind);
  span_.set_name(name);
  span_.set_start_time_unix_nano(start_since_epoch.count());
}
73

            
74
// Starts a child span of this span. The child is always created with SPAN_KIND_CLIENT.
Tracing::SpanPtr Span::spawnChild(const Tracing::Config&, const std::string& name,
                                  SystemTime start_time) {
  // Describe this span as the parent: the context carries this span's trace id, span id,
  // sampling decision and tracestate, and the tracer derives the child from it.
  const SpanContext parent_context(kDefaultVersion, getTraceId(), spanId(), sampled(),
                                   std::string(tracestate()));
  constexpr auto child_kind = ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_CLIENT;
  return parent_tracer_.startSpan(name, stream_info_, start_time, parent_context, {}, child_kind);
}
82

            
83
66
void Span::finishSpan() {
84
  // Call into the parent tracer so we can access the shared exporter.
85
66
  span_.set_end_time_unix_nano(
86
66
      std::chrono::nanoseconds(time_source_.systemTime().time_since_epoch()).count());
87
66
  if (sampled()) {
88
59
    parent_tracer_.sendSpan(span_);
89
59
  }
90
66
}
91

            
92
2
// Renames the span: the operation name is stored as the OTLP span name.
void Span::setOperation(absl::string_view operation) { span_.set_name(operation); }
93

            
94
35
// Writes this span's identity into `trace_context` so it can be propagated.
//
// Sets "traceparent" to "<version>-<trace id>-<span id>-<flags>", with the ids and flags hex
// encoded, and sets "tracestate" to the span's tracestate (even when that is empty).
void Span::injectContext(Tracing::TraceContext& trace_context, const Tracing::UpstreamContext&) {
  const std::string hex_trace_id = absl::BytesToHexString(span_.trace_id());
  const std::string hex_span_id = absl::BytesToHexString(span_.span_id());

  // The flags are a single byte: 1 when the span is sampled, 0 otherwise.
  const std::vector<uint8_t> flags{sampled()};
  const std::string hex_flags = Hex::encode(flags);

  const std::string traceparent =
      absl::StrCat(kDefaultVersion, "-", hex_trace_id, "-", hex_span_id, "-", hex_flags);

  traceParentHeader().setRefKey(trace_context, traceparent);
  traceStateHeader().setRefKey(trace_context, span_.trace_state());
}
106

            
107
675
// Sets an attribute on the span.
//
// Empty names are ignored. If an attribute with the same key already exists its value is
// overwritten, so keys stay unique; otherwise a new key/value pair is appended.
//
// @param name attribute key; the call is a no-op when empty.
// @param attribute_value value converted to an OTLP AnyValue via OtlpUtils::populateAnyValue.
void Span::setAttribute(absl::string_view name, const OTelAttribute& attribute_value) {
  // The attribute key MUST be a non-null and non-empty string.
  if (name.empty()) {
    return;
  }

  // Attribute keys MUST be unique: overwrite the value if the key is already present.
  for (auto& existing : *span_.mutable_attributes()) {
    if (existing.key() == name) {
      OtlpUtils::populateAnyValue(*existing.mutable_value(), attribute_value);
      return;
    }
  }

  // No match: append a new entry and fill it in place, rather than building temporary
  // KeyValue/AnyValue messages and deep-copying them into the span.
  auto* added = span_.add_attributes();
  added->set_key(std::string{name});
  OtlpUtils::populateAnyValue(*added->mutable_value(), attribute_value);
}
130

            
131
// Maps a gRPC status code, given as text, to an OTLP span status code.
//
// Mapping reference: https://opentelemetry.io/docs/specs/semconv/rpc/grpc/#grpc-status
//
// @param kind kind of the span the status belongs to.
// @param value textual gRPC status code.
// @return STATUS_CODE_UNSET when `value` is not a number. For client spans, STATUS_CODE_UNSET for
//         Ok and STATUS_CODE_ERROR for anything else. For every other span kind,
//         STATUS_CODE_ERROR only for Unknown, DeadlineExceeded, Unimplemented, Internal,
//         Unavailable and DataLoss, and STATUS_CODE_UNSET otherwise.
::opentelemetry::proto::trace::v1::Status_StatusCode
convertGrpcStatusToTraceStatusCode(::opentelemetry::proto::trace::v1::Span_SpanKind kind,
                                   absl::string_view value) {
  using OTelStatus = ::opentelemetry::proto::trace::v1::Status;

  uint64_t parsed_code = 0;
  if (!absl::SimpleAtoi(value, &parsed_code)) {
    // Not a number, so there is nothing to map; leave the status unset.
    return OTelStatus::STATUS_CODE_UNSET;
  }
  const auto grpc_status = static_cast<Grpc::Status::GrpcStatus>(parsed_code);

  // Client spans: everything except Ok is an error.
  if (kind == ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_CLIENT) {
    return grpc_status == Grpc::Status::WellKnownGrpcStatus::Ok ? OTelStatus::STATUS_CODE_ERROR ==
                                                                          OTelStatus::STATUS_CODE_UNSET
                                                                      ? OTelStatus::STATUS_CODE_UNSET
                                                                      : OTelStatus::STATUS_CODE_UNSET
                                                                : OTelStatus::STATUS_CODE_ERROR;
  }

  // All other span kinds (server-side mapping): only these statuses count as errors.
  switch (grpc_status) {
  case Grpc::Status::WellKnownGrpcStatus::Unknown:
  case Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded:
  case Grpc::Status::WellKnownGrpcStatus::Unimplemented:
  case Grpc::Status::WellKnownGrpcStatus::Internal:
  case Grpc::Status::WellKnownGrpcStatus::Unavailable:
  case Grpc::Status::WellKnownGrpcStatus::DataLoss:
    return OTelStatus::STATUS_CODE_ERROR;
  default:
    return OTelStatus::STATUS_CODE_UNSET;
  }
}
163

            
164
658
// Records a tag on the span. The tag is always stored as a span attribute; the gRPC and HTTP
// status code tags additionally drive the span status.
void Span::setTag(absl::string_view name, absl::string_view value) {
  const auto& tags = Tracing::Tags::get();

  if (name == tags.GrpcStatusCode) {
    span_.mutable_status()->set_code(convertGrpcStatusToTraceStatusCode(span_.kind(), value));
  } else if (name == tags.HttpStatusCode) {
    // For HTTP status codes in the 5xx range, as well as any other code the client failed to
    // interpret, span status MUST be set to Error.
    //
    // For HTTP status codes in the 4xx range span status MUST be left unset in case of
    // SpanKind.SERVER and MUST be set to Error in case of SpanKind.CLIENT.
    uint64_t http_status = 0;
    if (absl::SimpleAtoi(value, &http_status)) {
      const bool is_client_span =
          span_.kind() == ::opentelemetry::proto::trace::v1::Span::SPAN_KIND_CLIENT;
      const bool is_error = http_status >= 500 || (http_status >= 400 && is_client_span);
      if (is_error) {
        span_.mutable_status()->set_code(
            ::opentelemetry::proto::trace::v1::Status::STATUS_CODE_ERROR);
      }
    }
  }

  setAttribute(name, value);
}
185

            
186
4
// Appends an event to the span.
//
// @param timestamp time of the event; stored as nanoseconds since the epoch.
// @param event event name; the call is a no-op when empty.
void Span::log(SystemTime timestamp, const std::string& event) {
  if (event.empty()) {
    return;
  }

  // Fill the new event in place instead of building a temporary Event (and a temporary copy of
  // the name) and deep-copying it into the span.
  auto* span_event = span_.add_events();
  span_event->set_time_unix_nano(std::chrono::nanoseconds(timestamp.time_since_epoch()).count());
  span_event->set_name(event);
}
198

            
199
// Builds the tracer and starts its periodic flush timer.
//
// Each time the timer fires it bumps the timer_flushed stat, flushes the buffered spans and
// re-arms itself with the flush interval read by enableTimer().
Tracer::Tracer(OpenTelemetryTraceExporterPtr exporter, Envoy::TimeSource& time_source,
               Random::RandomGenerator& random, Runtime::Loader& runtime,
               Event::Dispatcher& dispatcher, OpenTelemetryTracerStats tracing_stats,
               const ResourceConstSharedPtr resource, SamplerSharedPtr sampler,
               uint64_t max_cache_size)
    : exporter_(std::move(exporter)), time_source_(time_source), random_(random), runtime_(runtime),
      tracing_stats_(tracing_stats), resource_(resource), sampler_(sampler),
      max_cache_size_(max_cache_size) {
  flush_timer_ = dispatcher.createTimer([this] {
    tracing_stats_.timer_flushed_.inc();
    flushSpans();
    enableTimer();
  });

  // Arm the timer for the first time.
  enableTimer();
}
214

            
215
83
void Tracer::enableTimer() {
216
83
  const uint64_t flush_interval =
217
83
      runtime_.snapshot().getInteger("tracing.opentelemetry.flush_interval_ms", 5000U);
218
83
  flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval));
219
83
}
220

            
221
29
void Tracer::flushSpans() {
222
29
  if (span_buffer_.empty()) {
223
1
    return;
224
1
  }
225

            
226
28
  ExportTraceServiceRequest request;
227
  // A request consists of ResourceSpans.
228
28
  ::opentelemetry::proto::trace::v1::ResourceSpans* resource_span = request.add_resource_spans();
229
28
  resource_span->set_schema_url(resource_->schema_url_);
230

            
231
  // add resource attributes
232
69
  for (auto const& att : resource_->attributes_) {
233
69
    opentelemetry::proto::common::v1::KeyValue key_value =
234
69
        opentelemetry::proto::common::v1::KeyValue();
235
69
    opentelemetry::proto::common::v1::AnyValue value_proto =
236
69
        opentelemetry::proto::common::v1::AnyValue();
237
69
    value_proto.set_string_value(std::string{att.second});
238
69
    key_value.set_key(std::string{att.first});
239
69
    *key_value.mutable_value() = value_proto;
240
69
    (*resource_span->mutable_resource()->add_attributes()) = key_value;
241
69
  }
242

            
243
28
  ::opentelemetry::proto::trace::v1::ScopeSpans* scope_span = resource_span->add_scope_spans();
244

            
245
  // set the instrumentation scope name and version
246
28
  *scope_span->mutable_scope()->mutable_name() = "envoy";
247
28
  *scope_span->mutable_scope()->mutable_version() = Envoy::VersionInfo::version();
248

            
249
46
  for (const auto& pending_span : span_buffer_) {
250
46
    (*scope_span->add_spans()) = pending_span;
251
46
  }
252
28
  if (exporter_) {
253
27
    tracing_stats_.spans_sent_.add(span_buffer_.size());
254
27
    if (!exporter_->log(request)) {
255
      // TODO: should there be any sort of retry or reporting here?
256
1
      ENVOY_LOG(trace, "Unsuccessful log request to OpenTelemetry trace collector.");
257
1
    }
258
27
  } else {
259
1
    ENVOY_LOG(info, "Skipping log request to OpenTelemetry: no exporter configured");
260
1
  }
261
28
  span_buffer_.clear();
262
28
}
263

            
264
59
// Buffers a finished span for export.
//
// While the buffer is below max_cache_size_, the span is appended and the buffer is flushed once
// it holds at least "tracing.opentelemetry.min_flush_spans" spans (runtime key, fallback 5).
// When the buffer is already full, the incoming span is dropped, spans_dropped is incremented
// and the existing buffer is flushed.
void Tracer::sendSpan(::opentelemetry::proto::trace::v1::Span& span) {
  if (span_buffer_.size() < max_cache_size_) {
    span_buffer_.push_back(span);
    const uint64_t flush_threshold =
        runtime_.snapshot().getInteger("tracing.opentelemetry.min_flush_spans", 5U);
    if (span_buffer_.size() >= flush_threshold) {
      flushSpans();
    }
    return;
  }

  // Buffer is full: discard the incoming span and flush what is already buffered.
  ENVOY_LOG_EVERY_POW_2(
      warn,
      "Span buffer size exceeded maximum limit. Discarding span. Current size: {}, Max size: {}",
      span_buffer_.size(), max_cache_size_);
  tracing_stats_.spans_dropped_.inc();
  flushSpans();
}
281

            
282
// Starts a span that has no parent span context.
//
// The trace id and span id are generated from the random generator. When a sampler is
// configured it makes the sampling decision and the Envoy tracing decision is ignored;
// otherwise the span is sampled according to `tracing_decision.traced`.
Tracing::SpanPtr Tracer::startSpan(const std::string& operation_name,
                                   const StreamInfo::StreamInfo& stream_info, SystemTime start_time,
                                   Tracing::Decision tracing_decision,
                                   OptRef<const Tracing::TraceContext> trace_context,
                                   OTelSpanKind span_kind) {
  // Without a custom sampler the span relies on the local Envoy decision.
  const bool has_sampler = sampler_ != nullptr;
  const bool use_local_decision = !has_sampler;

  // Create the Tracers::OpenTelemetry::Span that wraps the OTel span.
  auto new_span = std::make_unique<Span>(operation_name, stream_info, start_time, time_source_,
                                         *this, span_kind, use_local_decision);

  // Draw the random values one statement at a time so the order of draws is fixed: high half of
  // the trace id, low half of the trace id, then the span id.
  const uint64_t trace_id_high = random_.random();
  const uint64_t trace_id_low = random_.random();
  new_span->setTraceId(
      absl::StrCat(Hex::uint64ToHex(trace_id_high), Hex::uint64ToHex(trace_id_low)));
  const uint64_t span_id = random_.random();
  new_span->setId(Hex::uint64ToHex(span_id));

  if (has_sampler) {
    callSampler(sampler_, stream_info, absl::nullopt, *new_span, operation_name, trace_context);
  } else {
    new_span->setSampled(tracing_decision.traced);
  }
  return new_span;
}
307

            
308
// Starts a span that continues an existing trace described by `parent_context`.
//
// The new span reuses the parent's trace id, records the parent's span id (when non-empty) as
// its parent id, and gets a freshly generated span id. The Envoy tracing decision plays no role
// here: a configured sampler decides, otherwise the parent's sampled flag and non-empty
// tracestate are inherited.
Tracing::SpanPtr Tracer::startSpan(const std::string& operation_name,
                                   const StreamInfo::StreamInfo& stream_info, SystemTime start_time,
                                   const SpanContext& parent_context,
                                   OptRef<const Tracing::TraceContext> trace_context,
                                   OTelSpanKind span_kind) {
  // Create the span; the local decision is never used on this path.
  auto new_span = std::make_unique<Span>(operation_name, stream_info, start_time, time_source_,
                                         *this, span_kind, false);

  // Link the span to its parent.
  new_span->setTraceId(parent_context.traceId());
  if (!parent_context.spanId().empty()) {
    new_span->setParentId(parent_context.spanId());
  }

  // Generate a new identifier for the span itself.
  const uint64_t span_id = random_.random();
  new_span->setId(Hex::uint64ToHex(span_id));

  if (sampler_ == nullptr) {
    // Respect the previous span's sampled flag and carry over its tracestate, if any.
    new_span->setSampled(parent_context.sampled());
    if (!parent_context.tracestate().empty()) {
      new_span->setTracestate(parent_context.tracestate());
    }
  } else {
    // The sampler makes the sampling decision and sets the tracestate.
    callSampler(sampler_, stream_info, parent_context, *new_span, operation_name, trace_context);
  }
  return new_span;
}
338

            
339
} // namespace OpenTelemetry
340
} // namespace Tracers
341
} // namespace Extensions
342
} // namespace Envoy