Line data Source code
1 : #include "source/extensions/tracers/zipkin/zipkin_tracer_impl.h"
2 :
3 : #include "envoy/config/trace/v3/zipkin.pb.h"
4 :
5 : #include "source/common/common/empty_string.h"
6 : #include "source/common/common/enum_to_int.h"
7 : #include "source/common/common/fmt.h"
8 : #include "source/common/common/utility.h"
9 : #include "source/common/config/utility.h"
10 : #include "source/common/http/headers.h"
11 : #include "source/common/http/message_impl.h"
12 : #include "source/common/http/utility.h"
13 : #include "source/common/tracing/http_tracer_impl.h"
14 : #include "source/extensions/tracers/zipkin/span_context_extractor.h"
15 : #include "source/extensions/tracers/zipkin/zipkin_core_constants.h"
16 :
17 : namespace Envoy {
18 : namespace Extensions {
19 : namespace Tracers {
20 : namespace Zipkin {
21 :
22 0 : ZipkinSpan::ZipkinSpan(Zipkin::Span& span, Zipkin::Tracer& tracer) : span_(span), tracer_(tracer) {}
23 :
24 0 : void ZipkinSpan::finishSpan() { span_.finish(); }
25 :
26 0 : void ZipkinSpan::setOperation(absl::string_view operation) {
27 0 : span_.setName(std::string(operation));
28 0 : }
29 :
30 0 : void ZipkinSpan::setTag(absl::string_view name, absl::string_view value) {
31 0 : span_.setTag(name, value);
32 0 : }
33 :
34 0 : void ZipkinSpan::log(SystemTime timestamp, const std::string& event) {
35 0 : span_.log(timestamp, event);
36 0 : }
37 :
38 : // TODO(#11622): Implement baggage storage for zipkin spans
39 0 : void ZipkinSpan::setBaggage(absl::string_view, absl::string_view) {}
40 0 : std::string ZipkinSpan::getBaggage(absl::string_view) { return EMPTY_STRING; }
41 :
42 : void ZipkinSpan::injectContext(Tracing::TraceContext& trace_context,
43 0 : const Upstream::HostDescriptionConstSharedPtr&) {
44 : // Set the trace-id and span-id headers properly, based on the newly-created span structure.
45 0 : ZipkinCoreConstants::get().X_B3_TRACE_ID.setRefKey(trace_context, span_.traceIdAsHexString());
46 0 : ZipkinCoreConstants::get().X_B3_SPAN_ID.setRefKey(trace_context, span_.idAsHexString());
47 :
48 : // Set the parent-span header properly, based on the newly-created span structure.
49 0 : if (span_.isSetParentId()) {
50 0 : ZipkinCoreConstants::get().X_B3_PARENT_SPAN_ID.setRefKey(trace_context,
51 0 : span_.parentIdAsHexString());
52 0 : }
53 :
54 : // Set the sampled header.
55 0 : ZipkinCoreConstants::get().X_B3_SAMPLED.setRefKey(trace_context,
56 0 : span_.sampled() ? SAMPLED : NOT_SAMPLED);
57 0 : }
58 :
59 0 : void ZipkinSpan::setSampled(bool sampled) { span_.setSampled(sampled); }
60 :
61 : Tracing::SpanPtr ZipkinSpan::spawnChild(const Tracing::Config& config, const std::string& name,
62 0 : SystemTime start_time) {
63 0 : SpanContext previous_context(span_);
64 0 : return std::make_unique<ZipkinSpan>(
65 0 : *tracer_.startSpan(config, name, start_time, previous_context), tracer_);
66 0 : }
67 :
68 : Driver::TlsTracer::TlsTracer(TracerPtr&& tracer, Driver& driver)
69 0 : : tracer_(std::move(tracer)), driver_(driver) {}
70 :
71 : Driver::Driver(const envoy::config::trace::v3::ZipkinConfig& zipkin_config,
72 : Upstream::ClusterManager& cluster_manager, Stats::Scope& scope,
73 : ThreadLocal::SlotAllocator& tls, Runtime::Loader& runtime,
74 : const LocalInfo::LocalInfo& local_info, Random::RandomGenerator& random_generator,
75 : TimeSource& time_source)
76 : : cm_(cluster_manager), tracer_stats_{ZIPKIN_TRACER_STATS(
77 : POOL_COUNTER_PREFIX(scope, "tracing.zipkin."))},
78 : tls_(tls.allocateSlot()), runtime_(runtime), local_info_(local_info),
79 0 : time_source_(time_source) {
80 0 : Config::Utility::checkCluster("envoy.tracers.zipkin", zipkin_config.collector_cluster(), cm_,
81 0 : /* allow_added_via_api */ true);
82 0 : cluster_ = zipkin_config.collector_cluster();
83 0 : hostname_ = !zipkin_config.collector_hostname().empty() ? zipkin_config.collector_hostname()
84 0 : : zipkin_config.collector_cluster();
85 :
86 0 : CollectorInfo collector;
87 0 : if (!zipkin_config.collector_endpoint().empty()) {
88 0 : collector.endpoint_ = zipkin_config.collector_endpoint();
89 0 : }
90 : // The current default version of collector_endpoint_version is HTTP_JSON.
91 0 : collector.version_ = zipkin_config.collector_endpoint_version();
92 0 : const bool trace_id_128bit = zipkin_config.trace_id_128bit();
93 :
94 0 : const bool shared_span_context = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
95 0 : zipkin_config, shared_span_context, DEFAULT_SHARED_SPAN_CONTEXT);
96 0 : collector.shared_span_context_ = shared_span_context;
97 :
98 0 : const bool split_spans_for_request = zipkin_config.split_spans_for_request();
99 :
100 0 : tls_->set([this, collector, &random_generator, trace_id_128bit, shared_span_context,
101 0 : split_spans_for_request](
102 0 : Event::Dispatcher& dispatcher) -> ThreadLocal::ThreadLocalObjectSharedPtr {
103 0 : TracerPtr tracer = std::make_unique<Tracer>(
104 0 : local_info_.clusterName(), local_info_.address(), random_generator, trace_id_128bit,
105 0 : shared_span_context, time_source_, split_spans_for_request);
106 0 : tracer->setReporter(
107 0 : ReporterImpl::newInstance(std::ref(*this), std::ref(dispatcher), collector));
108 0 : return std::make_shared<TlsTracer>(std::move(tracer), *this);
109 0 : });
110 0 : }
111 :
112 : Tracing::SpanPtr Driver::startSpan(const Tracing::Config& config,
113 : Tracing::TraceContext& trace_context,
114 : const StreamInfo::StreamInfo& stream_info, const std::string&,
115 0 : Tracing::Decision tracing_decision) {
116 0 : Tracer& tracer = *tls_->getTyped<TlsTracer>().tracer_;
117 0 : SpanPtr new_zipkin_span;
118 0 : SpanContextExtractor extractor(trace_context);
119 0 : bool sampled{extractor.extractSampled(tracing_decision)};
120 0 : TRY_NEEDS_AUDIT {
121 0 : auto ret_span_context = extractor.extractSpanContext(sampled);
122 0 : if (!ret_span_context.second) {
123 : // Create a root Zipkin span. No context was found in the headers.
124 0 : new_zipkin_span =
125 0 : tracer.startSpan(config, std::string(trace_context.host()), stream_info.startTime());
126 0 : new_zipkin_span->setSampled(sampled);
127 0 : } else {
128 0 : new_zipkin_span = tracer.startSpan(config, std::string(trace_context.host()),
129 0 : stream_info.startTime(), ret_span_context.first);
130 0 : }
131 0 : }
132 0 : END_TRY catch (const ExtractorException& e) { return std::make_unique<Tracing::NullSpan>(); }
133 :
134 : // Return the active Zipkin span.
135 0 : return std::make_unique<ZipkinSpan>(*new_zipkin_span, tracer);
136 0 : }
137 :
138 : ReporterImpl::ReporterImpl(Driver& driver, Event::Dispatcher& dispatcher,
139 : const CollectorInfo& collector)
140 : : driver_(driver),
141 : collector_(collector), span_buffer_{std::make_unique<SpanBuffer>(
142 : collector.version_, collector.shared_span_context_)},
143 0 : collector_cluster_(driver_.clusterManager(), driver_.cluster()) {
144 0 : flush_timer_ = dispatcher.createTimer([this]() -> void {
145 0 : driver_.tracerStats().timer_flushed_.inc();
146 0 : flushSpans();
147 0 : enableTimer();
148 0 : });
149 :
150 0 : const uint64_t min_flush_spans =
151 0 : driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U);
152 0 : span_buffer_->allocateBuffer(min_flush_spans);
153 :
154 0 : enableTimer();
155 0 : }
156 :
157 : ReporterPtr ReporterImpl::newInstance(Driver& driver, Event::Dispatcher& dispatcher,
158 0 : const CollectorInfo& collector) {
159 0 : return std::make_unique<ReporterImpl>(driver, dispatcher, collector);
160 0 : }
161 :
162 0 : void ReporterImpl::reportSpan(Span&& span) {
163 0 : span_buffer_->addSpan(std::move(span));
164 :
165 0 : const uint64_t min_flush_spans =
166 0 : driver_.runtime().snapshot().getInteger("tracing.zipkin.min_flush_spans", 5U);
167 :
168 0 : if (span_buffer_->pendingSpans() == min_flush_spans) {
169 0 : flushSpans();
170 0 : }
171 0 : }
172 :
173 0 : void ReporterImpl::enableTimer() {
174 0 : const uint64_t flush_interval =
175 0 : driver_.runtime().snapshot().getInteger("tracing.zipkin.flush_interval_ms", 5000U);
176 0 : flush_timer_->enableTimer(std::chrono::milliseconds(flush_interval));
177 0 : }
178 :
179 0 : void ReporterImpl::flushSpans() {
180 0 : if (span_buffer_->pendingSpans()) {
181 0 : driver_.tracerStats().spans_sent_.add(span_buffer_->pendingSpans());
182 0 : const std::string request_body = span_buffer_->serialize();
183 0 : Http::RequestMessagePtr message = std::make_unique<Http::RequestMessageImpl>();
184 0 : message->headers().setReferenceMethod(Http::Headers::get().MethodValues.Post);
185 0 : message->headers().setPath(collector_.endpoint_);
186 0 : message->headers().setHost(driver_.hostname());
187 0 : message->headers().setReferenceContentType(
188 0 : collector_.version_ == envoy::config::trace::v3::ZipkinConfig::HTTP_PROTO
189 0 : ? Http::Headers::get().ContentTypeValues.Protobuf
190 0 : : Http::Headers::get().ContentTypeValues.Json);
191 :
192 0 : message->body().add(request_body);
193 :
194 0 : const uint64_t timeout =
195 0 : driver_.runtime().snapshot().getInteger("tracing.zipkin.request_timeout", 5000U);
196 :
197 0 : if (collector_cluster_.threadLocalCluster().has_value()) {
198 0 : Http::AsyncClient::Request* request =
199 0 : collector_cluster_.threadLocalCluster()->get().httpAsyncClient().send(
200 0 : std::move(message), *this,
201 0 : Http::AsyncClient::RequestOptions().setTimeout(std::chrono::milliseconds(timeout)));
202 0 : if (request) {
203 0 : active_requests_.add(*request);
204 0 : }
205 0 : } else {
206 0 : ENVOY_LOG(debug, "collector cluster '{}' does not exist", driver_.cluster());
207 0 : driver_.tracerStats().reports_skipped_no_cluster_.inc();
208 0 : }
209 :
210 0 : span_buffer_->clear();
211 0 : }
212 0 : }
213 :
214 : void ReporterImpl::onFailure(const Http::AsyncClient::Request& request,
215 0 : Http::AsyncClient::FailureReason) {
216 0 : active_requests_.remove(request);
217 0 : driver_.tracerStats().reports_failed_.inc();
218 0 : }
219 :
220 : void ReporterImpl::onSuccess(const Http::AsyncClient::Request& request,
221 0 : Http::ResponseMessagePtr&& http_response) {
222 0 : active_requests_.remove(request);
223 0 : if (Http::Utility::getResponseStatus(http_response->headers()) !=
224 0 : enumToInt(Http::Code::Accepted)) {
225 0 : driver_.tracerStats().reports_dropped_.inc();
226 0 : } else {
227 0 : driver_.tracerStats().reports_sent_.inc();
228 0 : }
229 0 : }
230 :
231 : } // namespace Zipkin
232 : } // namespace Tracers
233 : } // namespace Extensions
234 : } // namespace Envoy
|