Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/tracers/skywalking/trace_segment_reporter.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <queue>
4
5
#include "envoy/config/trace/v3/skywalking.pb.h"
6
#include "envoy/grpc/async_client_manager.h"
7
8
#include "source/common/common/backoff_strategy.h"
9
#include "source/common/grpc/async_client_impl.h"
10
#include "source/extensions/tracers/skywalking/skywalking_stats.h"
11
12
#include "cpp2sky/tracing_context.h"
13
14
namespace Envoy {
15
namespace Extensions {
16
namespace Tracers {
17
namespace SkyWalking {
18
19
// Make cpp2sky's TracingContextPtr usable unqualified inside this namespace.
using cpp2sky::TracingContextPtr;
20
21
// Reports SkyWalking trace segments over a gRPC stream.
//
// The class is the callback sink for its own stream: it implements
// Grpc::AsyncStreamCallbacks for skywalking::v3::Commands responses and sends
// skywalking::v3::SegmentObject messages (see client_ / stream_ below).
// Segments that cannot be sent while the connection is unavailable are held in
// delayed_segments_cache_ and reported once a new connection is established.
//
// NOTE(review): the member function bodies live outside this header; comments
// marked "presumably" describe intent inferred from names and should be
// confirmed against the implementation.
class TraceSegmentReporter : public Logger::Loggable<Logger::Id::tracing>,
22
                             public Grpc::AsyncStreamCallbacks<skywalking::v3::Commands> {
23
public:
24
  // Parameters (roles inferred from names and the members below — confirm in the .cc):
  //  - factory: presumably used to create the async gRPC client held in client_.
  //  - dispatcher: presumably used to create retry_timer_.
  //  - random: presumably stored as random_generator_.
  //  - stats: presumably stored as tracing_stats_.
  //  - delayed_buffer_size: presumably the capacity limit for delayed_segments_cache_.
  //  - token: presumably stored as token_.
  explicit TraceSegmentReporter(Grpc::AsyncClientFactoryPtr&& factory,
25
                                Event::Dispatcher& dispatcher, Random::RandomGenerator& random,
26
                                SkyWalkingTracerStatsSharedPtr stats, uint32_t delayed_buffer_size,
27
                                const std::string& token);
28
  ~TraceSegmentReporter() override;
29
30
  // Grpc::AsyncStreamCallbacks
31
  // Defined out of line. NOTE(review): presumably where token_ is attached to the
  // outgoing request headers — confirm.
  void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
32
0
  // The next three callbacks are intentionally empty: response headers, response
  // messages (Commands) and response trailers are all ignored by this reporter.
  void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
33
0
  void onReceiveMessage(std::unique_ptr<skywalking::v3::Commands>&&) override {}
34
0
  void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
35
  // Defined out of line; both the status and the message parameter are unnamed here.
  void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override;
36
37
  // Entry point for callers: hands one tracing context to the reporter.
  // NOTE(review): presumably sends it on stream_ when available and otherwise
  // queues it in delayed_segments_cache_ — confirm against the implementation.
  void report(TracingContextPtr tracing_context);
38
39
private:
40
  /*
41
   * Flush all cached segment objects to the back-end tracing service and close the gRPC stream.
42
   */
43
  void closeStream();
44
  // Helpers defined out of line. Going by their names: send the queued segments,
  // open a fresh stream, react to a stream failure, and arm retry_timer_
  // (presumably using backoff_strategy_ for the delay — confirm).
  void flushTraceSegments();
45
  void establishNewStream();
46
  void handleFailure();
47
  void setRetryTimer();
48
49
  // Shared stats object supplied by the owner of this reporter.
  SkyWalkingTracerStatsSharedPtr tracing_stats_;
50
  // Typed async client: SegmentObject requests, Commands responses.
  Grpc::AsyncClient<skywalking::v3::SegmentObject, skywalking::v3::Commands> client_;
51
  // Handle to the current stream; value-initialized here.
  Grpc::AsyncStream<skywalking::v3::SegmentObject> stream_{};
52
  // Descriptor of the gRPC method used for reporting; held by reference.
  const Protobuf::MethodDescriptor& service_method_;
53
  // Held by reference, so the generator must outlive this reporter.
  Random::RandomGenerator& random_generator_;
54
  // If the connection is unavailable when reporting data, the created SegmentObject will be cached
55
  // in the queue, and when a new connection is established, the cached data will be reported.
56
  std::queue<skywalking::v3::SegmentObject> delayed_segments_cache_;
57
  // Timer and back-off policy; presumably used together to schedule reconnect attempts.
  Event::TimerPtr retry_timer_;
58
  BackOffStrategyPtr backoff_strategy_;
59
  // NOTE(review): presumably an authentication token for the back end — confirm.
  std::string token_;
60
  // Defaults to 0; presumably the maximum number of entries kept in
  // delayed_segments_cache_ — confirm how 0 is treated.
  uint32_t delayed_buffer_size_{0};
61
};
62
63
// Uniquely-owning pointer to a TraceSegmentReporter.
using TraceSegmentReporterPtr = std::unique_ptr<TraceSegmentReporter>;
64
65
} // namespace SkyWalking
66
} // namespace Tracers
67
} // namespace Extensions
68
} // namespace Envoy