1
#include "source/extensions/tracers/zipkin/zipkin_tracer_impl.h"
2

            
3
#include <memory>
4

            
5
#include "envoy/config/trace/v3/zipkin.pb.h"
6

            
7
#include "source/common/common/enum_to_int.h"
8
#include "source/common/config/utility.h"
9
#include "source/common/http/headers.h"
10
#include "source/common/http/message_impl.h"
11
#include "source/common/http/utility.h"
12
#include "source/extensions/tracers/zipkin/span_context_extractor.h"
13
#include "source/extensions/tracers/zipkin/zipkin_core_constants.h"
14

            
15
#include "absl/strings/string_view.h"
16

            
17
namespace Envoy {
18
namespace Extensions {
19
namespace Tracers {
20
namespace Zipkin {
21

            
22
namespace {
23
// Helper function to parse URI and extract hostname and path
24
6
std::pair<std::string, std::string> parseUri(absl::string_view uri) {
25
  // Find the scheme separator
26
6
  size_t scheme_pos = uri.find("://");
27
6
  if (scheme_pos == std::string::npos) {
28
    // No scheme, treat as path only
29
1
    return {"", std::string(uri)};
30
1
  }
31

            
32
  // Skip past the scheme
33
5
  size_t host_start = scheme_pos + 3;
34

            
35
  // Find the path separator
36
5
  size_t path_pos = uri.find('/', host_start);
37
5
  if (path_pos == std::string::npos) {
38
    // No path, hostname only
39
1
    return {std::string(uri.substr(host_start)), "/"};
40
1
  }
41

            
42
4
  std::string hostname = std::string(uri.substr(host_start, path_pos - host_start));
43
4
  std::string path = std::string(uri.substr(path_pos));
44

            
45
4
  return {hostname, path};
46
5
}
47
} // namespace
48

            
49
// Builds the Zipkin driver from its configuration.
//
// The collector location (cluster, Host header value, request path and optional extra request
// headers) is taken from `collector_service` when that field is set, and from the legacy
// `collector_*` fields otherwise. After checking the collector cluster, a tracer with its own
// reporter is installed in the thread local slot.
//
// @param zipkin_config the Zipkin tracer configuration.
// @param context the server factory context; it is captured by reference in the thread local
//        initializer below.
// @throws EnvoyException if `collector_service` is not set and either `collector_cluster` or
//         `collector_endpoint` is empty.
Driver::Driver(const envoy::config::trace::v3::ZipkinConfig& zipkin_config,
               Server::Configuration::ServerFactoryContext& context)
    : collector_(std::make_shared<CollectorInfo>()), tls_(context.threadLocal().allocateSlot()),
      trace_context_option_(zipkin_config.trace_context_option()) {

  if (!zipkin_config.has_collector_service()) {
    // Legacy configuration: cluster and endpoint are mandatory, the hostname is optional.
    // Custom request headers are only available through HttpService.
    const auto& legacy_cluster = zipkin_config.collector_cluster();
    const auto& legacy_endpoint = zipkin_config.collector_endpoint();
    if (legacy_cluster.empty() || legacy_endpoint.empty()) {
      throw EnvoyException(
          "collector_cluster and collector_endpoint must be specified when not using "
          "collector_service");
    }

    const auto& legacy_hostname = zipkin_config.collector_hostname();
    collector_->cluster_ = legacy_cluster;
    // Without an explicit hostname the cluster name is used as the Host header value.
    collector_->hostname_ = legacy_hostname.empty() ? legacy_cluster : legacy_hostname;
    collector_->endpoint_ = legacy_endpoint;
  } else {
    // HttpService configuration (preferred over the legacy fields).
    const auto& service = zipkin_config.collector_service();
    const auto& service_uri = service.http_uri();
    collector_->cluster_ = service_uri.cluster();

    // The URI supplies the request path and, if it has one, the hostname. A URI without a
    // hostname falls back to the cluster name.
    auto [uri_host, uri_path] = parseUri(service_uri.uri());
    collector_->hostname_ = uri_host.empty() ? collector_->cluster_ : uri_host;
    collector_->endpoint_ = uri_path;

    // Remember the configured request headers; they are applied to every collector request.
    for (const auto& option : service.request_headers_to_add()) {
      collector_->request_headers_.emplace_back(option.header().key(), option.header().value());
    }
  }

  // Validate cluster exists
  THROW_IF_NOT_OK_REF(Config::Utility::checkCluster("envoy.tracers.zipkin", collector_->cluster_,
                                                    context.clusterManager(),
                                                    /* allow_added_via_api */ true)
                          .status());

  // The current default version of collector_endpoint_version is HTTP_JSON.
  collector_->version_ = zipkin_config.collector_endpoint_version();
  collector_->shared_span_context_ = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
      zipkin_config, shared_span_context, DEFAULT_SHARED_SPAN_CONTEXT);

