1
#include "source/extensions/tracers/fluentd/fluentd_tracer_impl.h"
2

            
3
#include <cstdint>
4

            
5
#include "source/common/buffer/buffer_impl.h"
6
#include "source/common/common/backoff_strategy.h"
7
#include "source/common/common/hex.h"
8
#include "source/common/tracing/trace_context_impl.h"
9

            
10
#include "msgpack.hpp"
11

            
12
namespace Envoy {
13
namespace Extensions {
14
namespace Tracers {
15
namespace Fluentd {
16

            
17
// Handle Span and Trace context extraction and validation
// Adapted from OpenTelemetry tracer extension @alexanderellis @yanavlasov
// See https://www.w3.org/TR/trace-context/#traceparent-header
//
// A version-00 traceparent header is four hex fields joined by '-':
//   version(2) "-" trace-id(32) "-" parent-id(16) "-" trace-flags(2)
constexpr int kVersionHexSize = 2;
constexpr int kTraceIdHexSize = 32;
constexpr int kParentIdHexSize = 16;
constexpr int kTraceFlagsHexSize = 2;
// Total header length: the four fields plus the three separating hyphens. Derived from the field
// widths (rather than hard-coded) so the two can never drift apart.
constexpr int kTraceparentHeaderSize =
    kVersionHexSize + kTraceIdHexSize + kParentIdHexSize + kTraceFlagsHexSize + 3;
static_assert(kTraceparentHeaderSize == 55, "a version-00 traceparent header is 55 characters");

            
26
37
// Returns true when every character of `input` is a hexadecimal digit (either letter case).
// An empty input is vacuously valid.
bool isValidHex(const absl::string_view& input) {
  for (const char c : input) {
    if (!absl::ascii_isxdigit(c)) {
      return false;
    }
  }
  return true;
}
30

            
31
17
// Returns true when `input` contains nothing but '0' characters (vacuously true when empty).
bool isAllZeros(const absl::string_view& input) {
  return input.find_first_not_of('0') == absl::string_view::npos;
}
34

            
35
SpanContextExtractor::SpanContextExtractor(Tracing::TraceContext& trace_context)
36
20
    : trace_context_(trace_context) {}
37

            
38
20
SpanContextExtractor::~SpanContextExtractor() = default;
39

            
40
5
bool SpanContextExtractor::propagationHeaderPresent() {
41
5
  auto propagation_header = FluentdConstants::get().TRACE_PARENT.get(trace_context_);
42
5
  return propagation_header.has_value();
43
5
}
44

            
45
18
// Parses the W3C traceparent header (plus any tracestate headers) of the incoming request into
// the context of the upstream (parent) span.
//
// Returns InvalidArgumentError when the traceparent header:
//   - is missing,
//   - is not exactly kTraceparentHeaderSize characters long,
//   - does not split into four '-'-separated fields of the expected widths,
//   - contains non-hex characters,
//   - uses the forbidden version "ff", or
//   - carries an all-zero trace id or parent id.
// Multiple tracestate header values are joined with ','.
absl::StatusOr<SpanContext> SpanContextExtractor::extractSpanContext() {
  auto propagation_header = FluentdConstants::get().TRACE_PARENT.get(trace_context_);
  if (!propagation_header.has_value()) {
    // We should have already caught this, but just in case.
    return absl::InvalidArgumentError("No propagation header found");
  }
  auto header_value_string = propagation_header.value();

  if (header_value_string.size() != kTraceparentHeaderSize) {
    return absl::InvalidArgumentError("Invalid traceparent header length");
  }
  // Try to split it into its component parts:
  std::vector<absl::string_view> propagation_header_components =
      absl::StrSplit(header_value_string, '-', absl::SkipEmpty());
  if (propagation_header_components.size() != 4) {
    return absl::InvalidArgumentError("Invalid traceparent hyphenation");
  }
  absl::string_view version = propagation_header_components[0];
  absl::string_view trace_id = propagation_header_components[1];
  absl::string_view parent_id = propagation_header_components[2];
  absl::string_view trace_flags = propagation_header_components[3];
  if (version.size() != kVersionHexSize || trace_id.size() != kTraceIdHexSize ||
      parent_id.size() != kParentIdHexSize || trace_flags.size() != kTraceFlagsHexSize) {
    return absl::InvalidArgumentError("Invalid traceparent field sizes");
  }
  if (!isValidHex(version) || !isValidHex(trace_id) || !isValidHex(parent_id) ||
      !isValidHex(trace_flags)) {
    return absl::InvalidArgumentError("Invalid header hex");
  }
  // The specification declares version "ff" invalid, so such a header must not be used.
  // The comparison is case-insensitive because the hex check above also accepts uppercase digits.
  if (std::all_of(version.begin(), version.end(), [](char c) { return c == 'f' || c == 'F'; })) {
    return absl::InvalidArgumentError("Invalid traceparent version");
  }
  // As per the traceparent header definition, if the trace-id or parent-id are all zeros, they are
  // invalid and must be ignored.
  if (isAllZeros(trace_id)) {
    return absl::InvalidArgumentError("Invalid trace id");
  }
  if (isAllZeros(parent_id)) {
    return absl::InvalidArgumentError("Invalid parent id");
  }

  // Set whether or not the span is sampled from the trace flags.
  // See https://w3c.github.io/trace-context/#trace-flags.
  // Decoding is safe: trace_flags was verified above to be exactly two hex digits.
  char decoded_trace_flags = absl::HexStringToBytes(trace_flags).front();
  bool sampled = (decoded_trace_flags & 1);

  // If a tracestate header is received without an accompanying traceparent header,
  // it is invalid and MUST be discarded. Because we're already checking for the
  // traceparent header above, we don't need to check here.
  // See https://www.w3.org/TR/trace-context/#processing-model-for-working-with-trace-context
  const auto tracestate_values = FluentdConstants::get().TRACE_STATE.getAll(trace_context_);

  SpanContext parent_context(version, trace_id, parent_id, sampled,
                             absl::StrJoin(tracestate_values, ","));
  return parent_context;
}
98

            
99
// Define default version and trace context construction// Define default version and trace context
100
// construction
101
constexpr absl::string_view kDefaultVersion = "00";
102

            
103
2
// Handler for the "traceparent" header. It is heap-allocated on first use and intentionally
// never freed, so the returned reference stays valid for the remainder of the process.
const Tracing::TraceContextHandler& traceParentHeader() {
  static const auto* const handler = new Tracing::TraceContextHandler{"traceparent"};
  return *handler;
}
106

            
107
1
// Handler for the "tracestate" header. It is heap-allocated on first use and intentionally
// never freed, so the returned reference stays valid for the remainder of the process.
const Tracing::TraceContextHandler& traceStateHeader() {
  static const auto* const handler = new Tracing::TraceContextHandler{"tracestate"};
  return *handler;
}
110

            
111
// Initialize the Fluentd driver
112
Driver::Driver(const FluentdConfigSharedPtr fluentd_config,
113
               Server::Configuration::TracerFactoryContext& context,
114
               FluentdTracerCacheSharedPtr tracer_cache)
115
8
    : tls_slot_(context.serverFactoryContext().threadLocal().allocateSlot()),
116
8
      fluentd_config_(fluentd_config), tracer_cache_(tracer_cache) {
117
8
  Random::RandomGenerator& random = context.serverFactoryContext().api().randomGenerator();
118

            
119
8
  uint64_t base_interval_ms = DefaultBaseBackoffIntervalMs;
120
8
  uint64_t max_interval_ms = base_interval_ms * DefaultMaxBackoffIntervalFactor;
121

            
122
8
  if (fluentd_config->has_retry_policy() && fluentd_config->retry_policy().has_retry_back_off()) {
123
1
    base_interval_ms = PROTOBUF_GET_MS_OR_DEFAULT(fluentd_config->retry_policy().retry_back_off(),
124
1
                                                  base_interval, DefaultBaseBackoffIntervalMs);
125
1
    max_interval_ms =
126
1
        PROTOBUF_GET_MS_OR_DEFAULT(fluentd_config->retry_policy().retry_back_off(), max_interval,
127
1
                                   base_interval_ms * DefaultMaxBackoffIntervalFactor);
128
1
  }
129

            
130
  // Create a thread local tracer
131
8
  tls_slot_->set([fluentd_config = fluentd_config_, &random, tracer_cache = tracer_cache_,
132
8
                  base_interval_ms, max_interval_ms](Event::Dispatcher&) {
133
8
    BackOffStrategyPtr backoff_strategy = std::make_unique<JitteredExponentialBackOffStrategy>(
134
8
        base_interval_ms, max_interval_ms, random);
135
8
    return std::make_shared<ThreadLocalTracer>(
136
8
        tracer_cache->getOrCreate(fluentd_config, random, std::move(backoff_strategy)));
137
8
  });
138
8
}
139

            
140
// Handles driver logic for starting a new span
141
Tracing::SpanPtr Driver::startSpan(const Tracing::Config& /*config*/,
142
                                   Tracing::TraceContext& trace_context,
143
                                   const StreamInfo::StreamInfo& stream_info,
144
                                   const std::string& operation_name,
145
5
                                   Tracing::Decision tracing_decision) {
146
  // Get the thread local tracer
147
5
  auto& tracer = tls_slot_->getTyped<ThreadLocalTracer>().tracer();
148

            
149
  // Decide which tracer.startSpan function to call based on available span context
150
5
  SpanContextExtractor extractor(trace_context);
151
5
  if (!extractor.propagationHeaderPresent()) {
152
    // No propagation header, so we can create a fresh span with the given decision.
153
2
    return tracer.startSpan(stream_info.startTime(), operation_name, tracing_decision);
154
3
  } else {
155
    // Try to extract the span context. If we can't, just return a null span.
156
3
    absl::StatusOr<SpanContext> span_context = extractor.extractSpanContext();
157
3
    if (span_context.ok()) {
158
2
      return tracer.startSpan(stream_info.startTime(), operation_name, span_context.value());
159
2
    } else {
160
1
      ENVOY_LOG(trace, "Unable to extract span context: ", span_context.status());
161
1
      return std::make_unique<Tracing::NullSpan>();
162
1
    }
163
3
  }
164
5
}
165

            
166
// Initialize the Fluentd tracer
167
FluentdTracerImpl::FluentdTracerImpl(Upstream::ThreadLocalCluster& cluster,
168
                                     Tcp::AsyncTcpClientPtr client, Event::Dispatcher& dispatcher,
169
                                     const FluentdConfig& config,
170
                                     BackOffStrategyPtr backoff_strategy,
171
                                     Stats::Scope& parent_scope, Random::RandomGenerator& random)
172
23
    : FluentdBase(
173
23
          cluster, std::move(client), dispatcher, config.tag(),
174
23
          config.has_retry_policy() && config.retry_policy().has_num_retries()
175
23
              ? absl::optional<uint32_t>(config.retry_policy().num_retries().value())
176
23
              : absl::nullopt,
177
23
          parent_scope, config.stat_prefix(), std::move(backoff_strategy),
178
23
          PROTOBUF_GET_MS_OR_DEFAULT(config, buffer_flush_interval, DefaultBufferFlushIntervalMs),
179
23
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, buffer_size_bytes, DefaultMaxBufferSize)),
180
23
      option_({{"fluent_signal", "2"}, {"TimeFormat", "DateTime"}}), random_(random),
181
23
      time_source_(dispatcher.timeSource()) {}
182

            
183
// Initialize a span object
184
Span::Span(SystemTime start_time, const std::string& operation_name, FluentdTracerSharedPtr tracer,
185
           SpanContext&& span_context, TimeSource& time_source, bool use_local_decision)
186
5
    : start_time_(start_time), operation_(operation_name), tracer_(std::move(tracer)),
187
5
      span_context_(std::move(span_context)), time_source_(time_source),
188
5
      use_local_decision_(use_local_decision) {}
189

            
190
// Set the operation name for the span
191
1
void Span::setOperation(absl::string_view operation) { operation_ = std::string(operation); }
192

            
193
// Adds a tag to the span
194
1
void Span::setTag(absl::string_view name, absl::string_view value) {
195
1
  tags_[std::string(name)] = std::string(value);
196
1
}
197

            
198
// Log an event as a Fluentd entry
199
1
void Span::log(SystemTime /*timestamp*/, const std::string& event) {
200
1
  uint64_t time =
201
1
      std::chrono::duration_cast<std::chrono::seconds>(time_source_.systemTime().time_since_epoch())
202
1
          .count();
203

            
204
1
  EntryPtr entry =
205
1
      std::make_unique<Entry>(time, std::map<std::string, std::string>{{"event", event}});
206

            
207
1
  tracer_->log(std::move(entry));
208
1
}
209

            
210
// Finish and log a span as a Fluentd entry
211
1
void Span::finishSpan() {
212
1
  uint64_t time =
213
1
      std::chrono::duration_cast<std::chrono::seconds>(time_source_.systemTime().time_since_epoch())
214
1
          .count();
215

            
216
  // Make the record map
217
1
  std::map<std::string, std::string> record_map = std::move(tags_);
218
1
  record_map["operation"] = operation_;
219
1
  record_map["trace_id"] = span_context_.traceId();
220
1
  record_map["span_id"] = span_context_.spanId();
221
1
  record_map["start_time"] = std::to_string(
222
1
      std::chrono::duration_cast<std::chrono::seconds>(start_time_.time_since_epoch()).count());
223
1
  record_map["end_time"] = std::to_string(time);
224

            
225
1
  EntryPtr entry = std::make_unique<Entry>(time, std::move(record_map));
226

            
227
1
  tracer_->log(std::move(entry));
228
1
}
229

            
230
// Inject the span context into the trace context
231
void Span::injectContext(Tracing::TraceContext& trace_context,
232
2
                         const Tracing::UpstreamContext& /*upstream*/) {
233

            
234
2
  std::string trace_id_hex = span_context_.traceId();
235
2
  std::string span_id_hex = span_context_.spanId();
236
2
  std::vector<uint8_t> trace_flags_vec{sampled()};
237
2
  std::string trace_flags_hex = Hex::encode(trace_flags_vec);
238
2
  std::string traceparent_header_value =
239
2
      absl::StrCat(kDefaultVersion, "-", trace_id_hex, "-", span_id_hex, "-", trace_flags_hex);
240

            
241
  // Set the traceparent in the trace_context.
242
2
  traceParentHeader().setRefKey(trace_context, traceparent_header_value);
243
2
  if (!span_context_.tracestate().empty()) {
244
    // Also set the tracestate.
245
1
    traceStateHeader().setRefKey(trace_context, span_context_.tracestate());
246
1
  }
247
2
}
248

            
249
// Spawns a child span
250
Tracing::SpanPtr Span::spawnChild(const Tracing::Config&, const std::string& name,
251
1
                                  SystemTime start_time) {
252
1
  return tracer_->startSpan(start_time, name, span_context_);
253
1
}
254

            
255
1
// Baggage is not supported by this tracer; every lookup yields an empty string.
std::string Span::getBaggage(absl::string_view /*key*/) { return {}; }
259

            
260
1
// Baggage is not supported by this tracer; the key/value pair is discarded.
void Span::setBaggage(absl::string_view /*key*/, absl::string_view /*value*/) {
  // Intentionally a no-op.
}
263

            
264
3
// Returns the hex trace id stored in this span's context.
std::string Span::getTraceId() const {
  return span_context_.traceId();
}
265

            
266
1
// Returns the hex span id stored in this span's context.
std::string Span::getSpanId() const {
  return span_context_.spanId();
}
267

            
268
// Start a new span with no parent context
269
Tracing::SpanPtr FluentdTracerImpl::startSpan(SystemTime start_time,
270
                                              const std::string& operation_name,
271
2
                                              Tracing::Decision tracing_decision) {
272
  // make a new span context
273
2
  uint64_t trace_id_high = random_.random();
274
2
  uint64_t trace_id = random_.random();
275
2
  uint64_t span_id = random_.random();
276

            
277
2
  SpanContext span_context(
278
2
      kDefaultVersion, absl::StrCat(Hex::uint64ToHex(trace_id_high), Hex::uint64ToHex(trace_id)),
279
2
      Hex::uint64ToHex(span_id), tracing_decision.traced, "");
280

            
281
2
  return std::make_unique<Span>(start_time, operation_name, shared_from_this(),
282
2
                                std::move(span_context), time_source_, true);
283
2
}
284

            
285
// Start a new span with a parent context
286
Tracing::SpanPtr FluentdTracerImpl::startSpan(SystemTime start_time,
287
                                              const std::string& operation_name,
288
3
                                              const SpanContext& parent_context) {
289
  // Generate a new span context with new span id based on the parent context.
290
3
  SpanContext span_context(kDefaultVersion, parent_context.traceId(),
291
3
                           Hex::uint64ToHex(random_.random()), parent_context.sampled(),
292
3
                           parent_context.tracestate());
293
3
  return std::make_unique<Span>(start_time, operation_name, shared_from_this(),
294
3
                                std::move(span_context), time_source_, false);
295
3
}
296

            
297
4
// Serializes the buffered entries as one msgpack message with the layout
//   [tag, [[time, {key: value, ...}], ...], options]
// This matches the Forward Mode shape of the Fluentd forward protocol.
void FluentdTracerImpl::packMessage(MessagePackPacker& packer) {
  // Outer array: tag, entries array, options map.
  packer.pack_array(3);
  packer.pack(tag_);

  packer.pack_array(entries_.size());
  for (const auto& entry : entries_) {
    // Each entry is a [time, record] pair, with the record packed as a map.
    packer.pack_array(2);
    packer.pack(entry->time_);

    const auto& record = entry->map_record_;
    packer.pack_map(record.size());
    for (const auto& [key, value] : record) {
      packer.pack(key);
      packer.pack(value);
    }
  }

  packer.pack(option_);
}
314

            
315
} // namespace Fluentd
316
} // namespace Tracers
317
} // namespace Extensions
318
} // namespace Envoy