1
#pragma once
2

            
3
#include <chrono>
4

            
5
#include "envoy/extensions/tracers/fluentd/v3/fluentd.pb.h"
6
#include "envoy/extensions/tracers/fluentd/v3/fluentd.pb.validate.h"
7
#include "envoy/thread_local/thread_local.h"
8
#include "envoy/tracing/trace_driver.h"
9

            
10
#include "source/common/common/logger.h"
11
#include "source/common/common/statusor.h"
12
#include "source/common/http/header_map_impl.h"
13
#include "source/common/tracing/trace_context_impl.h"
14
#include "source/extensions/common/fluentd/fluentd_base.h"
15
#include "source/extensions/tracers/common/factory_base.h"
16

            
17
#include "absl/strings/string_view.h"
18

            
19
namespace Envoy {
20
namespace Extensions {
21
namespace Tracers {
22
namespace Fluentd {
23

            
24
using namespace Envoy::Extensions::Common::Fluentd;
25
using FluentdConfig = envoy::extensions::tracers::fluentd::v3::FluentdConfig;
26
using FluentdConfigSharedPtr = std::shared_ptr<FluentdConfig>;
27

            
28
// Span context definitions
29
class SpanContext {
30
public:
31
  SpanContext() = default;
32
  SpanContext(absl::string_view version, absl::string_view trace_id, absl::string_view span_id,
33
              bool sampled, std::string tracestate)
34
12
      : version_(version), trace_id_(trace_id), span_id_(span_id), sampled_(sampled),
35
12
        tracestate_(std::move(tracestate)) {}
36

            
37
2
  const std::string& version() const { return version_; }
38

            
39
11
  const std::string& traceId() const { return trace_id_; }
40

            
41
6
  const std::string& spanId() const { return span_id_; }
42

            
43
7
  bool sampled() const { return sampled_; }
44
  void setSampled(bool sampled) { sampled_ = sampled; }
45

            
46
9
  const std::string& tracestate() const { return tracestate_; }
47

            
48
private:
49
  std::string version_;
50
  std::string trace_id_;
51
  std::string span_id_;
52
  bool sampled_{false};
53
  std::string tracestate_;
54
};
55

            
56
// Trace context definitions
57
class FluentdConstantValues {
58
public:
59
  const Tracing::TraceContextHandler TRACE_PARENT{"traceparent"};
60
  const Tracing::TraceContextHandler TRACE_STATE{"tracestate"};
61
};
62

            
63
using FluentdConstants = ConstSingleton<FluentdConstantValues>;
64

            
65
// SpanContextExtractor extracts the span context from the trace context
66
class SpanContextExtractor {
67
public:
68
  SpanContextExtractor(Tracing::TraceContext& trace_context);
69
  ~SpanContextExtractor();
70
  absl::StatusOr<SpanContext> extractSpanContext();
71
  bool propagationHeaderPresent();
72

            
73
private:
74
  const Tracing::TraceContext& trace_context_;
75
};
76

            
77
// FluentdTracerImpl implements a FluentdTracer, handling tracing and buffer/connection logic
78
class FluentdTracerImpl : public FluentdBase,
79
                          public std::enable_shared_from_this<FluentdTracerImpl> {
80
public:
81
  FluentdTracerImpl(Upstream::ThreadLocalCluster& cluster, Tcp::AsyncTcpClientPtr client,
82
                    Event::Dispatcher& dispatcher, const FluentdConfig& config,
83
                    BackOffStrategyPtr backoff_strategy, Stats::Scope& parent_scope,
84
                    Random::RandomGenerator& random);
85

            
86
  Tracing::SpanPtr startSpan(SystemTime start_time, const std::string& operation_name,
87
                             Tracing::Decision tracing_decision);
88

            
89
  Tracing::SpanPtr startSpan(SystemTime start_time, const std::string& operation_name,
90
                             const SpanContext& parent_context);
91

            
92
  void packMessage(MessagePackPacker& packer) override;
93

            
94
private:
95
  std::map<std::string, std::string> option_;
96
  Random::RandomGenerator& random_;
97
  TimeSource& time_source_;
98
};
99

            
100
using FluentdTracerWeakPtr = std::weak_ptr<FluentdTracerImpl>;
101
using FluentdTracerSharedPtr = std::shared_ptr<FluentdTracerImpl>;
102

            
103
// FluentdTracerCache is used to cache entries before they are sent to the Fluentd server
104
class FluentdTracerCacheImpl
105
    : public FluentdCacheBase<FluentdTracerImpl, FluentdConfig, FluentdTracerSharedPtr,
106
                              FluentdTracerWeakPtr>,
107
      public Singleton::Instance {
108
public:
109
  FluentdTracerCacheImpl(Upstream::ClusterManager& cluster_manager, Stats::Scope& parent_scope,
110
                         ThreadLocal::SlotAllocator& tls)
111
12
      : FluentdCacheBase(cluster_manager, parent_scope, tls, "tracing.fluentd") {}
112

            
113
protected:
114
  FluentdTracerSharedPtr createInstance(Upstream::ThreadLocalCluster& cluster,
115
                                        Tcp::AsyncTcpClientPtr client,
116
                                        Event::Dispatcher& dispatcher, const FluentdConfig& config,
117
                                        BackOffStrategyPtr backoff_strategy,
118
10
                                        Random::RandomGenerator& random) override {
119
10
    return std::make_shared<FluentdTracerImpl>(cluster, std::move(client), dispatcher, config,
120
10
                                               std::move(backoff_strategy), *stats_scope_, random);
121
10
  }
122
};
123

            
124
using FluentdTracerCacheSharedPtr = std::shared_ptr<FluentdTracerCacheImpl>;
125

            
126
using TracerPtr = std::unique_ptr<FluentdTracerImpl>;
127

            
128
// Driver manages and creates Fluentd tracers
129
class Driver : Logger::Loggable<Logger::Id::tracing>, public Tracing::Driver {
130
public:
131
  Driver(const FluentdConfigSharedPtr fluentd_config,
132
         Server::Configuration::TracerFactoryContext& context,
133
         FluentdTracerCacheSharedPtr tracer_cache);
134

            
135
  // Tracing::Driver
136
  Tracing::SpanPtr startSpan(const Tracing::Config& config, Tracing::TraceContext& trace_context,
137
                             const StreamInfo::StreamInfo& stream_info,
138
                             const std::string& operation_name,
139
                             Tracing::Decision tracing_decision) override;
140

            
141
private:
142
  class ThreadLocalTracer : public ThreadLocal::ThreadLocalObject {
143
  public:
144
8
    ThreadLocalTracer(FluentdTracerSharedPtr tracer) : tracer_(std::move(tracer)) {}
145

            
146
5
    FluentdTracerImpl& tracer() { return *tracer_; }
147

            
148
    FluentdTracerSharedPtr tracer_;
149
  };
150

            
151
private:
152
  ThreadLocal::SlotPtr tls_slot_;
153
  const FluentdConfigSharedPtr fluentd_config_;
154
  FluentdTracerCacheSharedPtr tracer_cache_;
155
};
156

            
157
// Span holds the span context and handles span operations
158
class Span : public Tracing::Span {
159
public:
160
  Span(SystemTime start_time, const std::string& operation_name, FluentdTracerSharedPtr tracer,
161
       SpanContext&& span_context, TimeSource& time_source, bool use_local_decision);
162

            
163
  // Tracing::Span
164
  void setOperation(absl::string_view operation) override;
165
  void setTag(absl::string_view name, absl::string_view value) override;
166
  void log(SystemTime timestamp, const std::string& event) override;
167
  void finishSpan() override;
168
  void injectContext(Tracing::TraceContext& trace_context,
169
                     const Tracing::UpstreamContext& upstream) override;
170
  Tracing::SpanPtr spawnChild(const Tracing::Config& config, const std::string& name,
171
                              SystemTime start_time) override;
172
  void setSampled(bool sampled) override { span_context_.setSampled(sampled); }
173
2
  bool sampled() const { return span_context_.sampled(); }
174
3
  bool useLocalDecision() const override { return use_local_decision_; }
175

            
176
  std::string getBaggage(absl::string_view key) override;
177
  void setBaggage(absl::string_view key, absl::string_view value) override;
178
  std::string getTraceId() const override;
179
  std::string getSpanId() const override;
180

            
181
private:
182
  // config
183
  SystemTime start_time_;
184
  std::string operation_;
185

            
186
  FluentdTracerSharedPtr tracer_;
187
  SpanContext span_context_;
188
  std::map<std::string, std::string> tags_;
189
  Envoy::TimeSource& time_source_;
190
  const bool use_local_decision_{false};
191
};
192

            
193
} // namespace Fluentd
194
} // namespace Tracers
195
} // namespace Extensions
196
} // namespace Envoy