/proc/self/cwd/source/extensions/tracers/skywalking/trace_segment_reporter.cc
Line | Count | Source (jump to first uncovered line) |
#include "source/extensions/tracers/skywalking/trace_segment_reporter.h"

#include <utility>

#include "envoy/http/header_map.h"
4 | | |
5 | | namespace Envoy { |
6 | | namespace Extensions { |
7 | | namespace Tracers { |
8 | | namespace SkyWalking { |
9 | | |
10 | | namespace { |
11 | | |
12 | | Http::RegisterCustomInlineHeader<Http::CustomInlineHeaderRegistry::Type::RequestHeaders> |
13 | | authentication_handle(Http::CustomHeaders::get().Authentication); |
14 | | |
15 | | } // namespace |
16 | | |
17 | | TraceSegmentReporter::TraceSegmentReporter(Grpc::AsyncClientFactoryPtr&& factory, |
18 | | Event::Dispatcher& dispatcher, |
19 | | Random::RandomGenerator& random_generator, |
20 | | SkyWalkingTracerStatsSharedPtr stats, |
21 | | uint32_t delayed_buffer_size, const std::string& token) |
22 | | : tracing_stats_(stats), client_(factory->createUncachedRawAsyncClient()), |
23 | | service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName( |
24 | | "skywalking.v3.TraceSegmentReportService.collect")), |
25 | | random_generator_(random_generator), token_(token), |
26 | 0 | delayed_buffer_size_(delayed_buffer_size) { |
27 | |
|
28 | 0 | static constexpr uint32_t RetryInitialDelayMs = 500; |
29 | 0 | static constexpr uint32_t RetryMaxDelayMs = 30000; |
30 | 0 | backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>( |
31 | 0 | RetryInitialDelayMs, RetryMaxDelayMs, random_generator_); |
32 | |
|
33 | 0 | retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); }); |
34 | 0 | establishNewStream(); |
35 | 0 | } |
36 | | |
37 | 0 | TraceSegmentReporter::~TraceSegmentReporter() { closeStream(); } |
38 | | |
39 | 0 | void TraceSegmentReporter::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) { |
40 | 0 | if (!token_.empty()) { |
41 | 0 | metadata.setInline(authentication_handle.handle(), token_); |
42 | 0 | } |
43 | 0 | } |
44 | | |
45 | 0 | void TraceSegmentReporter::report(TracingContextPtr tracing_context) { |
46 | 0 | ASSERT(tracing_context); |
47 | 0 | auto request = tracing_context->createSegmentObject(); |
48 | 0 | ENVOY_LOG(trace, "Try to report segment to SkyWalking Server:\n{}", request.DebugString()); |
49 | |
|
50 | 0 | if (stream_ != nullptr) { |
51 | 0 | if (stream_->isAboveWriteBufferHighWatermark()) { |
52 | 0 | ENVOY_LOG(debug, "Failed to report segment to SkyWalking Server since buffer is over limit"); |
53 | 0 | tracing_stats_->segments_dropped_.inc(); |
54 | 0 | return; |
55 | 0 | } |
56 | 0 | tracing_stats_->segments_sent_.inc(); |
57 | 0 | stream_->sendMessage(request, false); |
58 | 0 | return; |
59 | 0 | } |
60 | | // Null stream_ and cache segment data temporarily. |
61 | 0 | delayed_segments_cache_.emplace(request); |
62 | 0 | if (delayed_segments_cache_.size() > delayed_buffer_size_) { |
63 | 0 | tracing_stats_->segments_dropped_.inc(); |
64 | 0 | delayed_segments_cache_.pop(); |
65 | 0 | } |
66 | 0 | } |
67 | | |
68 | 0 | void TraceSegmentReporter::flushTraceSegments() { |
69 | 0 | ENVOY_LOG(debug, "Flush segments in cache to SkyWalking backend service"); |
70 | 0 | while (!delayed_segments_cache_.empty() && stream_ != nullptr) { |
71 | 0 | tracing_stats_->segments_sent_.inc(); |
72 | 0 | tracing_stats_->segments_flushed_.inc(); |
73 | 0 | stream_->sendMessage(delayed_segments_cache_.front(), false); |
74 | 0 | delayed_segments_cache_.pop(); |
75 | 0 | } |
76 | 0 | tracing_stats_->cache_flushed_.inc(); |
77 | 0 | } |
78 | | |
79 | 0 | void TraceSegmentReporter::closeStream() { |
80 | 0 | if (stream_ != nullptr) { |
81 | 0 | flushTraceSegments(); |
82 | 0 | stream_->closeStream(); |
83 | 0 | } |
84 | 0 | } |
85 | | |
86 | | void TraceSegmentReporter::onRemoteClose(Grpc::Status::GrpcStatus status, |
87 | 0 | const std::string& message) { |
88 | 0 | ENVOY_LOG(debug, "{} gRPC stream closed: {}, {}", service_method_.name(), status, message); |
89 | 0 | stream_ = nullptr; |
90 | 0 | handleFailure(); |
91 | 0 | } |
92 | | |
93 | 0 | void TraceSegmentReporter::establishNewStream() { |
94 | 0 | ENVOY_LOG(debug, "Try to create new {} gRPC stream for reporter", service_method_.name()); |
95 | 0 | stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions()); |
96 | 0 | if (stream_ == nullptr) { |
97 | 0 | ENVOY_LOG(debug, "Failed to create {} gRPC stream", service_method_.name()); |
98 | 0 | return; |
99 | 0 | } |
100 | | // TODO(wbpcode): Even if stream_ is not empty, there is no guarantee that the connection will be |
101 | | // established correctly. If there is a connection failure, the onRemoteClose method will be |
102 | | // called. Currently, we lack a way to determine whether the connection is truly available. This |
103 | | // may cause partial data loss. |
104 | 0 | if (!delayed_segments_cache_.empty()) { |
105 | 0 | flushTraceSegments(); |
106 | 0 | } |
107 | 0 | backoff_strategy_->reset(); |
108 | 0 | } |
109 | | |
110 | 0 | void TraceSegmentReporter::handleFailure() { setRetryTimer(); } |
111 | | |
112 | 0 | void TraceSegmentReporter::setRetryTimer() { |
113 | 0 | retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs())); |
114 | 0 | } |
115 | | |
116 | | } // namespace SkyWalking |
117 | | } // namespace Tracers |
118 | | } // namespace Extensions |
119 | | } // namespace Envoy |