LCOV - code coverage report
Current view: top level - source/common/grpc - async_client_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 90 183 49.2 %
Date: 2024-01-05 06:35:25 Functions: 14 24 58.3 %

          Line data    Source code
       1             : #include "source/common/grpc/async_client_impl.h"
       2             : 
       3             : #include "envoy/config/core/v3/grpc_service.pb.h"
       4             : 
       5             : #include "source/common/buffer/zero_copy_input_stream_impl.h"
       6             : #include "source/common/common/enum_to_int.h"
       7             : #include "source/common/common/utility.h"
       8             : #include "source/common/grpc/common.h"
       9             : #include "source/common/http/header_map_impl.h"
      10             : #include "source/common/http/utility.h"
      11             : 
      12             : #include "absl/strings/str_cat.h"
      13             : 
      14             : namespace Envoy {
      15             : namespace Grpc {
      16             : 
      17             : AsyncClientImpl::AsyncClientImpl(Upstream::ClusterManager& cm,
      18             :                                  const envoy::config::core::v3::GrpcService& config,
      19             :                                  TimeSource& time_source)
      20             :     : cm_(cm), remote_cluster_name_(config.envoy_grpc().cluster_name()),
      21             :       host_name_(config.envoy_grpc().authority()), time_source_(time_source),
      22             :       metadata_parser_(Router::HeaderParser::configure(
      23             :           config.initial_metadata(),
      24          33 :           envoy::config::core::v3::HeaderValueOption::OVERWRITE_IF_EXISTS_OR_ADD)) {}
      25             : 
      26          33 : AsyncClientImpl::~AsyncClientImpl() {
      27          33 :   ASSERT(isThreadSafe());
      28          33 :   while (!active_streams_.empty()) {
      29           0 :     active_streams_.front()->resetStream();
      30           0 :   }
      31          33 : }
      32             : 
      33             : AsyncRequest* AsyncClientImpl::sendRaw(absl::string_view service_full_name,
      34             :                                        absl::string_view method_name, Buffer::InstancePtr&& request,
      35             :                                        RawAsyncRequestCallbacks& callbacks,
      36             :                                        Tracing::Span& parent_span,
      37           0 :                                        const Http::AsyncClient::RequestOptions& options) {
      38           0 :   ASSERT(isThreadSafe());
      39           0 :   auto* const async_request = new AsyncRequestImpl(
      40           0 :       *this, service_full_name, method_name, std::move(request), callbacks, parent_span, options);
      41           0 :   AsyncStreamImplPtr grpc_stream{async_request};
      42             : 
      43           0 :   grpc_stream->initialize(true);
      44           0 :   if (grpc_stream->hasResetStream()) {
      45           0 :     return nullptr;
      46           0 :   }
      47             : 
      48           0 :   LinkedList::moveIntoList(std::move(grpc_stream), active_streams_);
      49           0 :   return async_request;
      50           0 : }
      51             : 
      52             : RawAsyncStream* AsyncClientImpl::startRaw(absl::string_view service_full_name,
      53             :                                           absl::string_view method_name,
      54             :                                           RawAsyncStreamCallbacks& callbacks,
      55          68 :                                           const Http::AsyncClient::StreamOptions& options) {
      56          68 :   ASSERT(isThreadSafe());
      57          68 :   auto grpc_stream =
      58          68 :       std::make_unique<AsyncStreamImpl>(*this, service_full_name, method_name, callbacks, options);
      59             : 
      60          68 :   grpc_stream->initialize(options.buffer_body_for_retry);
      61          68 :   if (grpc_stream->hasResetStream()) {
      62           0 :     return nullptr;
      63           0 :   }
      64             : 
      65          68 :   LinkedList::moveIntoList(std::move(grpc_stream), active_streams_);
      66          68 :   return active_streams_.front().get();
      67          68 : }
      68             : 
      69             : AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
      70             :                                  absl::string_view method_name, RawAsyncStreamCallbacks& callbacks,
      71             :                                  const Http::AsyncClient::StreamOptions& options)
      72             :     : parent_(parent), service_full_name_(service_full_name), method_name_(method_name),
      73          68 :       callbacks_(callbacks), options_(options) {}
      74             : 
      75          68 : void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
      76          68 :   const auto thread_local_cluster = parent_.cm_.getThreadLocalCluster(parent_.remote_cluster_name_);
      77          68 :   if (thread_local_cluster == nullptr) {
      78           0 :     callbacks_.onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available");
      79           0 :     http_reset_ = true;
      80           0 :     return;
      81           0 :   }
      82             : 
      83          68 :   auto& http_async_client = thread_local_cluster->httpAsyncClient();
      84          68 :   dispatcher_ = &http_async_client.dispatcher();
      85          68 :   stream_ = http_async_client.start(*this, options_.setBufferBodyForRetry(buffer_body_for_retry));
      86          68 :   if (stream_ == nullptr) {
      87           0 :     callbacks_.onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, EMPTY_STRING);
      88           0 :     http_reset_ = true;
      89           0 :     return;
      90           0 :   }
      91             : 
      92             :   // TODO(htuch): match Google gRPC base64 encoding behavior for *-bin headers, see
      93             :   // https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
      94          68 :   headers_message_ = Common::prepareHeaders(
      95          68 :       parent_.host_name_.empty() ? parent_.remote_cluster_name_ : parent_.host_name_,
      96          68 :       service_full_name_, method_name_, options_.timeout);
      97             :   // Fill service-wide initial metadata.
      98             :   // TODO(cpakulski): Find a better way to access requestHeaders
      99             :   // request headers should not be stored in stream_info.
     100             :   // Maybe put it to parent_context?
     101             :   // Since request headers may be empty, consider using Envoy::OptRef.
     102          68 :   parent_.metadata_parser_->evaluateHeaders(headers_message_->headers(),
     103          68 :                                             options_.parent_context.stream_info);
     104             : 
     105          68 :   callbacks_.onCreateInitialMetadata(headers_message_->headers());
     106          68 :   stream_->sendHeaders(headers_message_->headers(), false);
     107          68 : }
     108             : 
     109             : // TODO(htuch): match Google gRPC base64 encoding behavior for *-bin headers, see
     110             : // https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
     111          68 : void AsyncStreamImpl::onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
     112          68 :   const auto http_response_status = Http::Utility::getResponseStatus(*headers);
     113          68 :   const auto grpc_status = Common::getGrpcStatus(*headers);
     114          68 :   callbacks_.onReceiveInitialMetadata(end_stream ? Http::ResponseHeaderMapImpl::create()
     115          68 :                                                  : std::move(headers));
     116          68 :   if (http_response_status != enumToInt(Http::Code::OK)) {
     117             :     // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md requires that
     118             :     // grpc-status be used if available.
     119           0 :     if (end_stream && grpc_status) {
     120             :       // Due to headers/trailers type differences we need to copy here. This is an uncommon case but
     121             :       // we can potentially optimize in the future.
     122             : 
     123             :       // TODO(mattklein123): clang-tidy is showing a use after move when passing to
     124             :       // onReceiveInitialMetadata() above. This looks like an actual bug that I will fix in a
     125             :       // follow up.
     126             :       // NOLINTNEXTLINE(bugprone-use-after-move)
     127           0 :       onTrailers(Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*headers));
     128           0 :       return;
     129           0 :     }
     130             :     // Status is translated via Utility::httpToGrpcStatus per
     131             :     // https://github.com/grpc/grpc/blob/master/doc/http-grpc-status-mapping.md
     132           0 :     streamError(Utility::httpToGrpcStatus(http_response_status));
     133           0 :     return;
     134           0 :   }
     135          68 :   if (end_stream) {
     136             :     // Due to headers/trailers type differences we need to copy here. This is an uncommon case but
     137             :     // we can potentially optimize in the future.
     138          19 :     onTrailers(Http::createHeaderMap<Http::ResponseTrailerMapImpl>(*headers));
     139          19 :   }
     140          68 : }
     141             : 
     142         253 : void AsyncStreamImpl::onData(Buffer::Instance& data, bool end_stream) {
     143         253 :   decoded_frames_.clear();
     144         253 :   if (!decoder_.decode(data, decoded_frames_)) {
     145           0 :     streamError(Status::WellKnownGrpcStatus::Internal);
     146           0 :     return;
     147           0 :   }
     148             : 
     149         253 :   for (auto& frame : decoded_frames_) {
     150         253 :     if (frame.length_ > 0 && frame.flags_ != GRPC_FH_DEFAULT) {
     151           0 :       streamError(Status::WellKnownGrpcStatus::Internal);
     152           0 :       return;
     153           0 :     }
     154         253 :     if (!callbacks_.onReceiveMessageRaw(frame.data_ ? std::move(frame.data_)
     155         253 :                                                     : std::make_unique<Buffer::OwnedImpl>())) {
     156           0 :       streamError(Status::WellKnownGrpcStatus::Internal);
     157           0 :       return;
     158           0 :     }
     159         253 :   }
     160             : 
     161         253 :   if (end_stream) {
     162           0 :     streamError(Status::WellKnownGrpcStatus::Unknown);
     163           0 :   }
     164         253 : }
     165             : 
     166             : // TODO(htuch): match Google gRPC base64 encoding behavior for *-bin headers, see
     167             : // https://github.com/envoyproxy/envoy/pull/2444#discussion_r163914459.
     168          25 : void AsyncStreamImpl::onTrailers(Http::ResponseTrailerMapPtr&& trailers) {
     169          25 :   auto grpc_status = Common::getGrpcStatus(*trailers);
     170          25 :   const std::string grpc_message = Common::getGrpcMessage(*trailers);
     171          25 :   callbacks_.onReceiveTrailingMetadata(std::move(trailers));
     172          25 :   if (!grpc_status) {
     173           0 :     grpc_status = Status::WellKnownGrpcStatus::Unknown;
     174           0 :   }
     175          25 :   callbacks_.onRemoteClose(grpc_status.value(), grpc_message);
     176          25 :   cleanup();
     177          25 : }
     178             : 
     179          28 : void AsyncStreamImpl::streamError(Status::GrpcStatus grpc_status, const std::string& message) {
     180          28 :   callbacks_.onReceiveTrailingMetadata(Http::ResponseTrailerMapImpl::create());
     181          28 :   callbacks_.onRemoteClose(grpc_status, message);
     182          28 :   resetStream();
     183          28 : }
     184             : 
     185           0 : void AsyncStreamImpl::onComplete() {
     186             :   // No-op since stream completion is handled within other callbacks.
     187           0 : }
     188             : 
     189          68 : void AsyncStreamImpl::onReset() {
     190          68 :   if (http_reset_) {
     191          40 :     return;
     192          40 :   }
     193             : 
     194          28 :   http_reset_ = true;
     195          28 :   streamError(Status::WellKnownGrpcStatus::Internal);
     196          28 : }
     197             : 
     198         416 : void AsyncStreamImpl::sendMessageRaw(Buffer::InstancePtr&& buffer, bool end_stream) {
     199         416 :   Common::prependGrpcFrameHeader(*buffer);
     200         416 :   stream_->sendData(*buffer, end_stream);
     201         416 : }
     202             : 
     203          15 : void AsyncStreamImpl::closeStream() {
     204          15 :   Buffer::OwnedImpl empty_buffer;
     205          15 :   stream_->sendData(empty_buffer, true);
     206          15 : }
     207             : 
     208          43 : void AsyncStreamImpl::resetStream() { cleanup(); }
     209             : 
     210          68 : void AsyncStreamImpl::cleanup() {
     211          68 :   if (!http_reset_) {
     212          40 :     http_reset_ = true;
     213          40 :     stream_->reset();
     214          40 :   }
     215             : 
     216             :   // This will destroy us, but only do so if we are actually in a list. This does not happen in
     217             :   // the immediate failure case.
     218          68 :   if (LinkedObject<AsyncStreamImpl>::inserted()) {
     219          68 :     ASSERT(dispatcher_->isThreadSafe());
     220          68 :     dispatcher_->deferredDelete(
     221          68 :         LinkedObject<AsyncStreamImpl>::removeFromList(parent_.active_streams_));
     222          68 :   }
     223          68 : }
     224             : 
     225             : AsyncRequestImpl::AsyncRequestImpl(AsyncClientImpl& parent, absl::string_view service_full_name,
     226             :                                    absl::string_view method_name, Buffer::InstancePtr&& request,
     227             :                                    RawAsyncRequestCallbacks& callbacks, Tracing::Span& parent_span,
     228             :                                    const Http::AsyncClient::RequestOptions& options)
     229             :     : AsyncStreamImpl(parent, service_full_name, method_name, *this, options),
     230           0 :       request_(std::move(request)), callbacks_(callbacks) {
     231             : 
     232           0 :   current_span_ =
     233           0 :       parent_span.spawnChild(Tracing::EgressConfig::get(),
     234           0 :                              absl::StrCat("async ", service_full_name, ".", method_name, " egress"),
     235           0 :                              parent.time_source_.systemTime());
     236           0 :   current_span_->setTag(Tracing::Tags::get().UpstreamCluster, parent.remote_cluster_name_);
     237           0 :   current_span_->setTag(Tracing::Tags::get().UpstreamAddress, parent.host_name_.empty()
     238           0 :                                                                   ? parent.remote_cluster_name_
     239           0 :                                                                   : parent.host_name_);
     240           0 :   current_span_->setTag(Tracing::Tags::get().Component, Tracing::Tags::get().Proxy);
     241           0 : }
     242             : 
     243           0 : void AsyncRequestImpl::initialize(bool buffer_body_for_retry) {
     244           0 :   AsyncStreamImpl::initialize(buffer_body_for_retry);
     245           0 :   if (this->hasResetStream()) {
     246           0 :     return;
     247           0 :   }
     248           0 :   this->sendMessageRaw(std::move(request_), true);
     249           0 : }
     250             : 
     251           0 : void AsyncRequestImpl::cancel() {
     252           0 :   current_span_->setTag(Tracing::Tags::get().Status, Tracing::Tags::get().Canceled);
     253           0 :   current_span_->finishSpan();
     254           0 :   this->resetStream();
     255           0 : }
     256             : 
     257           0 : void AsyncRequestImpl::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
     258           0 :   Tracing::HttpTraceContext trace_context(metadata);
     259           0 :   current_span_->injectContext(trace_context, nullptr);
     260           0 :   callbacks_.onCreateInitialMetadata(metadata);
     261           0 : }
     262             : 
     263           0 : void AsyncRequestImpl::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&&) {}
     264             : 
     265           0 : bool AsyncRequestImpl::onReceiveMessageRaw(Buffer::InstancePtr&& response) {
     266           0 :   response_ = std::move(response);
     267           0 :   return true;
     268           0 : }
     269             : 
     270           0 : void AsyncRequestImpl::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&&) {}
     271             : 
     272           0 : void AsyncRequestImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
     273           0 :   current_span_->setTag(Tracing::Tags::get().GrpcStatusCode, std::to_string(status));
     274             : 
     275           0 :   if (status != Grpc::Status::WellKnownGrpcStatus::Ok) {
     276           0 :     current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
     277           0 :     callbacks_.onFailure(status, message, *current_span_);
     278           0 :   } else if (response_ == nullptr) {
     279           0 :     current_span_->setTag(Tracing::Tags::get().Error, Tracing::Tags::get().True);
     280           0 :     callbacks_.onFailure(Status::Internal, EMPTY_STRING, *current_span_);
     281           0 :   } else {
     282           0 :     callbacks_.onSuccessRaw(std::move(response_), *current_span_);
     283           0 :   }
     284             : 
     285           0 :   current_span_->finishSpan();
     286           0 : }
     287             : 
     288             : } // namespace Grpc
     289             : } // namespace Envoy

Generated by: LCOV version 1.15