LCOV - code coverage report
Current view: top level - source/common/grpc - async_client_impl.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 3 7 42.9 %
Date: 2024-01-05 06:35:25 Functions: 3 5 60.0 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <memory>
       4             : 
       5             : #include "envoy/config/core/v3/base.pb.h"
       6             : #include "envoy/config/core/v3/grpc_service.pb.h"
       7             : #include "envoy/grpc/async_client.h"
       8             : #include "envoy/stream_info/stream_info.h"
       9             : 
      10             : #include "source/common/common/linked_object.h"
      11             : #include "source/common/grpc/codec.h"
      12             : #include "source/common/grpc/typed_async_client.h"
      13             : #include "source/common/http/async_client_impl.h"
      14             : #include "source/common/router/header_parser.h"
      15             : 
      16             : namespace Envoy {
      17             : namespace Grpc {
      18             : 
      19             : class AsyncRequestImpl;
      20             : 
      21             : class AsyncStreamImpl;
      22             : using AsyncStreamImplPtr = std::unique_ptr<AsyncStreamImpl>;
      23             : 
      24             : class AsyncClientImpl final : public RawAsyncClient {
      25             : public:
      26             :   AsyncClientImpl(Upstream::ClusterManager& cm, const envoy::config::core::v3::GrpcService& config,
      27             :                   TimeSource& time_source);
      28             :   ~AsyncClientImpl() override;
      29             : 
      30             :   // Grpc::AsyncClient
      31             :   AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name,
      32             :                         Buffer::InstancePtr&& request, RawAsyncRequestCallbacks& callbacks,
      33             :                         Tracing::Span& parent_span,
      34             :                         const Http::AsyncClient::RequestOptions& options) override;
      35             :   RawAsyncStream* startRaw(absl::string_view service_full_name, absl::string_view method_name,
      36             :                            RawAsyncStreamCallbacks& callbacks,
      37             :                            const Http::AsyncClient::StreamOptions& options) override;
      38           0 :   absl::string_view destination() override { return remote_cluster_name_; }
      39             : 
      40             : private:
      41             :   Upstream::ClusterManager& cm_;
      42             :   const std::string remote_cluster_name_;
      43             :   // The host header value in the http transport.
      44             :   const std::string host_name_;
      45             :   std::list<AsyncStreamImplPtr> active_streams_;
      46             :   TimeSource& time_source_;
      47             :   Router::HeaderParserPtr metadata_parser_;
      48             : 
      49             :   friend class AsyncRequestImpl;
      50             :   friend class AsyncStreamImpl;
      51             : };
      52             : 
      53             : class AsyncStreamImpl : public RawAsyncStream,
      54             :                         Http::AsyncClient::StreamCallbacks,
      55             :                         public Event::DeferredDeletable,
      56             :                         public LinkedObject<AsyncStreamImpl> {
      57             : public:
      58             :   AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
      59             :                   absl::string_view method_name, RawAsyncStreamCallbacks& callbacks,
      60             :                   const Http::AsyncClient::StreamOptions& options);
      61             : 
      62             :   virtual void initialize(bool buffer_body_for_retry);
      63             : 
      64             :   // Http::AsyncClient::StreamCallbacks
      65             :   void onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) override;
      66             :   void onData(Buffer::Instance& data, bool end_stream) override;
      67             :   void onTrailers(Http::ResponseTrailerMapPtr&& trailers) override;
      68             :   void onComplete() override;
      69             :   void onReset() override;
      70             : 
      71             :   // Grpc::AsyncStream
      72             :   void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) override;
      73             :   void closeStream() override;
      74             :   void resetStream() override;
      75           0 :   bool isAboveWriteBufferHighWatermark() const override {
      76           0 :     return stream_ && stream_->isAboveWriteBufferHighWatermark();
      77           0 :   }
      78             : 
      79          68 :   bool hasResetStream() const { return http_reset_; }
      80         122 :   const StreamInfo::StreamInfo& streamInfo() const override { return stream_->streamInfo(); }
      81             : 
      82             : private:
      83             :   void streamError(Status::GrpcStatus grpc_status, const std::string& message);
      84          28 :   void streamError(Status::GrpcStatus grpc_status) { streamError(grpc_status, EMPTY_STRING); }
      85             : 
      86             :   void cleanup();
      87             :   void trailerResponse(absl::optional<Status::GrpcStatus> grpc_status,
      88             :                        const std::string& grpc_message);
      89             : 
      90             :   Event::Dispatcher* dispatcher_{};
      91             :   Http::RequestMessagePtr headers_message_;
      92             :   AsyncClientImpl& parent_;
      93             :   std::string service_full_name_;
      94             :   std::string method_name_;
      95             :   RawAsyncStreamCallbacks& callbacks_;
      96             :   Http::AsyncClient::StreamOptions options_;
      97             :   bool http_reset_{};
      98             :   Http::AsyncClient::Stream* stream_{};
      99             :   Decoder decoder_;
     100             :   // This is a member to avoid reallocation on every onData().
     101             :   std::vector<Frame> decoded_frames_;
     102             : 
     103             :   friend class AsyncClientImpl;
     104             : };
     105             : 
     106             : class AsyncRequestImpl : public AsyncRequest, public AsyncStreamImpl, RawAsyncStreamCallbacks {
     107             : public:
     108             :   AsyncRequestImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
     109             :                    absl::string_view method_name, Buffer::InstancePtr&& request,
     110             :                    RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span,
     111             :                    const Http::AsyncClient::RequestOptions& options);
     112             : 
     113             :   void initialize(bool buffer_body_for_retry) override;
     114             : 
     115             :   // Grpc::AsyncRequest
     116             :   void cancel() override;
     117             : 
     118             : private:
     119             :   // Grpc::AsyncStreamCallbacks
     120             :   void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
     121             :   void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override;
     122             :   bool onReceiveMessageRaw(Buffer::InstancePtr&& response) override;
     123             :   void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override;
     124             :   void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
     125             : 
     126             :   Buffer::InstancePtr request_;
     127             :   RawAsyncRequestCallbacks& callbacks_;
     128             :   Tracing::SpanPtr current_span_;
     129             :   Buffer::InstancePtr response_;
     130             : };
     131             : 
     132             : } // namespace Grpc
     133             : } // namespace Envoy

Generated by: LCOV version 1.15