  // Plain values are copied into the initializer below so that it does not refer to
  // `zipkin_config`.
  const bool use_128bit_trace_ids = zipkin_config.trace_id_128bit();
  const bool split_spans = zipkin_config.split_spans_for_request();
  const bool timestamp_ids = zipkin_config.timestamp_trace_ids();

  auto tracer_stats = std::make_shared<ZipkinTracerStats>(ZipkinTracerStats{
      ZIPKIN_TRACER_STATS(POOL_COUNTER_PREFIX(context.scope(), "tracing.zipkin."))});

  tls_->set([&context, collector = collector_, option = trace_context_option_, tracer_stats,
             use_128bit_trace_ids, split_spans, timestamp_ids](
                Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
    auto tracer = std::make_unique<Tracer>(
        context.localInfo().clusterName(), context.localInfo().address(),
        context.api().randomGenerator(), use_128bit_trace_ids, collector->shared_span_context_,
        context.timeSource(), split_spans, timestamp_ids);
    tracer->setTraceContextOption(option);
    // Each tracer gets its own reporter bound to the dispatcher passed to this initializer.
    tracer->setReporter(std::make_unique<ReporterImpl>(dispatcher, context.clusterManager(),
                                                       context.runtime(), tracer_stats,
                                                       collector));
    return std::make_shared<Driver::TlsTracer>(std::move(tracer));
  });
}
129

            
130
// Starts a Zipkin span for a request.
//
// If a span context can be extracted from `trace_context`, the new span is started with that
// context. Otherwise a root span is started and its sampling flag is set from the extracted
// sampling value or, when there is none, from `tracing_decision.traced`.
//
// @return the new span, or a Tracing::NullSpan if extraction throws an ExtractorException.
Tracing::SpanPtr Driver::startSpan(const Tracing::Config& config,
                                   Tracing::TraceContext& trace_context,
                                   const StreamInfo::StreamInfo& stream_info, const std::string&,
                                   Tracing::Decision tracing_decision) {
  Tracer& tracer = *tls_->getTyped<TlsTracer>().tracer_;

  // W3C fallback extraction is only enabled when USE_B3_WITH_W3C_PROPAGATION is configured
  SpanContextExtractor extractor(trace_context, w3cFallbackEnabled());
  const absl::optional<bool> sampled = extractor.extractSampled();
  // An extracted sampling value wins over the local tracing decision.
  const bool effective_sampling = sampled.value_or(tracing_decision.traced);

  SpanPtr span;
  bool has_parent_context = false;
  TRY_NEEDS_AUDIT {
    auto extracted = extractor.extractSpanContext(effective_sampling);
    has_parent_context = extracted.second;
    if (has_parent_context) {
      span = tracer.startSpan(config, std::string(trace_context.host()), stream_info.startTime(),
                              extracted.first);
    } else {
      // Create a root Zipkin span. No context was found in the headers.
      span = tracer.startSpan(config, std::string(trace_context.host()), stream_info.startTime());
      span->setSampled(effective_sampling);
    }
  }
  END_TRY catch (const ExtractorException&) { return std::make_unique<Tracing::NullSpan>(); }

  // The local decision is only used when the request carried neither a span context nor a
  // sampling value.
  span->setUseLocalDecision(!has_parent_context && !sampled.has_value());
  // Return the active Zipkin span.
  return span;
}
160

            
161
ReporterImpl::ReporterImpl(Event::Dispatcher& dispatcher, Upstream::ClusterManager& cm,
162
                           Runtime::Loader& runtime, ZipkinTracerStatsSharedPtr tracer_stats,
163
                           CollectorInfoConstSharedPtr collector)
164
43
    : runtime_(runtime), tracer_stats_(std::move(tracer_stats)), collector_(std::move(collector)),
165
43
      span_buffer_{
166
43
          std::make_unique<SpanBuffer>(collector_->version_, collector_->shared_span_context_)},
167
43
      collector_cluster_(cm, collector_->cluster_) {
168
43
  flush_timer_ = dispatcher.createTimer([this]() -> void {
169
1
    tracer_stats_->timer_flushed_.inc();
170
1
    flushSpans();
171
1
    enableTimer();
172
1
  });
173

            
174
43
  const uint64_t min_flush_spans =
175
43
      runtime_.snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U);
176
43
  span_buffer_->allocateBuffer(min_flush_spans);
177

            
178
43
  enableTimer();
179
43
}
180

            
181
17
// Buffers `span` and flushes the buffer once enough spans are pending.
//
// The threshold "tracing.zipkin.min_flush_spans" (default 5) is read from the runtime snapshot
// on every call, so it may differ between calls. The comparison is therefore ">=" rather than
// "==": with "==", lowering the threshold below the number of already buffered spans would
// never trigger a flush from here, because the pending count has already passed it.
//
// @param span the finished span to report; it is moved into the span buffer.
void ReporterImpl::reportSpan(Span&& span) {
  span_buffer_->addSpan(std::move(span));

  const uint64_t min_flush_spans =
      runtime_.snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U);

