LCOV - code coverage report
Current view: top level - source/extensions/tracers/opentelemetry - grpc_trace_exporter.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 30 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 8 0.0 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include "envoy/grpc/async_client_manager.h"
       4             : 
       5             : #include "source/common/common/logger.h"
       6             : #include "source/common/grpc/typed_async_client.h"
       7             : #include "source/extensions/tracers/opentelemetry/trace_exporter.h"
       8             : 
       9             : #include "opentelemetry/proto/collector/trace/v1/trace_service.pb.h"
      10             : 
      11             : namespace Envoy {
      12             : namespace Extensions {
      13             : namespace Tracers {
      14             : namespace OpenTelemetry {
      15             : 
      16             : using opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest;
      17             : using opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse;
      18             : 
      19             : /**
      20             :  * Exporter client for OTLP Traces. Provides abstraction on top of gRPC stream.
      21             :  */
      22             : class OpenTelemetryGrpcTraceExporterClient : Logger::Loggable<Logger::Id::tracing> {
      23             : public:
      24             :   OpenTelemetryGrpcTraceExporterClient(const Grpc::RawAsyncClientSharedPtr& client,
      25             :                                        const Protobuf::MethodDescriptor& service_method)
      26           0 :       : client_(client), service_method_(service_method) {}
      27             : 
      28             :   struct LocalStream : public Grpc::AsyncStreamCallbacks<
      29             :                            opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse> {
      30           0 :     LocalStream(OpenTelemetryGrpcTraceExporterClient& parent) : parent_(parent) {}
      31             : 
      32             :     // Grpc::AsyncStreamCallbacks
      33           0 :     void onCreateInitialMetadata(Http::RequestHeaderMap&) override {}
      34           0 :     void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override {}
      35             :     void onReceiveMessage(
      36             :         std::unique_ptr<opentelemetry::proto::collector::trace::v1::ExportTraceServiceResponse>&&)
      37           0 :         override {}
      38           0 :     void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override {}
      39           0 :     void onRemoteClose(Grpc::Status::GrpcStatus, const std::string&) override {
      40           0 :       ASSERT(parent_.stream_ != nullptr);
      41           0 :       if (parent_.stream_->stream_ != nullptr) {
      42             :         // Only reset if we have a stream. Otherwise we had an inline failure and we will clear the
      43             :         // stream data in send().
      44           0 :         parent_.stream_.reset();
      45           0 :       }
      46           0 :     }
      47             : 
      48             :     OpenTelemetryGrpcTraceExporterClient& parent_;
      49             :     Grpc::AsyncStream<opentelemetry::proto::collector::trace::v1::ExportTraceServiceRequest>
      50             :         stream_{};
      51             :   };
      52             : 
      53           0 :   bool log(const ExportTraceServiceRequest& request) {
      54             :     // If we don't have a stream already, we need to initialize it.
      55           0 :     if (!stream_) {
      56           0 :       stream_ = std::make_unique<LocalStream>(*this);
      57           0 :     }
      58             : 
      59             :     // If we don't have a Grpc AsyncStream, we need to initialize it.
      60           0 :     if (stream_->stream_ == nullptr) {
      61           0 :       stream_->stream_ =
      62           0 :           client_->start(service_method_, *stream_, Http::AsyncClient::StreamOptions());
      63           0 :     }
      64             : 
      65             :     // If we do have a Grpc AsyncStream, we can first check if we are above the write buffer, and
      66             :     // send message if it's ok; if we don't have a stream, we need to clear out the stream data
      67             :     // after stream creation failed.
      68           0 :     if (stream_->stream_ != nullptr) {
      69           0 :       if (stream_->stream_->isAboveWriteBufferHighWatermark()) {
      70           0 :         return false;
      71           0 :       }
      72           0 :       stream_->stream_->sendMessage(request, false);
      73           0 :     } else {
      74           0 :       stream_.reset();
      75           0 :     }
      76           0 :     return true;
      77           0 :   }
      78             : 
      79             :   Grpc::AsyncClient<ExportTraceServiceRequest, ExportTraceServiceResponse> client_;
      80             :   std::unique_ptr<LocalStream> stream_;
      81             :   const Protobuf::MethodDescriptor& service_method_;
      82             : };
      83             : 
      84             : class OpenTelemetryGrpcTraceExporter : public OpenTelemetryTraceExporter {
      85             : public:
      86             :   OpenTelemetryGrpcTraceExporter(const Grpc::RawAsyncClientSharedPtr& client);
      87             : 
      88             :   bool log(const ExportTraceServiceRequest& request) override;
      89             : 
      90             : private:
      91             :   OpenTelemetryGrpcTraceExporterClient client_;
      92             : };
      93             : 
      94             : } // namespace OpenTelemetry
      95             : } // namespace Tracers
      96             : } // namespace Extensions
      97             : } // namespace Envoy

Generated by: LCOV version 1.15