LCOV - code coverage report
Current view: top level - source/common/grpc - google_async_client_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 160 329 48.6 %
Date: 2024-01-05 06:35:25 Functions: 20 32 62.5 %

          Line data    Source code
       1             : #include "source/common/grpc/google_async_client_impl.h"
       2             : 
       3             : #include "envoy/common/time.h"
       4             : #include "envoy/config/core/v3/grpc_service.pb.h"
       5             : #include "envoy/http/protocol.h"
       6             : #include "envoy/stats/scope.h"
       7             : 
       8             : #include "source/common/common/base64.h"
       9             : #include "source/common/common/empty_string.h"
      10             : #include "source/common/common/lock_guard.h"
      11             : #include "source/common/common/utility.h"
      12             : #include "source/common/config/datasource.h"
      13             : #include "source/common/grpc/common.h"
      14             : #include "source/common/grpc/google_grpc_creds_impl.h"
      15             : #include "source/common/grpc/google_grpc_utils.h"
      16             : #include "source/common/router/header_parser.h"
      17             : #include "source/common/tracing/http_tracer_impl.h"
      18             : 
      19             : #include "absl/strings/str_cat.h"
      20             : #include "grpcpp/support/proto_buffer_reader.h"
      21             : 
      22             : namespace Envoy {
      23             : namespace Grpc {
      24             : namespace {
      25             : static constexpr int DefaultBufferLimitBytes = 1024 * 1024;
      26             : }
      27             : 
      28             : GoogleAsyncClientThreadLocal::GoogleAsyncClientThreadLocal(Api::Api& api)
      29         226 :     : completion_thread_(api.threadFactory().createThread([this] { completionThread(); },
      30         226 :                                                           Thread::Options{"GrpcGoogClient"})) {}
      31             : 
      32         226 : GoogleAsyncClientThreadLocal::~GoogleAsyncClientThreadLocal() {
      33             :   // Force streams to shutdown and invoke TryCancel() to start the drain of
      34             :   // pending op. If we don't do this, Shutdown() below can jam on pending ops.
      35             :   // This is also required to satisfy the contract that once Shutdown is called,
      36             :   // streams no longer queue any additional tags.
      37         228 :   for (auto it = streams_.begin(); it != streams_.end();) {
      38             :     // resetStream() may result in immediate unregisterStream() and erase(),
      39             :     // which would invalidate the iterator for the current element, so make sure
      40             :     // we point to the next one first.
      41           2 :     (*it++)->resetStream();
      42           2 :   }
      43         226 :   cq_.Shutdown();
      44         226 :   ENVOY_LOG(debug, "Joining completionThread");
      45         226 :   completion_thread_->join();
      46         226 :   ENVOY_LOG(debug, "Joined completionThread");
      47             :   // Ensure that we have cleaned up all orphan streams, now that CQ is gone.
      48         228 :   while (!streams_.empty()) {
      49           2 :     (*streams_.begin())->onCompletedOps();
      50           2 :   }
      51         226 : }
      52             : 
      53         226 : void GoogleAsyncClientThreadLocal::completionThread() {
      54         226 :   ENVOY_LOG(debug, "completionThread running");
      55         226 :   void* tag;
      56         226 :   bool ok;
      57         230 :   while (cq_.Next(&tag, &ok)) {
      58           4 :     const auto& google_async_tag = *reinterpret_cast<GoogleAsyncTag*>(tag);
      59           4 :     const GoogleAsyncTag::Operation op = google_async_tag.op_;
      60           4 :     GoogleAsyncStreamImpl& stream = google_async_tag.stream_;
      61           4 :     ENVOY_LOG(trace, "completionThread CQ event {} {}", op, ok);
      62           4 :     Thread::LockGuard lock(stream.completed_ops_lock_);
      63             : 
      64             :     // It's an invariant that there must only be one pending post for arbitrary
      65             :     // length completed_ops_, otherwise we can race in stream destruction, where
      66             :     // we process multiple events in onCompletedOps() but have only partially
      67             :     // consumed the posts on the dispatcher.
      68             :     // TODO(htuch): This may result in unbounded processing on the silo thread
      69             :     // in onCompletedOps() in extreme cases, when we emplace_back() in
      70             :     // completionThread() at a high rate, consider bounding the length of such
      71             :     // sequences if this behavior becomes an issue.
      72           4 :     if (stream.completed_ops_.empty()) {
      73           3 :       stream.dispatcher_.post([&stream] { stream.onCompletedOps(); });
      74           3 :     }
      75           4 :     stream.completed_ops_.emplace_back(op, ok);
      76           4 :   }
      77         226 :   ENVOY_LOG(debug, "completionThread exiting");
      78         226 : }
      79             : 
      80             : GoogleAsyncClientImpl::GoogleAsyncClientImpl(Event::Dispatcher& dispatcher,
      81             :                                              GoogleAsyncClientThreadLocal& tls,
      82             :                                              GoogleStubFactory& stub_factory,
      83             :                                              Stats::ScopeSharedPtr scope,
      84             :                                              const envoy::config::core::v3::GrpcService& config,
      85             :                                              Api::Api& api, const StatNames& stat_names)
      86             :     : dispatcher_(dispatcher), tls_(tls), stat_prefix_(config.google_grpc().stat_prefix()),
      87             :       target_uri_(config.google_grpc().target_uri()), scope_(scope),
      88             :       per_stream_buffer_limit_bytes_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
      89             :           config.google_grpc(), per_stream_buffer_limit_bytes, DefaultBufferLimitBytes)),
      90             :       metadata_parser_(Router::HeaderParser::configure(
      91             :           config.initial_metadata(),
      92           2 :           envoy::config::core::v3::HeaderValueOption::OVERWRITE_IF_EXISTS_OR_ADD)) {
      93             :   // We rebuild the channel each time we construct the channel. It appears that the gRPC library is
      94             :   // smart enough to do connection pooling and reuse with identical channel args, so this should
      95             :   // have comparable overhead to what we are doing in Grpc::AsyncClientImpl, i.e. no expensive
      96             :   // new connection implied.
      97           2 :   std::shared_ptr<grpc::Channel> channel = GoogleGrpcUtils::createChannel(config, api);
      98             :   // Get state with try_to_connect = true to try connection at channel creation.
      99             :   // This is for initializing gRPC channel at channel creation. This GetState(true) is used to poke
     100             :   // the gRPC lb at channel creation, it doesn't have any effect no matter it succeeds or fails. But
     101             :   // it helps on initialization. Otherwise, the channel establishment still happens at the first
     102             :   // request, no matter when we create the channel.
     103           2 :   channel->GetState(true);
     104           2 :   stub_ = stub_factory.createStub(channel);
     105           2 :   scope_->counterFromStatName(stat_names.google_grpc_client_creation_).inc();
     106             :   // Initialize client stats.
     107             :   // TODO(jmarantz): Capture these names in async_client_manager_impl.cc and
     108             :   // pass in a struct of StatName objects so we don't have to take locks here.
     109           2 :   stats_.streams_total_ = &scope_->counterFromStatName(stat_names.streams_total_);
     110          36 :   for (uint32_t i = 0; i <= Status::WellKnownGrpcStatus::MaximumKnown; ++i) {
     111          34 :     stats_.streams_closed_[i] = &scope_->counterFromStatName(stat_names.streams_closed_[i]);
     112          34 :   }
     113           2 : }
     114             : 
     115           2 : GoogleAsyncClientImpl::~GoogleAsyncClientImpl() {
     116           2 :   ASSERT(isThreadSafe());
     117           2 :   ENVOY_LOG(debug, "Client teardown, resetting streams");
     118           2 :   while (!active_streams_.empty()) {
     119           0 :     active_streams_.front()->resetStream();
     120           0 :   }
     121           2 : }
     122             : 
     123             : AsyncRequest* GoogleAsyncClientImpl::sendRaw(absl::string_view service_full_name,
     124             :                                              absl::string_view method_name,
     125             :                                              Buffer::InstancePtr&& request,
     126             :                                              RawAsyncRequestCallbacks& callbacks,
     127             :                                              Tracing::Span& parent_span,
     128           0 :                                              const Http::AsyncClient::RequestOptions& options) {
     129           0 :   ASSERT(isThreadSafe());
     130           0 :   auto* const async_request = new GoogleAsyncRequestImpl(
     131           0 :       *this, service_full_name, method_name, std::move(request), callbacks, parent_span, options);
     132           0 :   GoogleAsyncStreamImplPtr grpc_stream{async_request};
     133             : 
     134           0 :   grpc_stream->initialize(true);
     135           0 :   if (grpc_stream->callFailed()) {
     136           0 :     return nullptr;
     137           0 :   }
     138             : 
     139           0 :   LinkedList::moveIntoList(std::move(grpc_stream), active_streams_);
     140           0 :   return async_request;
     141           0 : }
     142             : 
     143             : RawAsyncStream* GoogleAsyncClientImpl::startRaw(absl::string_view service_full_name,
     144             :                                                 absl::string_view method_name,
     145             :                                                 RawAsyncStreamCallbacks& callbacks,
     146           2 :                                                 const Http::AsyncClient::StreamOptions& options) {
     147           2 :   ASSERT(isThreadSafe());
     148           2 :   auto grpc_stream = std::make_unique<GoogleAsyncStreamImpl>(*this, service_full_name, method_name,
     149           2 :                                                              callbacks, options);
     150             : 
     151           2 :   grpc_stream->initialize(false);
     152           2 :   if (grpc_stream->callFailed()) {
     153           0 :     return nullptr;
     154           0 :   }
     155             : 
     156           2 :   LinkedList::moveIntoList(std::move(grpc_stream), active_streams_);
     157           2 :   return active_streams_.front().get();
     158           2 : }
     159             : 
     160             : GoogleAsyncStreamImpl::GoogleAsyncStreamImpl(GoogleAsyncClientImpl& parent,
     161             :                                              absl::string_view service_full_name,
     162             :                                              absl::string_view method_name,
     163             :                                              RawAsyncStreamCallbacks& callbacks,
     164             :                                              const Http::AsyncClient::StreamOptions& options)
     165             :     : parent_(parent), tls_(parent_.tls_), dispatcher_(parent_.dispatcher_), stub_(parent_.stub_),
     166             :       service_full_name_(service_full_name), method_name_(method_name), callbacks_(callbacks),
     167             :       options_(options), unused_stream_info_(Http::Protocol::Http2, dispatcher_.timeSource(),
     168           2 :                                              Network::ConnectionInfoProviderSharedPtr{}) {}
     169             : 
     170           2 : GoogleAsyncStreamImpl::~GoogleAsyncStreamImpl() {
     171           2 :   ENVOY_LOG(debug, "GoogleAsyncStreamImpl destruct");
     172           2 : }
     173             : 
     174             : GoogleAsyncStreamImpl::PendingMessage::PendingMessage(Buffer::InstancePtr request, bool end_stream)
     175           1 :     : buf_(GoogleGrpcUtils::makeByteBuffer(std::move(request))), end_stream_(end_stream) {}
     176             : 
     177             : // TODO(htuch): figure out how to propagate "this request should be buffered for
     178             : // retry" bit to Google gRPC library.
     179           2 : void GoogleAsyncStreamImpl::initialize(bool /*buffer_body_for_retry*/) {
     180           2 :   parent_.stats_.streams_total_->inc();
     181           2 :   gpr_timespec abs_deadline =
     182           2 :       options_.timeout
     183           2 :           ? gpr_time_add(gpr_now(GPR_CLOCK_REALTIME),
     184           0 :                          gpr_time_from_millis(options_.timeout.value().count(), GPR_TIMESPAN))
     185           2 :           : gpr_inf_future(GPR_CLOCK_REALTIME);
     186           2 :   ctxt_.set_deadline(abs_deadline);
     187             :   // Fill service-wide initial metadata.
     188           2 :   auto initial_metadata = Http::RequestHeaderMapImpl::create();
     189             :   // TODO(cpakulski): Find a better way to access requestHeaders
     190             :   // request headers should not be stored in stream_info.
     191             :   // Maybe put it to parent_context?
     192           2 :   parent_.metadata_parser_->evaluateHeaders(*initial_metadata, options_.parent_context.stream_info);
     193           2 :   callbacks_.onCreateInitialMetadata(*initial_metadata);
     194           2 :   initial_metadata->iterate([this](const Http::HeaderEntry& header) {
     195           0 :     ctxt_.AddMetadata(std::string(header.key().getStringView()),
     196           0 :                       std::string(header.value().getStringView()));
     197           0 :     return Http::HeaderMap::Iterate::Continue;
     198           0 :   });
     199             :   // Invoke stub call.
     200           2 :   rw_ = parent_.stub_->PrepareCall(&ctxt_, "/" + service_full_name_ + "/" + method_name_,
     201           2 :                                    &parent_.tls_.completionQueue());
     202           2 :   if (rw_ == nullptr) {
     203           0 :     notifyRemoteClose(Status::WellKnownGrpcStatus::Unavailable, nullptr, EMPTY_STRING);
     204           0 :     call_failed_ = true;
     205           0 :     return;
     206           0 :   }
     207           2 :   parent_.tls_.registerStream(this);
     208           2 :   rw_->StartCall(&init_tag_);
     209           2 :   ++inflight_tags_;
     210           2 : }
     211             : 
     212             : void GoogleAsyncStreamImpl::notifyRemoteClose(Status::GrpcStatus grpc_status,
     213             :                                               Http::ResponseTrailerMapPtr trailing_metadata,
     214           1 :                                               const std::string& message) {
     215           1 :   if (grpc_status > Status::WellKnownGrpcStatus::MaximumKnown || grpc_status < 0) {
     216           0 :     ENVOY_LOG(error, "notifyRemoteClose invalid gRPC status code {}", grpc_status);
     217             :     // Set the grpc_status as InvalidCode but increment the Unknown stream to avoid out-of-range
     218             :     // crash..
     219           0 :     grpc_status = Status::WellKnownGrpcStatus::InvalidCode;
     220           0 :     parent_.stats_.streams_closed_[Status::WellKnownGrpcStatus::Unknown]->inc();
     221           1 :   } else {
     222           1 :     parent_.stats_.streams_closed_[grpc_status]->inc();
     223           1 :   }
     224           1 :   ENVOY_LOG(debug, "notifyRemoteClose {} {}", grpc_status, message);
     225           1 :   callbacks_.onReceiveTrailingMetadata(trailing_metadata ? std::move(trailing_metadata)
     226           1 :                                                          : Http::ResponseTrailerMapImpl::create());
     227           1 :   callbacks_.onRemoteClose(grpc_status, message);
     228           1 : }
     229             : 
     230           1 : void GoogleAsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& request, bool end_stream) {
     231           1 :   write_pending_queue_.emplace(std::move(request), end_stream);
     232           1 :   ENVOY_LOG(trace, "Queued message to write ({} bytes)",
     233           1 :             write_pending_queue_.back().buf_.value().Length());
     234           1 :   bytes_in_write_pending_queue_ += write_pending_queue_.back().buf_.value().Length();
     235           1 :   writeQueued();
     236           1 : }
     237             : 
     238           0 : void GoogleAsyncStreamImpl::closeStream() {
     239             :   // Empty EOS write queued.
     240           0 :   write_pending_queue_.emplace();
     241           0 :   writeQueued();
     242           0 : }
     243             : 
     244           3 : void GoogleAsyncStreamImpl::resetStream() {
     245           3 :   ENVOY_LOG(debug, "resetStream");
     246             :   // The gRPC API requires calling Finish() at the end of a stream, even
     247             :   // if the stream is cancelled.
     248           3 :   if (!finish_pending_) {
     249           2 :     finish_pending_ = true;
     250           2 :     rw_->Finish(&status_, &finish_tag_);
     251           2 :     ++inflight_tags_;
     252           2 :   }
     253           3 :   cleanup();
     254           3 : }
     255             : 
     256           1 : void GoogleAsyncStreamImpl::writeQueued() {
     257           1 :   if (!call_initialized_ || finish_pending_ || write_pending_ || write_pending_queue_.empty() ||
     258           1 :       draining_cq_) {
     259           1 :     return;
     260           1 :   }
     261           0 :   write_pending_ = true;
     262           0 :   const PendingMessage& msg = write_pending_queue_.front();
     263             : 
     264           0 :   if (!msg.buf_) {
     265           0 :     ASSERT(msg.end_stream_);
     266           0 :     rw_->WritesDone(&write_last_tag_);
     267           0 :     ++inflight_tags_;
     268           0 :   } else if (msg.end_stream_) {
     269           0 :     grpc::WriteOptions write_options;
     270           0 :     rw_->WriteLast(msg.buf_.value(), write_options, &write_last_tag_);
     271           0 :     ++inflight_tags_;
     272           0 :   } else {
     273           0 :     rw_->Write(msg.buf_.value(), &write_tag_);
     274           0 :     ++inflight_tags_;
     275           0 :   }
     276           0 :   ENVOY_LOG(trace, "Write op dispatched");
     277           0 : }
     278             : 
     279           3 : void GoogleAsyncStreamImpl::onCompletedOps() {
     280             :   // The items in completed_ops_ execute in the order they were originally added to the queue since
     281             :   // both the post callback scheduled by the completionThread and the deferred deletion of the
     282             :   // GoogleAsyncClientThreadLocal happen on the dispatcher thread.
     283           3 :   std::deque<std::pair<GoogleAsyncTag::Operation, bool>> completed_ops;
     284           3 :   {
     285           3 :     Thread::LockGuard lock(completed_ops_lock_);
     286           3 :     completed_ops = std::move(completed_ops_);
     287             :     // completed_ops_ should be empty after the move.
     288           3 :     ASSERT(completed_ops_.empty());
     289           3 :   }
     290             : 
     291           7 :   while (!completed_ops.empty()) {
     292           4 :     GoogleAsyncTag::Operation op;
     293           4 :     bool ok;
     294           4 :     std::tie(op, ok) = completed_ops.front();
     295           4 :     completed_ops.pop_front();
     296           4 :     handleOpCompletion(op, ok);
     297           4 :   }
     298           3 : }
     299             : 
     300           4 : void GoogleAsyncStreamImpl::handleOpCompletion(GoogleAsyncTag::Operation op, bool ok) {
     301           4 :   ENVOY_LOG(trace, "handleOpCompletion op={} ok={} inflight={}", op, ok, inflight_tags_);
     302           4 :   ASSERT(inflight_tags_ > 0);
     303           4 :   --inflight_tags_;
     304           4 :   if (draining_cq_) {
     305           3 :     if (inflight_tags_ == 0) {
     306           2 :       deferredDelete();
     307           2 :     }
     308             :     // Ignore op completions while draining CQ.
     309           3 :     return;
     310           3 :   }
     311             :   // Consider failure cases first.
     312           1 :   if (!ok) {
     313             :     // Early fails can be just treated as Internal.
     314           1 :     if (op == GoogleAsyncTag::Operation::Init ||
     315           1 :         op == GoogleAsyncTag::Operation::ReadInitialMetadata) {
     316           1 :       notifyRemoteClose(Status::WellKnownGrpcStatus::Internal, nullptr, EMPTY_STRING);
     317           1 :       resetStream();
     318           1 :       return;
     319           1 :     }
     320             :     // Remote server has closed, we can pick up some meaningful status.
     321             :     // TODO(htuch): We're assuming here that a failed Write/WriteLast operation will result in
     322             :     // stream termination, and pick up on the failed Read here. Confirm that this assumption is
     323             :     // valid.
     324           0 :     if (op == GoogleAsyncTag::Operation::Read) {
     325           0 :       finish_pending_ = true;
     326           0 :       rw_->Finish(&status_, &finish_tag_);
     327           0 :       ++inflight_tags_;
     328           0 :     }
     329           0 :     return;
     330           1 :   }
     331           0 :   switch (op) {
     332           0 :   case GoogleAsyncTag::Operation::Init: {
     333           0 :     ASSERT(ok);
     334           0 :     ASSERT(!call_initialized_);
     335           0 :     call_initialized_ = true;
     336           0 :     rw_->ReadInitialMetadata(&read_initial_metadata_tag_);
     337           0 :     ++inflight_tags_;
     338           0 :     writeQueued();
     339           0 :     break;
     340           0 :   }
     341           0 :   case GoogleAsyncTag::Operation::ReadInitialMetadata: {
     342           0 :     ASSERT(ok);
     343           0 :     ASSERT(call_initialized_);
     344           0 :     rw_->Read(&read_buf_, &read_tag_);
     345           0 :     ++inflight_tags_;
     346           0 :     Http::ResponseHeaderMapPtr initial_metadata = Http::ResponseHeaderMapImpl::create();
     347           0 :     metadataTranslate(ctxt_.GetServerInitialMetadata(), *initial_metadata);
     348           0 :     callbacks_.onReceiveInitialMetadata(std::move(initial_metadata));
     349           0 :     break;
     350           0 :   }
     351           0 :   case GoogleAsyncTag::Operation::Write: {
     352           0 :     ASSERT(ok);
     353           0 :     write_pending_ = false;
     354           0 :     bytes_in_write_pending_queue_ -= write_pending_queue_.front().buf_.value().Length();
     355           0 :     write_pending_queue_.pop();
     356           0 :     writeQueued();
     357           0 :     break;
     358           0 :   }
     359           0 :   case GoogleAsyncTag::Operation::WriteLast: {
     360           0 :     ASSERT(ok);
     361           0 :     write_pending_ = false;
     362           0 :     break;
     363           0 :   }
     364           0 :   case GoogleAsyncTag::Operation::Read: {
     365           0 :     ASSERT(ok);
     366           0 :     auto buffer = GoogleGrpcUtils::makeBufferInstance(read_buf_);
     367           0 :     if (!buffer || !callbacks_.onReceiveMessageRaw(std::move(buffer))) {
     368             :       // This is basically streamError in Grpc::AsyncClientImpl.
     369           0 :       notifyRemoteClose(Status::WellKnownGrpcStatus::Internal, nullptr, EMPTY_STRING);
     370           0 :       resetStream();
     371           0 :       break;
     372           0 :     }
     373           0 :     rw_->Read(&read_buf_, &read_tag_);
     374           0 :     ++inflight_tags_;
     375           0 :     break;
     376           0 :   }
     377           0 :   case GoogleAsyncTag::Operation::Finish: {
     378           0 :     ASSERT(finish_pending_);
     379           0 :     ENVOY_LOG(debug, "Finish with grpc-status code {}", status_.error_code());
     380           0 :     Http::ResponseTrailerMapPtr trailing_metadata = Http::ResponseTrailerMapImpl::create();
     381           0 :     metadataTranslate(ctxt_.GetServerTrailingMetadata(), *trailing_metadata);
     382           0 :     notifyRemoteClose(static_cast<Status::GrpcStatus>(status_.error_code()),
     383           0 :                       std::move(trailing_metadata), status_.error_message());
     384           0 :     cleanup();
     385           0 :     break;
     386           0 :   }
     387           0 :   }
     388           0 : }
     389             : 
     390             : void GoogleAsyncStreamImpl::metadataTranslate(
     391             :     const std::multimap<grpc::string_ref, grpc::string_ref>& grpc_metadata,
     392           0 :     Http::HeaderMap& header_map) {
     393             :   // More painful copying, this time due to the mismatch in header
     394             :   // representation data structures in Envoy and Google gRPC.
     395           0 :   for (const auto& it : grpc_metadata) {
     396           0 :     auto key = Http::LowerCaseString(std::string(it.first.data(), it.first.size()));
     397           0 :     if (absl::EndsWith(key.get(), "-bin")) {
     398           0 :       auto value = Base64::encode(it.second.data(), it.second.size());
     399           0 :       header_map.addCopy(key, value);
     400           0 :       continue;
     401           0 :     }
     402           0 :     header_map.addCopy(key, std::string(it.second.data(), it.second.size()));
     403           0 :   }
     404           0 : }
     405             : 
     406           2 : void GoogleAsyncStreamImpl::deferredDelete() {
     407           2 :   ENVOY_LOG(debug, "Deferred delete");
     408           2 :   tls_.unregisterStream(this);
     409             :   // We only get here following cleanup(), which has performed a
     410             :   // remoteFromList(), resulting in self-ownership of the object's memory.
     411             :   // Hence, it is safe here to create a unique_ptr to this and transfer
     412             :   // ownership to dispatcher_.deferredDelete(). After this call, no further
     413             :   // methods may be invoked on this object.
     414           2 :   dispatcher_.deferredDelete(GoogleAsyncStreamImplPtr(this));
     415           2 : }
     416             : 
     417           3 : void GoogleAsyncStreamImpl::cleanup() {
     418           3 :   ENVOY_LOG(debug, "Stream cleanup with {} in-flight tags", inflight_tags_);
     419             :   // We can get here if the client has already issued resetStream() and, while
     420             :   // this is in progress, the destructor runs.
     421           3 :   if (draining_cq_) {
     422           1 :     ENVOY_LOG(debug, "Cleanup already in progress");
     423           1 :     return;
     424           1 :   }
     425           2 :   draining_cq_ = true;
     426           2 :   ctxt_.TryCancel();
     427           2 :   if (LinkedObject<GoogleAsyncStreamImpl>::inserted()) {
     428             :     // We take ownership of our own memory at this point.
     429           2 :     LinkedObject<GoogleAsyncStreamImpl>::removeFromList(parent_.active_streams_).release();
     430           2 :     if (inflight_tags_ == 0) {
     431           0 :       deferredDelete();
     432           0 :     }
     433           2 :   }
     434           2 : }
     435             : 
     436             : GoogleAsyncRequestImpl::GoogleAsyncRequestImpl(
     437             :     GoogleAsyncClientImpl& parent, absl::string_view service_full_name,
     438             :     absl::string_view method_name, Buffer::InstancePtr request, RawAsyncRequestCallbacks& callbacks,
     439             :     Tracing::Span& parent_span, const Http::AsyncClient::RequestOptions& options)
     440             :     : GoogleAsyncStreamImpl(parent, service_full_name, method_name, *this, options),
     441           0 :       request_(std::move(request)), callbacks_(callbacks) {
     442           0 :   current_span_ =
     443           0 :       parent_span.spawnChild(Tracing::EgressConfig::get(),
     444           0 :                              absl::StrCat("async ", service_full_name, ".", method_name, " egress"),
     445           0 :                              parent.timeSource().systemTime());
     446           0 :   current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.stat_prefix_);
     447           0 :   current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.target_uri_);
     448           0 :   current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
     449           0 : }
     450             : 
     451           0 : void GoogleAsyncRequestImpl::initialize(bool buffer_body_for_retry) {
     452           0 :   GoogleAsyncStreamImpl::initialize(buffer_body_for_retry);
     453           0 :   if (callFailed()) {
     454           0 :     return;
     455           0 :   }
     456           0 :   sendMessageRaw(std::move(request_), true);
     457           0 : }
     458             : 
     459           0 : void GoogleAsyncRequestImpl::cancel() {
     460           0 :   current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
     461           0 :   current_span_->finishSpan();
     462           0 :   resetStream();
     463           0 : }
     464             : 
     465           0 : void GoogleAsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
     466           0 :   Tracing::HttpTraceContext trace_context(metadata);
     467           0 :   current_span_->injectContext(trace_context, nullptr);
     468           0 :   callbacks_.onCreateInitialMetadata(metadata);
     469           0 : }
     470             : 
     471           0 : void GoogleAsyncRequestImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {}
     472             : 
     473           0 : bool GoogleAsyncRequestImpl::onReceiveMessageRaw(Buffer::InstancePtr&& response) {
     474           0 :   response_ = std::move(response);
     475           0 :   return true;
     476           0 : }
     477             : 
     478           0 : void GoogleAsyncRequestImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) {}
     479             : 
     480             : void GoogleAsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status,
     481           0 :                                            const std::string& message) {
     482           0 :   current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status));
     483             : 
     484           0 :   if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
     485           0 :     current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
     486           0 :     callbacks_.onFailure(status, message, *current_span_);
     487           0 :   } else if (response_ == nullptr) {
     488           0 :     current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
     489           0 :     callbacks_.onFailure(Status::Internal, EMPTY_STRING, *current_span_);
     490           0 :   } else {
     491           0 :     callbacks_.onSuccessRaw(std::move(response_), *current_span_);
     492           0 :   }
     493             : 
     494           0 :   current_span_->finishSpan();
     495           0 : }
     496             : 
     497             : } // namespace Grpc
     498             : } // namespace Envoy

Generated by: LCOV version 1.15