1
#include "source/extensions/tracers/skywalking/trace_segment_reporter.h"

#include <utility>

#include "envoy/http/header_map.h"
4

            
5
namespace Envoy {
6
namespace Extensions {
7
namespace Tracers {
8
namespace SkyWalking {
9

            
10
namespace {
11

            
12
Http::RegisterCustomInlineHeader<Http::CustomInlineHeaderRegistry::Type::RequestHeaders>
13
    authentication_handle(Http::CustomHeaders::get().Authentication);
14

            
15
} // namespace
16

            
17
/**
 * Builds the reporter and immediately attempts to open the TraceSegmentReportService stream.
 *
 * @param factory             factory used to create the uncached raw gRPC client.
 * @param dispatcher          dispatcher that owns the reconnect timer.
 * @param random_generator    randomness source for the jittered backoff.
 * @param stats               tracer stats; taken by value and moved in (sink parameter).
 * @param delayed_buffer_size maximum number of segments cached while no stream is available.
 * @param token               optional auth token sent as initial metadata (empty = none).
 */
TraceSegmentReporter::TraceSegmentReporter(Grpc::AsyncClientFactoryPtr&& factory,
                                           Event::Dispatcher& dispatcher,
                                           Random::RandomGenerator& random_generator,
                                           SkyWalkingTracerStatsSharedPtr stats,
                                           uint32_t delayed_buffer_size, const std::string& token)
    // `stats` is a by-value sink: move it to avoid an extra atomic refcount round-trip.
    : tracing_stats_(std::move(stats)),
      client_(THROW_OR_RETURN_VALUE(factory->createUncachedRawAsyncClient(),
                                    Grpc::RawAsyncClientPtr)),
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "skywalking.v3.TraceSegmentReportService.collect")),
      random_generator_(random_generator), token_(token),
      delayed_buffer_size_(delayed_buffer_size) {
  // Reconnect backoff window: first retry after 500ms, capped at 30s.
  static constexpr uint32_t RetryInitialDelayMs = 500;
  static constexpr uint32_t RetryMaxDelayMs = 30000;
  backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>(
      RetryInitialDelayMs, RetryMaxDelayMs, random_generator_);

  // Each time the retry timer fires, try to reconnect; the first attempt is made eagerly.
  retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
  establishNewStream();
}
37

            
38
12
TraceSegmentReporter::~TraceSegmentReporter() {
  // Delegate teardown to closeStream(), which flushes cached segments and closes the stream
  // when one is open.
  closeStream();
}
39

            
40
2
/**
 * Adds the configured token as the Authentication header on the stream's initial metadata.
 * When no token is configured, the metadata is left untouched.
 */
void TraceSegmentReporter::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
  if (token_.empty()) {
    return;
  }
  metadata.setInline(authentication_handle.handle(), token_);
}
45

            
46
2068
/**
 * Reports the segment built from `tracing_context`.
 *
 * - With a live stream: sends the segment, or drops it (counted in segments_dropped_) if the
 *   stream is above its write-buffer high watermark.
 * - Without a stream: caches the segment until a stream is established. The cache is bounded
 *   by delayed_buffer_size_; on overflow the front (oldest) entry is evicted and counted as
 *   dropped.
 *
 * @param tracing_context context to serialize; must be non-null.
 */
void TraceSegmentReporter::report(TracingContextSharedPtr tracing_context) {
  ASSERT(tracing_context);
  auto request = tracing_context->createSegmentObject();
  ENVOY_LOG(trace, "Try to report segment to SkyWalking Server:\n{}", request.DebugString());

  if (stream_ != nullptr) {
    if (stream_->isAboveWriteBufferHighWatermark()) {
      ENVOY_LOG(debug, "Failed to report segment to SkyWalking Server since buffer is over limit");
      tracing_stats_->segments_dropped_.inc();
      return;
    }
    tracing_stats_->segments_sent_.inc();
    stream_->sendMessage(request, false);
    return;
  }

  // No stream yet: cache the segment temporarily. `request` is not used after this point, so
  // move it into the cache instead of deep-copying the whole segment message.
  delayed_segments_cache_.emplace(std::move(request));
  if (delayed_segments_cache_.size() > delayed_buffer_size_) {
    tracing_stats_->segments_dropped_.inc();
    delayed_segments_cache_.pop();
  }
}
68

            
69
12
void TraceSegmentReporter::flushTraceSegments() {
70
12
  ENVOY_LOG(debug, "Flush segments in cache to SkyWalking backend service");
71
1039
  while (!delayed_segments_cache_.empty() && stream_ != nullptr) {
72
1027
    tracing_stats_->segments_sent_.inc();
73
1027
    tracing_stats_->segments_flushed_.inc();
74
1027
    stream_->sendMessage(delayed_segments_cache_.front(), false);
75
1027
    delayed_segments_cache_.pop();
76
1027
  }
77
12
  tracing_stats_->cache_flushed_.inc();
78
12
}
79

            
80
12
void TraceSegmentReporter::closeStream() {
81
12
  if (stream_ != nullptr) {
82
10
    flushTraceSegments();
83
10
    stream_->closeStream();
84
10
  }
85
12
}
86

            
87
void TraceSegmentReporter::onRemoteClose(Grpc::Status::GrpcStatus status,
88
2
                                         const std::string& message) {
89
2
  ENVOY_LOG(debug, "{} gRPC stream closed: {}, {}", service_method_.name(), status, message);
90
2
  stream_ = nullptr;
91
2
  handleFailure();
92
2
}
93

            
94
14
void TraceSegmentReporter::establishNewStream() {
95
14
  ENVOY_LOG(debug, "Try to create new {} gRPC stream for reporter", service_method_.name());
96
14
  stream_ = client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
97
14
  if (stream_ == nullptr) {
98
2
    ENVOY_LOG(debug, "Failed to create {} gRPC stream", service_method_.name());
99
2
    return;
100
2
  }
101
  // TODO(wbpcode): Even if stream_ is not empty, there is no guarantee that the connection will be
102
  // established correctly. If there is a connection failure, the onRemoteClose method will be
103
  // called. Currently, we lack a way to determine whether the connection is truly available. This
104
  // may cause partial data loss.
105
12
  if (!delayed_segments_cache_.empty()) {
106
2
    flushTraceSegments();
107
2
  }
108
12
  backoff_strategy_->reset();
109
12
}
110

            
111
2
void TraceSegmentReporter::handleFailure() {
  // The only failure response at present is to schedule a reconnect attempt.
  setRetryTimer();
}
112

            
113
2
void TraceSegmentReporter::setRetryTimer() {
114
2
  retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
115
2
}
116

            
117
} // namespace SkyWalking
118
} // namespace Tracers
119
} // namespace Extensions
120
} // namespace Envoy