Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/tracers/skywalking/trace_segment_reporter.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/tracers/skywalking/trace_segment_reporter.h"
2
3
#include "envoy/http/header_map.h"
4
5
namespace Envoy {
6
namespace Extensions {
7
namespace Tracers {
8
namespace SkyWalking {
9
10
namespace {
11
12
Http::RegisterCustomInlineHeader<Http::CustomInlineHeaderRegistry::Type::RequestHeaders>
13
    authentication_handle(Http::CustomHeaders::get().Authentication);
14
15
} // namespace
16
17
TraceSegmentReporter::TraceSegmentReporter(Grpc::AsyncClientFactoryPtr&& factory,
18
                                           Event::Dispatcher& dispatcher,
19
                                           Random::RandomGenerator& random_generator,
20
                                           SkyWalkingTracerStatsSharedPtr stats,
21
                                           uint32_t delayed_buffer_size, const std::string& token)
22
    : tracing_stats_(stats), client_(factory->createUncachedRawAsyncClient()),
23
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
24
          "skywalking.v3.TraceSegmentReportService.collect")),
25
      random_generator_(random_generator), token_(token),
26
0
      delayed_buffer_size_(delayed_buffer_size) {
27
28
0
  static constexpr uint32_t RetryInitialDelayMs = 500;
29
0
  static constexpr uint32_t RetryMaxDelayMs = 30000;
30
0
  backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>(
31
0
      RetryInitialDelayMs, RetryMaxDelayMs, random_generator_);
32
33
0
  retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
34
0
  establishNewStream();
35
0
}
36
37
0
TraceSegmentReporter::~TraceSegmentReporter() { closeStream(); }
38
39
0
void TraceSegmentReporter::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
40
0
  if (!token_.empty()) {
41
0
    metadata.setInline(authentication_handle.handle(), token_);
42
0
  }
43
0
}
44
45
0
void TraceSegmentReporter::report(TracingContextPtr tracing_context) {
46
0
  ASSERT(tracing_context);
47
0
  auto request = tracing_context->createSegmentObject();
48
0
  ENVOY_LOG(trace, "Try to report segment to SkyWalking Server:\n{}", request.DebugString());
49
50
0
  if (stream_ != nullptr) {
51
0
    if (stream_->isAboveWriteBufferHighWatermark()) {
52
0
      ENVOY_LOG(debug, "Failed to report segment to SkyWalking Server since buffer is over limit");
53
0
      tracing_stats_->segments_dropped_.inc();
54
0
      return;
55
0
    }
56
0
    tracing_stats_->segments_sent_.inc();
57
0
    stream_->sendMessage(request, false);
58
0
    return;
59
0
  }
60
  // Null stream_ and cache segment data temporarily.
61
0
  delayed_segments_cache_.emplace(request);
62
0
  if (delayed_segments_cache_.size() > delayed_buffer_size_) {
63
0
    tracing_stats_->segments_dropped_.inc();
64
0
    delayed_segments_cache_.pop();
65
0
  }
66
0
}
67
68
0
void TraceSegmentReporter::flushTraceSegments() {
69
0
  ENVOY_LOG(debug, "Flush segments in cache to SkyWalking backend service");
70
0
  while (!delayed_segments_cache_.empty() && stream_ != nullptr) {
71
0
    tracing_stats_->segments_sent_.inc();
72
0
    tracing_stats_->segments_flushed_.inc();
73
0
    stream_->sendMessage(delayed_segments_cache_.front(), false);
74
0
    delayed_segments_cache_.pop();
75
0
  }
76
0
  tracing_stats_->cache_flushed_.inc();
77
0
}
78
79
0
void TraceSegmentReporter::closeStream() {
80
0
  if (stream_ != nullptr) {
81
0
    flushTraceSegments();
82
0
    stream_->closeStream();
83
0
  }
84
0
}
85
86
void TraceSegmentReporter::onRemoteClose(Grpc::Status::GrpcStatus status,
87
0
                                         const std::string& message) {
88
0
  ENVOY_LOG(debug, "{} gRPC stream closed: {}, {}", service_method_.name(), status, message);
89
0
  stream_ = nullptr;
90
0
  handleFailure();
91
0
}
92
93
0
void TraceSegmentReporter::establishNewStream() {
94
0
  ENVOY_LOG(debug, "Try to create new {} gRPC stream for reporter", service_method_.name());
95
0
  stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
96
0
  if (stream_ == nullptr) {
97
0
    ENVOY_LOG(debug, "Failed to create {} gRPC stream", service_method_.name());
98
0
    return;
99
0
  }
100
  // TODO(wbpcode): Even if stream_ is not empty, there is no guarantee that the connection will be
101
  // established correctly. If there is a connection failure, the onRemoteClose method will be
102
  // called. Currently, we lack a way to determine whether the connection is truly available. This
103
  // may cause partial data loss.
104
0
  if (!delayed_segments_cache_.empty()) {
105
0
    flushTraceSegments();
106
0
  }
107
0
  backoff_strategy_->reset();
108
0
}
109
110
0
void TraceSegmentReporter::handleFailure() { setRetryTimer(); }
111
112
0
void TraceSegmentReporter::setRetryTimer() {
113
0
  retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
114
0
}
115
116
} // namespace SkyWalking
117
} // namespace Tracers
118
} // namespace Extensions
119
} // namespace Envoy