Line data Source code
1 : #pragma once 2 : 3 : #include <queue> 4 : 5 : #include "envoy/config/trace/v3/skywalking.pb.h" 6 : #include "envoy/grpc/async_client_manager.h" 7 : 8 : #include "source/common/common/backoff_strategy.h" 9 : #include "source/common/grpc/async_client_impl.h" 10 : #include "source/extensions/tracers/skywalking/skywalking_stats.h" 11 : 12 : #include "cpp2sky/tracing_context.h" 13 : 14 : namespace Envoy { 15 : namespace Extensions { 16 : namespace Tracers { 17 : namespace SkyWalking { 18 : 19 : using cpp2sky::TracingContextPtr; 20 : 21 : class TraceSegmentReporter : public Logger::Loggable<Logger::Id::tracing>, 22 : public Grpc::AsyncStreamCallbacks<skywalking::v3::Commands> { 23 : public: 24 : explicit TraceSegmentReporter(Grpc::AsyncClientFactoryPtr&& factory, 25 : Event::Dispatcher& dispatcher, Random::RandomGenerator& random, 26 : SkyWalkingTracerStatsSharedPtr stats, uint32_t delayed_buffer_size, 27 : const std::string& token); 28 : ~TraceSegmentReporter() override; 29 : 30 : // Grpc::AsyncStreamCallbacks 31 : void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override; 32 0 : void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {} 33 0 : void onReceiveMessage(std::unique_ptr<skywalking::v3::Commands>&&) override {} 34 0 : void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {} 35 : void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override; 36 : 37 : void report(TracingContextPtr tracing_context); 38 : 39 : private: 40 : /* 41 : * Flush all cached segment objects to the back-end tracing service and close the GRPC stream. 42 : */ 43 : void closeStream(); 44 : void flushTraceSegments(); 45 : void establishNewStream(); 46 : void handleFailure(); 47 : void setRetryTimer(); 48 : 49 : SkyWalkingTracerStatsSharedPtr tracing_stats_; 50 : Grpc::AsyncClient<skywalking::v3::SegmentObject, skywalking::v3::Commands> client_; 51 : Grpc::AsyncStream<skywalking::v3::SegmentObject> stream_{}; 52 : const Protobuf::MethodDescriptor& service_method_; 53 : Random::RandomGenerator& random_generator_; 54 : // If the connection is unavailable when reporting data, the created SegmentObject will be cached 55 : // in the queue, and when a new connection is established, the cached data will be reported. 56 : std::queue<skywalking::v3::SegmentObject> delayed_segments_cache_; 57 : Event::TimerPtr retry_timer_; 58 : BackOffStrategyPtr backoff_strategy_; 59 : std::string token_; 60 : uint32_t delayed_buffer_size_{0}; 61 : }; 62 : 63 : using TraceSegmentReporterPtr = std::unique_ptr<TraceSegmentReporter>; 64 : 65 : } // namespace SkyWalking 66 : } // namespace Tracers 67 : } // namespace Extensions 68 : } // namespace Envoy