LCOV - code coverage report
Current view: top level - source/common/grpc - google_async_client_impl.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 21 29 72.4 %
Date: 2024-01-05 06:35:25 Functions: 10 16 62.5 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <memory>
       4             : #include <queue>
       5             : 
       6             : #include "envoy/api/api.h"
       7             : #include "envoy/common/platform.h"
       8             : #include "envoy/config/core/v3/base.pb.h"
       9             : #include "envoy/config/core/v3/grpc_service.pb.h"
      10             : #include "envoy/grpc/async_client.h"
      11             : #include "envoy/stats/scope.h"
      12             : #include "envoy/stream_info/stream_info.h"
      13             : #include "envoy/thread/thread.h"
      14             : #include "envoy/thread_local/thread_local_object.h"
      15             : #include "envoy/tracing/tracer.h"
      16             : 
      17             : #include "source/common/common/linked_object.h"
      18             : #include "source/common/common/thread.h"
      19             : #include "source/common/common/thread_annotations.h"
      20             : #include "source/common/grpc/google_grpc_context.h"
      21             : #include "source/common/grpc/stat_names.h"
      22             : #include "source/common/grpc/typed_async_client.h"
      23             : #include "source/common/router/header_parser.h"
      24             : #include "source/common/stream_info/stream_info_impl.h"
      25             : #include "source/common/tracing/http_tracer_impl.h"
      26             : 
      27             : #include "absl/container/node_hash_set.h"
      28             : #include "grpcpp/generic/generic_stub.h"
      29             : #include "grpcpp/grpcpp.h"
      30             : #include "grpcpp/support/proto_buffer_writer.h"
      31             : 
      32             : namespace Envoy {
      33             : namespace Grpc {
      34             : 
      35             : class GoogleAsyncStreamImpl; // Forward declaration; defined later in this header.
      36             : 
      37             : using GoogleAsyncStreamImplPtr = std::unique_ptr<GoogleAsyncStreamImpl>; // Owning pointer alias.
      38             : 
      39             : class GoogleAsyncRequestImpl; // Forward declaration; defined later in this header.
      40             : 
      41             : struct GoogleAsyncTag { // Pairs a stream with the Operation that a completion tag stands for.
      42             :   // Operation defines tags that are handed to the gRPC AsyncReaderWriter for use in completion
      43             :   // notification for their namesake operations. Read* and Write* operations may be outstanding
      44             :   // simultaneously, but there will be no more than one operation of each type in-flight for a given
      45             :   // stream. Init and Finish will both be issued exclusively when no other operations are in-flight
      46             :   // for a stream. See
      47             :   // https://github.com/grpc/grpc/blob/master/include/grpc%2B%2B/impl/codegen/async_stream.h for
      48             :   // further insight into the semantics of the different gRPC client operations.
      49             :   enum Operation {
      50             :     // Initial stub call issued, waiting for initialization to complete.
      51             :     Init = 0,
      52             :     // Waiting for initial meta-data from server following Init completion.
      53             :     ReadInitialMetadata,
      54             :     // Waiting for response protobuf from server following ReadInitialMetadata completion.
      55             :     Read,
      56             :     // Waiting for write of request protobuf to server to complete.
      57             :     Write,
      58             :     // Waiting for write of request protobuf (EOS) __OR__ an EOS WritesDone to server to complete.
      59             :     WriteLast,
      60             :     // Waiting for final status. This must only be issued once all Read* and Write* operations have
      61             :     // completed.
      62             :     Finish,
      63             :   };
      64             : 
      65          12 :   GoogleAsyncTag(GoogleAsyncStreamImpl& stream, Operation op) : stream_(stream), op_(op) {} // Binds the tag to its stream and operation.
      66             : 
      67             :   GoogleAsyncStreamImpl& stream_; // Stream this tag refers to; held by reference.
      68             :   const Operation op_; // Operation this tag denotes; immutable after construction.
      69             : };
      70             : 
      71             : class GoogleAsyncClientThreadLocal : public ThreadLocal::ThreadLocalObject,
      72             :                                      Logger::Loggable<Logger::Id::grpc> { // Holds the completion queue, its thread and the set of registered streams.
      73             : public:
      74             :   GoogleAsyncClientThreadLocal(Api::Api& api); // NOTE(review): single-argument constructor is not explicit.
      75             :   ~GoogleAsyncClientThreadLocal() override;
      76             : 
      77           2 :   grpc::CompletionQueue& completionQueue() { return cq_; } // Accessor for cq_.
      78             : 
      79           2 :   void registerStream(GoogleAsyncStreamImpl* stream) { // Adds stream to streams_; it must not already be present (ASSERT).
      80           2 :     ASSERT(streams_.find(stream) == streams_.end());
      81           2 :     streams_.insert(stream);
      82           2 :   }
      83             : 
      84           2 :   void unregisterStream(GoogleAsyncStreamImpl* stream) { // Removes stream from streams_; it must be present (ASSERT).
      85           2 :     auto it = streams_.find(stream);
      86           2 :     ASSERT(it != streams_.end());
      87           2 :     streams_.erase(it);
      88           2 :   }
      89             : 
      90             : private:
      91             :   void completionThread(); // Presumably the routine run on completion_thread_ — TODO confirm in the .cc file.
      92             : 
      93             :   // There is blanket google-grpc initialization in MainCommonBase, but that
      94             :   // doesn't cover unit tests. However, putting blanket coverage in ProcessWide
      95             :   // causes background threaded memory allocation in all unit tests making it
      96             :   // hard to measure memory. Thus we also initialize grpc using our idempotent
      97             :   // wrapper-class in classes that need it. See
      98             :   // https://github.com/envoyproxy/envoy/issues/8282 for details.
      99             :   GoogleGrpcContext google_grpc_context_;
     100             : 
     101             :   // The CompletionQueue for in-flight operations. This must precede completion_thread_ to ensure it
     102             :   // is constructed before the thread runs.
     103             :   grpc::CompletionQueue cq_;
     104             :   // The threading model for the Google gRPC C++ library is not directly compatible with Envoy's
     105             :   // siloed model. We resolve this by issuing non-blocking asynchronous
     106             :   // operations on the GoogleAsyncClientImpl silo thread, and then synchronously
     107             :   // blocking on a completion queue, cq_, on a distinct thread. When cq_ events
     108             :   // are delivered, we cross-post to the silo dispatcher to continue the
     109             :   // operation.
     110             :   //
     111             :   // We have an independent completion thread for each TLS silo (i.e. one per worker and
     112             :   // also one for the main thread).
     113             :   Thread::ThreadPtr completion_thread_;
     114             :   // Track all streams that are currently using this CQ, so we can notify them
     115             :   // on shutdown.
     116             :   absl::node_hash_set<GoogleAsyncStreamImpl*> streams_; // Raw pointers inserted/erased via registerStream()/unregisterStream().
     117             : };
     118             : 
     119             : using GoogleAsyncClientThreadLocalPtr = std::unique_ptr<GoogleAsyncClientThreadLocal>; // Owning pointer alias.
     120             : 
     121             : // Google gRPC client stats. TODO(htuch): consider how a wider set of stats collected by the
     122             : // library, such as the census related ones, can be externalized as needed.
     123             : struct GoogleAsyncClientStats {
     124             :   // .streams_total
     125             :   Stats::Counter* streams_total_;
     126             :   // .streams_closed_<gRPC status code>
     127             :   std::array<Stats::Counter*, Status::WellKnownGrpcStatus::MaximumKnown + 1> streams_closed_; // MaximumKnown + 1 slots; presumably indexed by status code — TODO confirm. NOTE(review): <array> is not included directly by this header.
     128             : };
     129             : 
     130             : // Interface to allow the gRPC stub to be mocked out by tests.
     131             : class GoogleStub {
     132             : public:
     133           2 :   virtual ~GoogleStub() = default;
     134             : 
     135             :   // See grpc::GenericStub::PrepareCall().
     136             :   virtual std::unique_ptr<grpc::GenericClientAsyncReaderWriter>
     137             :   PrepareCall(grpc::ClientContext* context, const grpc::string& method,
     138             :               grpc::CompletionQueue* cq) PURE; // Overridden by GoogleGenericStub.
     139             : };
     140             : 
     141             : using GoogleStubSharedPtr = std::shared_ptr<GoogleStub>; // Shared-ownership alias.
     142             : 
     143             : class GoogleGenericStub : public GoogleStub { // GoogleStub implementation wrapping a grpc::GenericStub.
     144             : public:
     145           2 :   GoogleGenericStub(std::shared_ptr<grpc::Channel> channel) : stub_(channel) {} // NOTE(review): single-argument constructor is not explicit.
     146             : 
     147             :   std::unique_ptr<grpc::GenericClientAsyncReaderWriter>
     148             :   PrepareCall(grpc::ClientContext* context, const grpc::string& method,
     149           2 :               grpc::CompletionQueue* cq) override {
     150           2 :     return stub_.PrepareCall(context, method, cq); // Forwards the arguments unchanged to the wrapped stub.
     151           2 :   }
     152             : 
     153             : private:
     154             :   grpc::GenericStub stub_; // Constructed from the channel passed to the constructor.
     155             : };
     156             : 
     157             : // Interface to allow the gRPC stub creation to be mocked out by tests.
     158             : class GoogleStubFactory {
     159             : public:
     160           2 :   virtual ~GoogleStubFactory() = default;
     161             : 
     162             :   // Create a stub from a given channel.
     163             :   virtual GoogleStubSharedPtr createStub(std::shared_ptr<grpc::Channel> channel) PURE; // Overridden by GoogleGenericStubFactory.
     164             : };
     165             : 
     166             : class GoogleGenericStubFactory : public GoogleStubFactory { // Factory producing GoogleGenericStub instances.
     167             : public:
     168           2 :   GoogleStubSharedPtr createStub(std::shared_ptr<grpc::Channel> channel) override {
     169           2 :     return std::make_shared<GoogleGenericStub>(channel); // New stub bound to the given channel.
     170           2 :   }
     171             : };
     172             : 
     173             : // Google gRPC C++ client library implementation of Grpc::AsyncClient.
     174             : class GoogleAsyncClientImpl final : public RawAsyncClient, Logger::Loggable<Logger::Id::grpc> {
     175             : public:
     176             :   GoogleAsyncClientImpl(Event::Dispatcher& dispatcher, GoogleAsyncClientThreadLocal& tls,
     177             :                         GoogleStubFactory& stub_factory, Stats::ScopeSharedPtr scope,
     178             :                         const envoy::config::core::v3::GrpcService& config, Api::Api& api,
     179             :                         const StatNames& stat_names);
     180             :   ~GoogleAsyncClientImpl() override;
     181             : 
     182             :   // Grpc::RawAsyncClient
     183             :   AsyncRequest* sendRaw(absl::string_view service_full_name, absl::string_view method_name,
     184             :                         Buffer::InstancePtr&& request, RawAsyncRequestCallbacks& callbacks,
     185             :                         Tracing::Span& parent_span,
     186             :                         const Http::AsyncClient::RequestOptions& options) override;
     187             :   RawAsyncStream* startRaw(absl::string_view service_full_name, absl::string_view method_name,
     188             :                            RawAsyncStreamCallbacks& callbacks,
     189             :                            const Http::AsyncClient::StreamOptions& options) override;
     190           0 :   absl::string_view destination() override { return target_uri_; } // View into target_uri_.
     191             : 
     192           0 :   TimeSource& timeSource() { return dispatcher_.timeSource(); } // Time source obtained from dispatcher_.
     193           0 :   uint64_t perStreamBufferLimitBytes() const { return per_stream_buffer_limit_bytes_; } // Limit read by GoogleAsyncStreamImpl::isAboveWriteBufferHighWatermark().
     194             : 
     195             : private:
     196             :   Event::Dispatcher& dispatcher_;
     197             :   GoogleAsyncClientThreadLocal& tls_;
     198             :   // This is shared with child streams, so that they can cleanup independent of
     199             :   // the client if it gets destructed. The streams need to wait for their tags
     200             :   // to drain from the CQ.
     201             :   GoogleStubSharedPtr stub_;
     202             :   std::list<GoogleAsyncStreamImplPtr> active_streams_; // NOTE(review): <list> is not included directly by this header.
     203             :   const std::string stat_prefix_;
     204             :   const std::string target_uri_; // Returned (as a view) by destination().
     205             :   Stats::ScopeSharedPtr scope_;
     206             :   GoogleAsyncClientStats stats_;
     207             :   uint64_t per_stream_buffer_limit_bytes_; // Returned by perStreamBufferLimitBytes().
     208             :   Router::HeaderParserPtr metadata_parser_;
     209             : 
     210             :   friend class GoogleAsyncClientThreadLocal;
     211             :   friend class GoogleAsyncRequestImpl;
     212             :   friend class GoogleAsyncStreamImpl;
     213             : };
     214             : 
     215             : class GoogleAsyncStreamImpl : public RawAsyncStream,
     216             :                               public Event::DeferredDeletable,
     217             :                               Logger::Loggable<Logger::Id::grpc>,
     218             :                               public LinkedObject<GoogleAsyncStreamImpl> { // RawAsyncStream implementation created with a GoogleAsyncClientImpl parent.
     219             : public:
     220             :   GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent, absl::string_view service_full_name,
     221             :                         absl::string_view method_name, RawAsyncStreamCallbacks& callbacks,
     222             :                         const Http::AsyncClient::StreamOptions& options);
     223             :   ~GoogleAsyncStreamImpl() override;
     224             : 
     225             :   virtual void initialize(bool buffer_body_for_retry); // Overridden by GoogleAsyncRequestImpl.
     226             : 
     227             :   // Grpc::RawAsyncStream
     228             :   void sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) override;
     229             :   void closeStream() override;
     230             :   void resetStream() override;
     231             :   // While the Google-gRPC code doesn't use Envoy watermark buffers, the logical
     232             :   // analog is to make sure that there aren't too many bytes in the pending write
     233             :   // queue.
     234           0 :   bool isAboveWriteBufferHighWatermark() const override {
     235           0 :     return bytes_in_write_pending_queue_ > parent_.perStreamBufferLimitBytes(); // Strictly greater-than: exactly at the limit is not "above".
     236           0 :   }
     237           0 :   const StreamInfo::StreamInfo& streamInfo() const override { return unused_stream_info_; } // Returns the placeholder member.
     238             : 
     239             : protected:
     240           2 :   bool callFailed() const { return call_failed_; } // Protected accessor for call_failed_, visible to subclasses.
     241             : 
     242             : private:
     243             :   // Process queued events in completed_ops_ with handleOpCompletion() on
     244             :   // GoogleAsyncClient silo thread.
     245             :   void onCompletedOps();
     246             :   // Handle Operation completion on GoogleAsyncClient silo thread. This is posted by
     247             :   // GoogleAsyncClientThreadLocal::completionThread() when a message is received on cq_.
     248             :   void handleOpCompletion(GoogleAsyncTag::Operation op, bool ok);
     249             :   // Convert from Google gRPC client std::multimap metadata to Envoy Http::HeaderMap.
     250             :   void metadataTranslate(const std::multimap<grpc::string_ref, grpc::string_ref>& grpc_metadata,
     251             :                          Http::HeaderMap& header_map);
     252             :   // Write the first PendingMessage in the write queue if non-empty.
     253             :   void writeQueued();
     254             :   // Deliver notification and update stats when the connection closes.
     255             :   void notifyRemoteClose(Status::GrpcStatus grpc_status,
     256             :                          Http::ResponseTrailerMapPtr trailing_metadata, const std::string& message);
     257             :   // Schedule stream for deferred deletion.
     258             :   void deferredDelete();
     259             :   // Cleanup and schedule stream for deferred deletion if no inflight
     260             :   // completions.
     261             :   void cleanup();
     262             : 
     263             :   // Pending serialized message on write queue. Only one Operation::Write is in-flight at any
     264             :   // point-in-time, so we queue pending writes here.
     265             :   struct PendingMessage {
     266             :     PendingMessage(Buffer::InstancePtr request, bool end_stream);
     267             :     // End-of-stream with no additional message.
     268           0 :     PendingMessage() = default; // Leaves buf_ empty and end_stream_ true via the default member initializers.
     269             : 
     270             :     const absl::optional<grpc::ByteBuffer> buf_{}; // Empty optional unless set by the two-argument constructor (defined elsewhere).
     271             :     const bool end_stream_{true};
     272             :   };
     273             : 
     274             :   GoogleAsyncTag init_tag_{*this, GoogleAsyncTag::Operation::Init}; // One pre-built tag per Operation, each bound to this stream.
     275             :   GoogleAsyncTag read_initial_metadata_tag_{*this, GoogleAsyncTag::Operation::ReadInitialMetadata};
     276             :   GoogleAsyncTag read_tag_{*this, GoogleAsyncTag::Operation::Read};
     277             :   GoogleAsyncTag write_tag_{*this, GoogleAsyncTag::Operation::Write};
     278             :   GoogleAsyncTag write_last_tag_{*this, GoogleAsyncTag::Operation::WriteLast};
     279             :   GoogleAsyncTag finish_tag_{*this, GoogleAsyncTag::Operation::Finish};
     280             : 
     281             :   GoogleAsyncClientImpl& parent_;
     282             :   GoogleAsyncClientThreadLocal& tls_;
     283             :   // Latch our own version of this reference, so that completionThread() doesn't
     284             :   // try and access via parent_, which might not exist in teardown. We assume
     285             :   // that the dispatcher lives longer than completionThread() life, which should
     286             :   // hold for the expected server object lifetimes.
     287             :   Event::Dispatcher& dispatcher_;
     288             :   // We hold a ref count on the stub_ to allow the stream to wait for its tags
     289             :   // to drain from the CQ on cleanup.
     290             :   GoogleStubSharedPtr stub_;
     291             :   std::string service_full_name_;
     292             :   std::string method_name_;
     293             :   RawAsyncStreamCallbacks& callbacks_;
     294             :   const Http::AsyncClient::StreamOptions& options_; // Held by reference; NOTE(review): presumably the caller's options must outlive the stream — confirm.
     295             :   grpc::ClientContext ctxt_;
     296             :   std::unique_ptr<grpc::GenericClientAsyncReaderWriter> rw_;
     297             :   std::queue<PendingMessage> write_pending_queue_;
     298             :   uint64_t bytes_in_write_pending_queue_{}; // Compared against the parent's per-stream limit in isAboveWriteBufferHighWatermark().
     299             :   grpc::ByteBuffer read_buf_;
     300             :   grpc::Status status_;
     301             :   // Has Operation::Init completed?
     302             :   bool call_initialized_{};
     303             :   // Did the stub Call fail? If this is true, no Operation::Init completion will ever occur.
     304             :   bool call_failed_{};
     305             :   // Is there an Operation::Write[Last] in-flight?
     306             :   bool write_pending_{};
     307             :   // Is an Operation::Finish in-flight?
     308             :   bool finish_pending_{};
     309             :   // Have we entered CQ draining state? If so, we're just waiting for all our
     310             :   // ops on the CQ to drain away before freeing the stream.
     311             :   bool draining_cq_{};
     312             :   // Count of the tags in-flight. This must hit zero before the stream can be
     313             :   // freed.
     314             :   uint32_t inflight_tags_{};
     315             : 
     316             :   // This is unused, other than being returned by streamInfo().
     317             :   StreamInfo::StreamInfoImpl unused_stream_info_;
     318             : 
     319             :   // Queue of completed (op, ok) passed from completionThread() to
     320             :   // handleOpCompletion().
     321             :   std::deque<std::pair<GoogleAsyncTag::Operation, bool>>
     322             :       completed_ops_ ABSL_GUARDED_BY(completed_ops_lock_);
     323             :   Thread::MutexBasicLockable completed_ops_lock_; // Lock named in the ABSL_GUARDED_BY annotation on completed_ops_.
     324             : 
     325             :   friend class GoogleAsyncClientImpl;
     326             :   friend class GoogleAsyncClientThreadLocal;
     327             : };
     328             : 
     329             : class GoogleAsyncRequestImpl : public AsyncRequest,
     330             :                                public GoogleAsyncStreamImpl,
     331             :                                RawAsyncStreamCallbacks { // Request built on GoogleAsyncStreamImpl; RawAsyncStreamCallbacks is a private base (class default).
     332             : public:
     333             :   GoogleAsyncRequestImpl(GoogleAsyncClientImpl& parent, absl::string_view service_full_name,
     334             :                          absl::string_view method_name, Buffer::InstancePtr request,
     335             :                          RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span,
     336             :                          const Http::AsyncClient::RequestOptions& options);
     337             : 
     338             :   void initialize(bool buffer_body_for_retry) override; // Overrides GoogleAsyncStreamImpl::initialize().
     339             : 
     340             :   // Grpc::AsyncRequest
     341             :   void cancel() override;
     342             : 
     343             : private:
     344             :   // Grpc::RawAsyncStreamCallbacks
     345             :   void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
     346             :   void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) override;
     347             :   bool onReceiveMessageRaw(Buffer::InstancePtr&& response) override;
     348             :   void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) override;
     349             :   void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
     350             : 
     351             :   Buffer::InstancePtr request_;
     352             :   RawAsyncRequestCallbacks& callbacks_; // Distinct from the private GoogleAsyncStreamImpl::callbacks_ member of the same name.
     353             :   Tracing::SpanPtr current_span_;
     354             :   Buffer::InstancePtr response_;
     355             : };
     356             : 
     357             : } // namespace Grpc
     358             : } // namespace Envoy

Generated by: LCOV version 1.15