  if (span_buffer_->pendingSpans() >= min_flush_spans) {
    flushSpans();
  }
}
191

            
192
44
void ReporterImpl::enableTimer() {
193
44
  const uint64_t flush_interval =
194
44
      runtime_.snapshot().getInteger("tracing.zipkin.flush_interval_ms", 5000U);
195
44
  flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval));
196
44
}
197

            
198
14
void ReporterImpl::flushSpans() {
199
14
  if (span_buffer_->pendingSpans()) {
200
14
    tracer_stats_->spans_sent_.add(span_buffer_->pendingSpans());
201
14
    const std::string request_body = span_buffer_->serialize();
202
14
    Http::RequestMessagePtr message = std::make_unique<Http::RequestMessageImpl>();
203
14
    message->headers().setReferenceMethod(Http::Headers::get().MethodValues.Post);
204
    // Set path and hostname - both are stored in collector_
205
14
    message->headers().setPath(collector_->endpoint_);
206
14
    message->headers().setHost(collector_->hostname_);
207

            
208
14
    message->headers().setReferenceContentType(
209
14
        collector_->version_ == envoy::config::trace::v3::ZipkinConfig::HTTP_PROTO
210
14
            ? Http::Headers::get().ContentTypeValues.Protobuf
211
14
            : Http::Headers::get().ContentTypeValues.Json);
212

            
213
    // Add custom headers from collector configuration
214
14
    for (const auto& header : collector_->request_headers_) {
215
      // Replace any existing header with the configured value
216
3
      message->headers().setCopy(header.first, header.second);
217
3
    }
218

            
219
14
    message->body().add(request_body);
220

            
221
14
    const uint64_t timeout =
222
14
        runtime_.snapshot().getInteger("tracing.zipkin.request_timeout", 5000U);
223

            
224
14
    if (collector_cluster_.threadLocalCluster().has_value()) {
225
12
      Http::AsyncClient::Request* request =
226
12
          collector_cluster_.threadLocalCluster()->get().httpAsyncClient().send(
227
12
              std::move(message), *this,
228
12
              Http::AsyncClient::RequestOptions().setTimeout(std::chrono::milliseconds(timeout)));
229
12
      if (request) {
230
11
        active_requests_.add(*request);
231
11
      }
232
12
    } else {
233
2
      ENVOY_LOG(debug, "collector cluster '{}' does not exist", collector_->cluster_);
234
2
      tracer_stats_->reports_skipped_no_cluster_.inc();
235
2
    }
236

            
237
14
    span_buffer_->clear();
238
14
  }
239
14
}
240

            
241
// Async client callback for a collector request that failed: counts the failure and stops
// tracking the request. The failure reason is not inspected.
void ReporterImpl::onFailure(const Http::AsyncClient::Request& failed_request,
                             Http::AsyncClient::FailureReason) {
  active_requests_.remove(failed_request);
  tracer_stats_->reports_failed_.inc();
}
246

            
247
// Async client callback for a collector request that received a response. The request is no
// longer tracked; the report counts as sent only if the response status is Accepted, and as
// dropped for any other status.
void ReporterImpl::onSuccess(const Http::AsyncClient::Request& request,
                             Http::ResponseMessagePtr&& http_response) {
  active_requests_.remove(request);

  const auto status = Http::Utility::getResponseStatus(http_response->headers());
  if (status == enumToInt(Http::Code::Accepted)) {
    tracer_stats_->reports_sent_.inc();
  } else {
    tracer_stats_->reports_dropped_.inc();
  }
}
257

            
258
} // namespace Zipkin
259
} // namespace Tracers
260
} // namespace Extensions
261
} // namespace Envoy