Line data Source code
1 : #pragma once 2 : 3 : #include "envoy/grpc/async_client_manager.h" 4 : 5 : #include "source/common/common/logger.h" 6 : #include "source/common/grpc/typed_async_client.h" 7 : #include "source/extensions/tracers/opentelemetry/trace_exporter.h" 8 : 9 : #include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h" 10 : 11 : namespace Envoy { 12 : namespace Extensions { 13 : namespace Tracers { 14 : namespace OpenTelemetry { 15 : 16 : using opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest; 17 : using opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse; 18 : 19 : /** 20 : * Exporter client for OTLP Traces. Provides abstraction on top of gRPC stream. 21 : */ 22 : class OpenTelemetryGrpcTraceExporterClient : Logger::Loggable<Logger::Id::tracing> { 23 : public: 24 : OpenTelemetryGrpcTraceExporterClient(const Grpc::RawAsyncClientSharedPtr& client, 25 : const Protobuf::MethodDescriptor& service_method) 26 0 : : client_(client), service_method_(service_method) {} 27 : 28 : struct LocalStream : public Grpc::AsyncStreamCallbacks< 29 : opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse> { 30 0 : LocalStream(OpenTelemetryGrpcTraceExporterClient& parent) : parent_(parent) {} 31 : 32 : // Grpc::AsyncStreamCallbacks 33 0 : void onCreateInitialMetadata(Http::RequestHeaderMap&) override {} 34 0 : void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {} 35 : void onReceiveMessage( 36 : std::unique_ptr<opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse>&&) 37 0 : override {} 38 0 : void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {} 39 0 : void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override { 40 0 : ASSERT(parent_.stream_ != nullptr); 41 0 : if (parent_.stream_->stream_ != nullptr) { 42 : // Only reset if we have a stream. Otherwise we had an inline failure and we will clear the 43 : // stream data in send(). 44 0 : parent_.stream_.reset(); 45 0 : } 46 0 : } 47 : 48 : OpenTelemetryGrpcTraceExporterClient& parent_; 49 : Grpc::AsyncStream<opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest> 50 : stream_{}; 51 : }; 52 : 53 0 : bool log(const ExportTraceServiceRequest& request) { 54 : // If we don't have a stream already, we need to initialize it. 55 0 : if (!stream_) { 56 0 : stream_ = std::make_unique<LocalStream>(*this); 57 0 : } 58 : 59 : // If we don't have a Grpc AsyncStream, we need to initialize it. 60 0 : if (stream_->stream_ == nullptr) { 61 0 : stream_->stream_ = 62 0 : client_->start(service_method_, *stream_, Http::AsyncClient::StreamOptions()); 63 0 : } 64 : 65 : // If we do have a Grpc AsyncStream, we can first check if we are above the write buffer, and 66 : // send message if it's ok; if we don't have a stream, we need to clear out the stream data 67 : // after stream creation failed. 68 0 : if (stream_->stream_ != nullptr) { 69 0 : if (stream_->stream_->isAboveWriteBufferHighWatermark()) { 70 0 : return false; 71 0 : } 72 0 : stream_->stream_->sendMessage(request, false); 73 0 : } else { 74 0 : stream_.reset(); 75 0 : } 76 0 : return true; 77 0 : } 78 : 79 : Grpc::AsyncClient<ExportTraceServiceRequest, ExportTraceServiceResponse> client_; 80 : std::unique_ptr<LocalStream> stream_; 81 : const Protobuf::MethodDescriptor& service_method_; 82 : }; 83 : 84 : class OpenTelemetryGrpcTraceExporter : public OpenTelemetryTraceExporter { 85 : public: 86 : OpenTelemetryGrpcTraceExporter(const Grpc::RawAsyncClientSharedPtr& client); 87 : 88 : bool log(const ExportTraceServiceRequest& request) override; 89 : 90 : private: 91 : OpenTelemetryGrpcTraceExporterClient client_; 92 : }; 93 : 94 : } // namespace OpenTelemetry 95 : } // namespace Tracers 96 : } // namespace Extensions 97 : } // namespace Envoy