LCOV - code coverage report
Current view: top level - source/extensions/tracers/skywalking - trace_segment_reporter.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 71 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 11 0.0 %

          Line data    Source code
       1             : #include "source/extensions/tracers/skywalking/trace_segment_reporter.h"
       2             : 
       3             : #include "envoy/http/header_map.h"
       4             : 
       5             : namespace Envoy {
       6             : namespace Extensions {
       7             : namespace Tracers {
       8             : namespace SkyWalking {
       9             : 
      10             : namespace {
      11             : 
      12             : Http::RegisterCustomInlineHeader<Http::CustomInlineHeaderRegistry::Type::RequestHeaders>
      13             :     authentication_handle(Http::CustomHeaders::get().Authentication);
      14             : 
      15             : } // namespace
      16             : 
      17             : TraceSegmentReporter::TraceSegmentReporter(Grpc::AsyncClientFactoryPtr&& factory,
      18             :                                            Event::Dispatcher& dispatcher,
      19             :                                            Random::RandomGenerator& random_generator,
      20             :                                            SkyWalkingTracerStatsSharedPtr stats,
      21             :                                            uint32_t delayed_buffer_size, const std::string& token)
      22             :     : tracing_stats_(stats), client_(factory->createUncachedRawAsyncClient()),
      23             :       service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
      24             :           "skywalking.v3.TraceSegmentReportService.collect")),
      25             :       random_generator_(random_generator), token_(token),
      26           0 :       delayed_buffer_size_(delayed_buffer_size) {
      27             : 
      28           0 :   static constexpr uint32_t RetryInitialDelayMs = 500;
      29           0 :   static constexpr uint32_t RetryMaxDelayMs = 30000;
      30           0 :   backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>(
      31           0 :       RetryInitialDelayMs, RetryMaxDelayMs, random_generator_);
      32             : 
      33           0 :   retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
      34           0 :   establishNewStream();
      35           0 : }
      36             : 
      37           0 : TraceSegmentReporter::~TraceSegmentReporter() { closeStream(); }
      38             : 
      39           0 : void TraceSegmentReporter::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
      40           0 :   if (!token_.empty()) {
      41           0 :     metadata.setInline(authentication_handle.handle(), token_);
      42           0 :   }
      43           0 : }
      44             : 
      45           0 : void TraceSegmentReporter::report(TracingContextPtr tracing_context) {
      46           0 :   ASSERT(tracing_context);
      47           0 :   auto request = tracing_context->createSegmentObject();
      48           0 :   ENVOY_LOG(trace, "Try to report segment to SkyWalking Server:\n{}", request.DebugString());
      49             : 
      50           0 :   if (stream_ != nullptr) {
      51           0 :     if (stream_->isAboveWriteBufferHighWatermark()) {
      52           0 :       ENVOY_LOG(debug, "Failed to report segment to SkyWalking Server since buffer is over limit");
      53           0 :       tracing_stats_->segments_dropped_.inc();
      54           0 :       return;
      55           0 :     }
      56           0 :     tracing_stats_->segments_sent_.inc();
      57           0 :     stream_->sendMessage(request, false);
      58           0 :     return;
      59           0 :   }
      60             :   // Null stream_ and cache segment data temporarily.
      61           0 :   delayed_segments_cache_.emplace(request);
      62           0 :   if (delayed_segments_cache_.size() > delayed_buffer_size_) {
      63           0 :     tracing_stats_->segments_dropped_.inc();
      64           0 :     delayed_segments_cache_.pop();
      65           0 :   }
      66           0 : }
      67             : 
      68           0 : void TraceSegmentReporter::flushTraceSegments() {
      69           0 :   ENVOY_LOG(debug, "Flush segments in cache to SkyWalking backend service");
      70           0 :   while (!delayed_segments_cache_.empty() && stream_ != nullptr) {
      71           0 :     tracing_stats_->segments_sent_.inc();
      72           0 :     tracing_stats_->segments_flushed_.inc();
      73           0 :     stream_->sendMessage(delayed_segments_cache_.front(), false);
      74           0 :     delayed_segments_cache_.pop();
      75           0 :   }
      76           0 :   tracing_stats_->cache_flushed_.inc();
      77           0 : }
      78             : 
      79           0 : void TraceSegmentReporter::closeStream() {
      80           0 :   if (stream_ != nullptr) {
      81           0 :     flushTraceSegments();
      82           0 :     stream_->closeStream();
      83           0 :   }
      84           0 : }
      85             : 
      86             : void TraceSegmentReporter::onRemoteClose(Grpc::Status::GrpcStatus status,
      87           0 :                                          const std::string& message) {
      88           0 :   ENVOY_LOG(debug, "{} gRPC stream closed: {}, {}", service_method_.name(), status, message);
      89           0 :   stream_ = nullptr;
      90           0 :   handleFailure();
      91           0 : }
      92             : 
      93           0 : void TraceSegmentReporter::establishNewStream() {
      94           0 :   ENVOY_LOG(debug, "Try to create new {} gRPC stream for reporter", service_method_.name());
      95           0 :   stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
      96           0 :   if (stream_ == nullptr) {
      97           0 :     ENVOY_LOG(debug, "Failed to create {} gRPC stream", service_method_.name());
      98           0 :     return;
      99           0 :   }
     100             :   // TODO(wbpcode): Even if stream_ is not empty, there is no guarantee that the connection will be
     101             :   // established correctly. If there is a connection failure, the onRemoteClose method will be
     102             :   // called. Currently, we lack a way to determine whether the connection is truly available. This
     103             :   // may cause partial data loss.
     104           0 :   if (!delayed_segments_cache_.empty()) {
     105           0 :     flushTraceSegments();
     106           0 :   }
     107           0 :   backoff_strategy_->reset();
     108           0 : }
     109             : 
     110           0 : void TraceSegmentReporter::handleFailure() { setRetryTimer(); }
     111             : 
     112           0 : void TraceSegmentReporter::setRetryTimer() {
     113           0 :   retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
     114           0 : }
     115             : 
     116             : } // namespace SkyWalking
     117             : } // namespace Tracers
     118             : } // namespace Extensions
     119             : } // namespace Envoy

Generated by: LCOV version 1.15