Line data Source code
1 : #include "source/extensions/tracers/skywalking/trace_segment_reporter.h" 2 : 3 : #include "envoy/http/header_map.h" 4 : 5 : namespace Envoy { 6 : namespace Extensions { 7 : namespace Tracers { 8 : namespace SkyWalking { 9 : 10 : namespace { 11 : 12 : Http::RegisterCustomInlineHeader<Http::CustomInlineHeaderRegistry::Type::RequestHeaders> 13 : authentication_handle(Http::CustomHeaders::get().Authentication); 14 : 15 : } // namespace 16 : 17 : TraceSegmentReporter::TraceSegmentReporter(Grpc::AsyncClientFactoryPtr&& factory, 18 : Event::Dispatcher& dispatcher, 19 : Random::RandomGenerator& random_generator, 20 : SkyWalkingTracerStatsSharedPtr stats, 21 : uint32_t delayed_buffer_size, const std::string& token) 22 : : tracing_stats_(stats), client_(factory->createUncachedRawAsyncClient()), 23 : service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName( 24 : "skywalking.v3.TraceSegmentReportService.collect")), 25 : random_generator_(random_generator), token_(token), 26 0 : delayed_buffer_size_(delayed_buffer_size) { 27 : 28 0 : static constexpr uint32_t RetryInitialDelayMs = 500; 29 0 : static constexpr uint32_t RetryMaxDelayMs = 30000; 30 0 : backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>( 31 0 : RetryInitialDelayMs, RetryMaxDelayMs, random_generator_); 32 : 33 0 : retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); }); 34 0 : establishNewStream(); 35 0 : } 36 : 37 0 : TraceSegmentReporter::~TraceSegmentReporter() { closeStream(); } 38 : 39 0 : void TraceSegmentReporter::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) { 40 0 : if (!token_.empty()) { 41 0 : metadata.setInline(authentication_handle.handle(), token_); 42 0 : } 43 0 : } 44 : 45 0 : void TraceSegmentReporter::report(TracingContextPtr tracing_context) { 46 0 : ASSERT(tracing_context); 47 0 : auto request = tracing_context->createSegmentObject(); 48 0 : ENVOY_LOG(trace, "Try to report segment to SkyWalking Server:\n{}", request.DebugString()); 49 : 50 0 : if (stream_ != nullptr) { 51 0 : if (stream_->isAboveWriteBufferHighWatermark()) { 52 0 : ENVOY_LOG(debug, "Failed to report segment to SkyWalking Server since buffer is over limit"); 53 0 : tracing_stats_->segments_dropped_.inc(); 54 0 : return; 55 0 : } 56 0 : tracing_stats_->segments_sent_.inc(); 57 0 : stream_->sendMessage(request, false); 58 0 : return; 59 0 : } 60 : // Null stream_ and cache segment data temporarily. 61 0 : delayed_segments_cache_.emplace(request); 62 0 : if (delayed_segments_cache_.size() > delayed_buffer_size_) { 63 0 : tracing_stats_->segments_dropped_.inc(); 64 0 : delayed_segments_cache_.pop(); 65 0 : } 66 0 : } 67 : 68 0 : void TraceSegmentReporter::flushTraceSegments() { 69 0 : ENVOY_LOG(debug, "Flush segments in cache to SkyWalking backend service"); 70 0 : while (!delayed_segments_cache_.empty() && stream_ != nullptr) { 71 0 : tracing_stats_->segments_sent_.inc(); 72 0 : tracing_stats_->segments_flushed_.inc(); 73 0 : stream_->sendMessage(delayed_segments_cache_.front(), false); 74 0 : delayed_segments_cache_.pop(); 75 0 : } 76 0 : tracing_stats_->cache_flushed_.inc(); 77 0 : } 78 : 79 0 : void TraceSegmentReporter::closeStream() { 80 0 : if (stream_ != nullptr) { 81 0 : flushTraceSegments(); 82 0 : stream_->closeStream(); 83 0 : } 84 0 : } 85 : 86 : void TraceSegmentReporter::onRemoteClose(Grpc::Status::GrpcStatus status, 87 0 : const std::string& message) { 88 0 : ENVOY_LOG(debug, "{} gRPC stream closed: {}, {}", service_method_.name(), status, message); 89 0 : stream_ = nullptr; 90 0 : handleFailure(); 91 0 : } 92 : 93 0 : void TraceSegmentReporter::establishNewStream() { 94 0 : ENVOY_LOG(debug, "Try to create new {} gRPC stream for reporter", service_method_.name()); 95 0 : stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions()); 96 0 : if (stream_ == nullptr) { 97 0 : ENVOY_LOG(debug, "Failed to create {} gRPC stream", service_method_.name()); 98 0 : return; 99 0 : } 100 : // TODO(wbpcode): Even if stream_ is not empty, there is no guarantee that the connection will be 101 : // established correctly. If there is a connection failure, the onRemoteClose method will be 102 : // called. Currently, we lack a way to determine whether the connection is truly available. This 103 : // may cause partial data loss. 104 0 : if (!delayed_segments_cache_.empty()) { 105 0 : flushTraceSegments(); 106 0 : } 107 0 : backoff_strategy_->reset(); 108 0 : } 109 : 110 0 : void TraceSegmentReporter::handleFailure() { setRetryTimer(); } 111 : 112 0 : void TraceSegmentReporter::setRetryTimer() { 113 0 : retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs())); 114 0 : } 115 : 116 : } // namespace SkyWalking 117 : } // namespace Tracers 118 : } // namespace Extensions 119 : } // namespace Envoy