1
#pragma once
2

            
3
#include <queue>
4

            
5
#include "envoy/config/trace/v3/skywalking.pb.h"
6
#include "envoy/grpc/async_client_manager.h"
7

            
8
#include "source/common/common/backoff_strategy.h"
9
#include "source/common/grpc/async_client_impl.h"
10
#include "source/extensions/tracers/skywalking/skywalking_stats.h"
11

            
12
#include "cpp2sky/tracing_context.h"
13

            
14
namespace Envoy {
15
namespace Extensions {
16
namespace Tracers {
17
namespace SkyWalking {
18

            
19
using cpp2sky::TracingContextSharedPtr;
20

            
21
class TraceSegmentReporter : public Logger::Loggable<Logger::Id::tracing>,
22
                             public Grpc::AsyncStreamCallbacks<skywalking::v3::Commands> {
23
public:
24
  explicit TraceSegmentReporter(Grpc::AsyncClientFactoryPtr&& factory,
25
                                Event::Dispatcher& dispatcher, Random::RandomGenerator& random,
26
                                SkyWalkingTracerStatsSharedPtr stats, uint32_t delayed_buffer_size,
27
                                const std::string& token);
28
  ~TraceSegmentReporter() override;
29

            
30
  // Grpc::AsyncStreamCallbacks
31
  void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
32
1
  void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
33
1
  void onReceiveMessage(std::unique_ptr<skywalking::v3::Commands>&&) override {}
34
1
  void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
35
  void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override;
36

            
37
  void report(TracingContextSharedPtr tracing_context);
38

            
39
private:
40
  /*
41
   * Flush all cached segment objects to the back-end tracing service and close the GRPC stream.
42
   */
43
  void closeStream();
44
  void flushTraceSegments();
45
  void establishNewStream();
46
  void handleFailure();
47
  void setRetryTimer();
48

            
49
  SkyWalkingTracerStatsSharedPtr tracing_stats_;
50
  Grpc::AsyncClient<skywalking::v3::SegmentObject, skywalking::v3::Commands> client_;
51
  Grpc::AsyncStream<skywalking::v3::SegmentObject> stream_;
52
  const Protobuf::MethodDescriptor& service_method_;
53
  Random::RandomGenerator& random_generator_;
54
  // If the connection is unavailable when reporting data, the created SegmentObject will be cached
55
  // in the queue, and when a new connection is established, the cached data will be reported.
56
  std::queue<skywalking::v3::SegmentObject> delayed_segments_cache_;
57
  Event::TimerPtr retry_timer_;
58
  BackOffStrategyPtr backoff_strategy_;
59
  std::string token_;
60
  uint32_t delayed_buffer_size_{0};
61
};
62

            
63
using TraceSegmentReporterPtr = std::unique_ptr<TraceSegmentReporter>;
64

            
65
} // namespace SkyWalking
66
} // namespace Tracers
67
} // namespace Extensions
68
} // namespace Envoy