LCOV - code coverage report
Current view: top level - source/common/http - async_client_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 88 199 44.2 %
Date: 2024-01-05 06:35:25 Functions: 14 30 46.7 %

          Line data    Source code
       1             : #include "source/common/http/async_client_impl.h"
       2             : 
       3             : #include <chrono>
       4             : #include <map>
       5             : #include <memory>
       6             : #include <string>
       7             : #include <vector>
       8             : 
       9             : #include "envoy/config/core/v3/base.pb.h"
      10             : 
      11             : #include "source/common/grpc/common.h"
      12             : #include "source/common/http/utility.h"
      13             : #include "source/common/tracing/http_tracer_impl.h"
      14             : 
      15             : namespace Envoy {
      16             : namespace Http {
      // Builds the async client. All work happens in the member initializer list; the body is empty.
      //  - singleton_manager_ is taken from the cluster manager's factory.
      //  - cluster_ stores the supplied cluster info pointer.
      //  - config_ is constructed from http_context.asyncClientStatPrefix(), the root scope of
      //    stats_store and the remaining dependencies; shadow_writer is moved into it.
      //  - dispatcher_ is initialized from the supplied dispatcher.
      // NOTE(review): the meaning of the positional `true, false, ...` flags and of the `{}`
      // argument passed to config_ is defined by the config type's constructor, which is not
      // visible here - confirm against that declaration before changing them.
      17             : AsyncClientImpl::AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster,
      18             :                                  Stats::Store& stats_store, Event::Dispatcher& dispatcher,
      19             :                                  const LocalInfo::LocalInfo& local_info,
      20             :                                  Upstream::ClusterManager& cm, Runtime::Loader& runtime,
      21             :                                  Random::RandomGenerator& random,
      22             :                                  Router::ShadowWriterPtr&& shadow_writer,
      23             :                                  Http::Context& http_context, Router::Context& router_context)
      24             :     : singleton_manager_(cm.clusterManagerFactory().singletonManager()), cluster_(cluster),
      25             :       config_(http_context.asyncClientStatPrefix(), local_info, *stats_store.rootScope(), cm,
      26             :               runtime, random, std::move(shadow_writer), true, false, false, false, false, false,
      27             :               {}, dispatcher.timeSource(), http_context, router_context),
      28          33 :       dispatcher_(dispatcher) {}
      29             : 
      // Resets every stream that is still in active_streams_ when the client is destroyed.
      // The loop terminates only because reset() ends in cleanup(), which removes an inserted
      // stream from active_streams_; each iteration therefore shrinks the list by one.
      30          33 : AsyncClientImpl::~AsyncClientImpl() {
      31          33 :   while (!active_streams_.empty()) {
      32           0 :     active_streams_.front()->reset();
      33           0 :   }
      34          33 : }
      35             : 
      // Shared start-up path for send() and startRequest().
      // Calls async_request->initialize(), then takes ownership of the raw pointer in a
      // std::unique_ptr.
      // Returns async_request after linking it into active_streams_ when the remote side is still
      // open after initialization. Otherwise calls cleanup() and returns nullptr; in that case the
      // request was never moved into the list, so the local unique_ptr destroys it when this
      // function returns.
      36           0 : template <typename T> T* AsyncClientImpl::internalStartRequest(T* async_request) {
      37           0 :   async_request->initialize();
      38           0 :   std::unique_ptr<AsyncStreamImpl> new_request{async_request};
      39             : 
      40             :   // The request may get immediately failed. If so, we will return nullptr.
      41           0 :   if (!new_request->remote_closed_) {
      42           0 :     LinkedList::moveIntoList(std::move(new_request), active_streams_);
      43           0 :     return async_request;
      44           0 :   } else {
      45           0 :     new_request->cleanup();
      46           0 :     return nullptr;
      47           0 :   }
      48           0 : }
      49             : 
      // Explicit instantiations of internalStartRequest for the two request types this file passes
      // to it: AsyncRequestImpl (from send()) and AsyncOngoingRequestImpl (from startRequest()).
      50             : template AsyncRequestImpl*
      51             : AsyncClientImpl::internalStartRequest<AsyncRequestImpl>(AsyncRequestImpl*);
      52             : template AsyncOngoingRequestImpl*
      53             : AsyncClientImpl::internalStartRequest<AsyncOngoingRequestImpl>(AsyncOngoingRequestImpl*);
      54             : 
      // Sends a complete request message.
      // Allocates an AsyncRequestImpl that takes the moved-in request, then hands the raw pointer to
      // internalStartRequest(), which assumes ownership of it.
      // Returns the started request, or nullptr when internalStartRequest() reports that the request
      // failed immediately.
      55             : AsyncClient::Request* AsyncClientImpl::send(RequestMessagePtr&& request,
      56             :                                             AsyncClient::Callbacks& callbacks,
      57           0 :                                             const AsyncClient::RequestOptions& options) {
      58           0 :   AsyncRequestImpl* async_request =
      59           0 :       new AsyncRequestImpl(std::move(request), *this, callbacks, options);
      60           0 :   return internalStartRequest(async_request);
      61           0 : }
      62             : 
      // Starts a request from headers only.
      // Allocates an AsyncOngoingRequestImpl that takes the moved-in headers, then hands the raw
      // pointer to internalStartRequest(), which assumes ownership of it.
      // Returns the started request, or nullptr when internalStartRequest() reports that the request
      // failed immediately.
      63             : AsyncClient::OngoingRequest*
      64             : AsyncClientImpl::startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks,
      65           0 :                               const AsyncClient::RequestOptions& options) {
      66           0 :   AsyncOngoingRequestImpl* async_request =
      67           0 :       new AsyncOngoingRequestImpl(std::move(request_headers), *this, callbacks, options);
      68           0 :   return internalStartRequest(async_request);
      69           0 : }
      70             : 
      // Creates a new AsyncStreamImpl, moves it into active_streams_ and returns a non-owning
      // pointer to the element at the front of that list.
      // NOTE(review): returning active_streams_.front() assumes LinkedList::moveIntoList inserts at
      // the front of the list - confirm against the LinkedList helper.
      71             : AsyncClient::Stream* AsyncClientImpl::start(AsyncClient::StreamCallbacks& callbacks,
      72          68 :                                             const AsyncClient::StreamOptions& options) {
      73          68 :   std::unique_ptr<AsyncStreamImpl> new_stream{new AsyncStreamImpl(*this, callbacks, options)};
      74          68 :   LinkedList::moveIntoList(std::move(new_stream), active_streams_);
      75          68 :   return active_streams_.front().get();
      76          68 : }
      77             : 
      // Constructs a stream owned by `parent`.
      // Initializer list:
      //  - stream_id_ comes from the parent's configured random generator.
      //  - router_ uses options.filter_config_ when it is set, otherwise the parent's config_.
      //  - stream_info_ is created with Protocol::Http11 (see the TODO at the end of the body).
      //  - route_ is a NullRouteImpl built from the parent's cluster name and the timeout, hash
      //    policy and retry policy in `options`.
      //  - account_, buffer_limit_ and send_xff_ are copied from `options`.
      // Body: merges options.metadata into the stream's dynamic metadata, records the shadow flag,
      // the upstream cluster info and the route on stream_info_, allocates buffered_body_ only when
      // options.buffer_body_for_retry is set, and registers this stream as the router's decoder
      // filter callbacks.
      78             : AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
      79             :                                  const AsyncClient::StreamOptions& options)
      80             :     : parent_(parent), stream_callbacks_(callbacks), stream_id_(parent.config_.random_.random()),
      81             :       router_(options.filter_config_ ? *options.filter_config_ : parent.config_,
      82             :               parent.config_.async_stats_),
      83             :       stream_info_(Protocol::Http11, parent.dispatcher().timeSource(), nullptr),
      84             :       tracing_config_(Tracing::EgressConfig::get()),
      85             :       route_(std::make_shared<NullRouteImpl>(parent_.cluster_->name(), parent_.singleton_manager_,
      86             :                                              options.timeout, options.hash_policy,
      87             :                                              options.retry_policy)),
      88             :       account_(options.account_), buffer_limit_(options.buffer_limit_),
      89          68 :       send_xff_(options.send_xff) {
      90          68 :   stream_info_.dynamicMetadata().MergeFrom(options.metadata);
      91          68 :   stream_info_.setIsShadow(options.is_shadow);
      92          68 :   stream_info_.setUpstreamClusterInfo(parent_.cluster_);
      93          68 :   stream_info_.route_ = route_;
      94             : 
      95          68 :   if (options.buffer_body_for_retry) {
      96           0 :     buffered_body_ = std::make_unique<Buffer::OwnedImpl>(account_);
      97           0 :   }
      98             : 
      99          68 :   router_.setDecoderFilterCallbacks(*this);
     100             :   // TODO(mattklein123): Correctly set protocol in stream info when we support access logging.
     101          68 : }
     102             : 
     // Handles response headers for this stream.
     // Logs the headers, asserts the remote side is not already closed, records that response
     // headers were seen, and forwards them to stream_callbacks_.onHeaders(). Then calls
     // closeRemote(end_stream) followed by closeLocal(end_stream); the existing comment before the
     // closeLocal() call explains why the local side is closed as well.
     // The third (absl::string_view) parameter is unnamed and unused.
     103             : void AsyncStreamImpl::encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
     104          68 :                                     absl::string_view) {
     105          68 :   ENVOY_LOG(debug, "async http request response headers (end_stream={}):\n{}", end_stream,
     106          68 :             *headers);
     107          68 :   ASSERT(!remote_closed_);
     108          68 :   encoded_response_headers_ = true;
     109          68 :   stream_callbacks_.onHeaders(std::move(headers), end_stream);
     110          68 :   closeRemote(end_stream);
     111             :   // At present, the router cleans up stream state as soon as the remote is closed, making a
     112             :   // half-open local stream unsupported and dangerous. Ensure we close locally to trigger completion
     113             :   // and keep things consistent. Another option would be to issue a stream reset here if local isn't
     114             :   // yet closed, triggering cleanup along a more standardized path. However, this would require
     115             :   // additional logic to handle the response completion and subsequent reset, and run the risk of
     116             :   // being interpreted as a failure, when in fact no error has necessarily occurred. Gracefully
     117             :   // closing seems most in-line with behavior elsewhere in Envoy for now.
     118          68 :   closeLocal(end_stream);
     119          68 : }
     120             : 
     // Handles a chunk of response body.
     // Logs the chunk length, asserts the remote side is not already closed, and forwards the
     // buffer to stream_callbacks_.onData(). Then calls closeRemote(end_stream) followed by
     // closeLocal(end_stream).
     121         253 : void AsyncStreamImpl::encodeData(Buffer::Instance& data, bool end_stream) {
     122         253 :   ENVOY_LOG(trace, "async http request response data (length={} end_stream={})", data.length(),
     123         253 :             end_stream);
     124         253 :   ASSERT(!remote_closed_);
     125         253 :   stream_callbacks_.onData(data, end_stream);
     126         253 :   closeRemote(end_stream);
     127             :   // Ensure we close locally on receiving a complete response; see comment in encodeHeaders for
     128             :   // rationale.
     129         253 :   closeLocal(end_stream);
     130         253 : }
     131             : 
     // Handles response trailers.
     // Logs the trailers, asserts the remote side is not already closed, and forwards them to
     // stream_callbacks_.onTrailers(). Both closeRemote() and closeLocal() are then called with
     // `true`, so trailers always end the stream.
     132           6 : void AsyncStreamImpl::encodeTrailers(ResponseTrailerMapPtr&& trailers) {
     133           6 :   ENVOY_LOG(debug, "async http request response trailers:\n{}", *trailers);
     134           6 :   ASSERT(!remote_closed_);
     135           6 :   stream_callbacks_.onTrailers(std::move(trailers));
     136           6 :   closeRemote(true);
     137             :   // Ensure we close locally on receiving a complete response; see comment in encodeHeaders for
     138             :   // rationale.
     139           6 :   closeLocal(true);
     140           6 : }
     141             : 
     // Sends request headers into the router.
     // Steps, in order:
     //  1. Stores the address of `headers` in request_headers_ (no copy is made here).
     //  2. Sets is_head_request_ when the method equals the HEAD method value.
     //  3. Sets is_grpc_request_ from Grpc::Common::isGrpcRequestHeaders().
     //  4. Marks the request as Envoy-internal on the header map.
     //  5. Appends the local address to x-forwarded-for when send_xff_ is set.
     //  6. Passes the headers to router_.decodeHeaders() and calls closeLocal(end_stream).
     // NOTE(review): because only a pointer is stored, `headers` presumably has to outlive any later
     // use of request_headers_ - confirm against callers.
     142          68 : void AsyncStreamImpl::sendHeaders(RequestHeaderMap& headers, bool end_stream) {
     143          68 :   request_headers_ = &headers;
     144             : 
     145          68 :   if (Http::Headers::get().MethodValues.Head == headers.getMethodValue()) {
     146           0 :     is_head_request_ = true;
     147           0 :   }
     148             : 
     149          68 :   is_grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
     150          68 :   headers.setReferenceEnvoyInternalRequest(Headers::get().EnvoyInternalRequestValues.True);
     151          68 :   if (send_xff_) {
     152          68 :     Utility::appendXff(headers, *parent_.config_.local_info_.address());
     153          68 :   }
     154             : 
     155          68 :   router_.decodeHeaders(headers, end_stream);
     156          68 :   closeLocal(end_stream);
     157          68 : }
     158             : 
     // Sends a chunk of request body into the router.
     // Returns without doing anything when the local side is already closed.
     // When buffered_body_ exists, the chunk is added to it unless that would push the buffered
     // length past kBufferLimitForRetry; in the overflow case only a warning is logged and the chunk
     // is not buffered. Either way the chunk is still passed to router_.decodeData(), after which
     // closeLocal(end_stream) is called.
     159         431 : void AsyncStreamImpl::sendData(Buffer::Instance& data, bool end_stream) {
     160         431 :   ASSERT(dispatcher().isThreadSafe());
     161             :   // Map send calls after local closure to no-ops. The send call could have been queued prior to
     162             :   // remote reset or closure, and/or closure could have occurred synchronously in response to a
     163             :   // previous send. In these cases the router will have already cleaned up stream state. This
     164             :   // parallels handling in the main Http::ConnectionManagerImpl as well.
     165         431 :   if (local_closed_) {
     166           0 :     return;
     167           0 :   }
     168             : 
     169         431 :   if (buffered_body_ != nullptr) {
     170             :     // TODO(shikugawa): Currently, data is dropped when the retry buffer overflows and there is no
     171             :     // ability implement any error handling. We need to implement buffer overflow handling in the
     172             :     // future. Options include configuring the max buffer size, or for use cases like gRPC
     173             :     // streaming, deleting old data in the retry buffer.
     174           0 :     if (buffered_body_->length() + data.length() > kBufferLimitForRetry) {
     175           0 :       ENVOY_LOG_EVERY_POW_2(
     176           0 :           warn, "the buffer size limit (64KB) for async client retries has been exceeded.");
     177           0 :     } else {
     178           0 :       buffered_body_->add(data);
     179           0 :     }
     180           0 :   }
     181             : 
     182         431 :   router_.decodeData(data, end_stream);
     183         431 :   closeLocal(end_stream);
     184         431 : }
     185             : 
     // Sends request trailers into the router and closes the local side.
     // The address of `trailers` is stored in request_trailers_ before the local_closed_ check, so
     // the pointer is updated even when the call is otherwise a no-op. If the local side is still
     // open, the trailers go to router_.decodeTrailers() and closeLocal(true) ends the request.
     186           0 : void AsyncStreamImpl::sendTrailers(RequestTrailerMap& trailers) {
     187           0 :   request_trailers_ = &trailers;
     188             : 
     189           0 :   ASSERT(dispatcher().isThreadSafe());
     190             :   // See explanation in sendData.
     191           0 :   if (local_closed_) {
     192           0 :     return;
     193           0 :   }
     194             : 
     195           0 :   router_.decodeTrailers(trailers);
     196           0 :   closeLocal(true);
     197           0 : }
     198             : 
     // Records whether the local (request) side has finished and completes the stream if both sides
     // are done.
     // Does nothing when local_closed_ is already true. Otherwise local_closed_ takes the value of
     // end_stream; because of the guard this can only move the flag from false to true. When
     // complete() then reports true, onComplete() is delivered to the stream callbacks and
     // cleanup() runs.
     199         826 : void AsyncStreamImpl::closeLocal(bool end_stream) {
     200             :   // This guard ensures that we don't attempt to clean up a stream or fire a completion callback
     201             :   // for a stream that has already been closed. Both send* calls and resets can result in stream
     202             :   // closure, and this state may be updated synchronously during stream interaction and callbacks.
     203             :   // Additionally AsyncRequestImpl maintains behavior wherein its onComplete callback will fire
     204             :   // immediately upon receiving a complete response, regardless of whether it has finished sending
     205             :   // a request.
     206             :   // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage
     207             :   // guards against redundant cleanup), but to surface consistent stream state via callbacks,
     208             :   // it's necessary to be more rigorous.
     209             :   // TODO(goaway): Consider deeper cleanup of assumptions here.
     210         826 :   if (local_closed_) {
     211          40 :     return;
     212          40 :   }
     213             : 
     214         786 :   local_closed_ = end_stream;
     215         786 :   if (complete()) {
     216           0 :     stream_callbacks_.onComplete();
     217           0 :     cleanup();
     218           0 :   }
     219         786 : }
     220             : 
     // Records whether the remote (response) side has finished and completes the stream if both
     // sides are done.
     // Does nothing when remote_closed_ is already true. Otherwise remote_closed_ takes the value of
     // end_stream; because of the guard this can only move the flag from false to true. When
     // complete() then reports true, onComplete() is delivered to the stream callbacks and
     // cleanup() runs.
     221         327 : void AsyncStreamImpl::closeRemote(bool end_stream) {
     222             :   // This guard ensures that we don't attempt to clean up a stream or fire a completion callback for
     223             :   // a stream that has already been closed. This function is called synchronously after callbacks
     224             :   // have executed, and it's possible for callbacks to, for instance, directly reset a stream or
     225             :   // close the remote manually. The test case ResetInOnHeaders covers this case specifically.
     226             :   // Previous logic treated post-closure entry here as more-or-less benign (providing later-stage
     227             :   // guards against redundant cleanup), but to surface consistent stream state via callbacks, it's
     228             :   // necessary to be more rigorous.
     229             :   // TODO(goaway): Consider deeper cleanup of assumptions here.
     230         327 :   if (remote_closed_) {
     231          40 :     return;
     232          40 :   }
     233             : 
     234         287 :   remote_closed_ = end_stream;
     235         287 :   if (complete()) {
     236           0 :     stream_callbacks_.onComplete();
     237           0 :     cleanup();
     238           0 :   }
     239         287 : }
     240             : 
     // Resets the stream: calls router_.onDestroy() first, then resetStream(), which notifies the
     // callbacks via onReset() and runs cleanup().
     // resetStream() is called with no arguments here, so its parameters must have defaults in the
     // declaration (not visible in this file).
     241          40 : void AsyncStreamImpl::reset() {
     242          40 :   router_.onDestroy();
     243          40 :   resetStream();
     244          40 : }
     245             : 
     // Marks both sides of the stream as closed and, if the stream is linked into the parent's
     // active_streams_ list, unlinks it and hands it to the dispatcher for deferred deletion.
     // A stream that was never inserted (the immediate-failure path in internalStartRequest) is
     // left for its current owner to destroy.
     246          68 : void AsyncStreamImpl::cleanup() {
     247          68 :   ASSERT(dispatcher().isThreadSafe());
     248          68 :   local_closed_ = remote_closed_ = true;
     249             :   // This will destroy us, but only do so if we are actually in a list. This does not happen in
     250             :   // the immediate failure case.
     251          68 :   if (inserted()) {
     252          68 :     dispatcher().deferredDelete(removeFromList(parent_.active_streams_));
     253          68 :   }
     254          68 : }
     255             : 
     // Notifies the stream callbacks of a reset via onReset(), then runs cleanup().
     // Both parameters (the reset reason and the string_view) are unnamed and ignored.
     256          68 : void AsyncStreamImpl::resetStream(Http::StreamResetReason, absl::string_view) {
     257          68 :   stream_callbacks_.onReset();
     258          68 :   cleanup();
     259          68 : }
     260             : 
     // Constructs the state shared by the request implementations.
     // The base AsyncStreamImpl receives *this as its stream callbacks, so stream events are routed
     // to this object's onHeaders/onData/onTrailers/onComplete/onReset; `callbacks` is kept as the
     // user-facing callbacks_.
     // Tracing: when options.parent_span_ is set, a child span is spawned from it, named
     // options.child_span_name_ or, if that is empty, "async <cluster name> egress". Otherwise a
     // Tracing::NullSpan is used. When options.sampled_ has a value it is applied to the span.
     261             : AsyncRequestSharedImpl::AsyncRequestSharedImpl(AsyncClientImpl& parent,
     262             :                                                AsyncClient::Callbacks& callbacks,
     263             :                                                const AsyncClient::RequestOptions& options)
     264           0 :     : AsyncStreamImpl(parent, *this, options), callbacks_(callbacks) {
     265           0 :   if (nullptr != options.parent_span_) {
     266           0 :     const std::string child_span_name =
     267           0 :         options.child_span_name_.empty()
     268           0 :             ? absl::StrCat("async ", parent.cluster_->name(), " egress")
     269           0 :             : options.child_span_name_;
     270           0 :     child_span_ = options.parent_span_->spawnChild(Tracing::EgressConfig::get(), child_span_name,
     271           0 :                                                    parent.dispatcher().timeSource().systemTime());
     272           0 :   } else {
     273           0 :     child_span_ = std::make_unique<Tracing::NullSpan>();
     274           0 :   }
     275             :   // Span gets sampled by default, as sampled_ defaults to true.
     276             :   // If caller overrides sampled_ with empty value, sampling status of the parent is kept.
     277           0 :   if (options.sampled_.has_value()) {
     278           0 :     child_span_->setSampled(options.sampled_.value());
     279           0 :   }
     280           0 : }
     281             : 
     // Kicks off a fully-buffered request.
     // Injects the child span's trace context into the request headers, then sends the headers with
     // end_stream set when the body is empty. A non-empty body is sent afterwards as a single chunk
     // with end_stream = true.
     282           0 : void AsyncRequestImpl::initialize() {
     283           0 :   Tracing::HttpTraceContext trace_context(request_->headers());
     284           0 :   child_span_->injectContext(trace_context, nullptr);
     285           0 :   sendHeaders(request_->headers(), request_->body().length() == 0);
     286           0 :   if (request_->body().length() != 0) {
     287             :     // It's possible this will be a no-op due to a local response synchronously generated in
     288             :     // sendHeaders; guards handle this within AsyncStreamImpl.
     289           0 :     sendData(request_->body(), true);
     290           0 :   }
     291             :   // TODO(mattklein123): Support request trailers.
     292           0 : }
     293             : 
     // Kicks off an ongoing request.
     // Injects the child span's trace context into the request headers and sends them with
     // end_stream = false, leaving the local side open after the headers.
     294           0 : void AsyncOngoingRequestImpl::initialize() {
     295           0 :   Tracing::HttpTraceContext trace_context(*request_headers_);
     296           0 :   child_span_->injectContext(trace_context, nullptr);
     297           0 :   sendHeaders(*request_headers_, false);
     298           0 : }
     299             : 
     // Stream-completion handler.
     // Lets the user callbacks inspect the span and response headers, finalizes the upstream span,
     // and then passes ownership of the response to callbacks_.onSuccess().
     // NOTE(review): response_ is dereferenced without a null check; this presumably relies on
     // onHeaders() having run before completion - confirm.
     300           0 : void AsyncRequestSharedImpl::onComplete() {
     301           0 :   callbacks_.onBeforeFinalizeUpstreamSpan(*child_span_, &response_->headers());
     302             : 
     303           0 :   Tracing::HttpTracerUtility::finalizeUpstreamSpan(*child_span_, streamInfo(),
     304           0 :                                                    Tracing::EgressConfig::get());
     305             : 
     306           0 :   callbacks_.onSuccess(*this, std::move(response_));
     307           0 : }
     308             : 
     // Response-headers handler.
     // Reads the response status from the headers and records it on the stream info, then creates
     // response_ as a ResponseMessageImpl that takes ownership of the headers.
     // The end_stream flag (second parameter) is unnamed and ignored.
     309           0 : void AsyncRequestSharedImpl::onHeaders(ResponseHeaderMapPtr&& headers, bool) {
     310           0 :   const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
     311           0 :   streamInfo().setResponseCode(response_code);
     312           0 :   response_ = std::make_unique<ResponseMessageImpl>(std::move(headers));
     313           0 : }
     314             : 
     // Response-body handler: passes `data` to response_->body().move(). The end_stream flag is
     // unnamed and ignored.
     // NOTE(review): presumably this drains `data` into the response body - confirm against the
     // buffer's move() contract. Also assumes onHeaders() already created response_.
     315           0 : void AsyncRequestSharedImpl::onData(Buffer::Instance& data, bool) { response_->body().move(data); }
     316             : 
     // Response-trailers handler: moves the trailers into the accumulated response message.
     // NOTE(review): response_ is dereferenced without a null check; assumes onHeaders() already
     // created it - confirm.
     317           0 : void AsyncRequestSharedImpl::onTrailers(ResponseTrailerMapPtr&& trailers) {
     318           0 :   response_->trailers(std::move(trailers));
     319           0 : }
     320             : 
     // Stream-reset handler.
     // For a reset that was not caused by cancel(), tags the span with error reason "Reset" and,
     // after the span is finalized, reports AsyncClient::FailureReason::Reset via
     // callbacks_.onFailure(). For a cancelled request the span is still finalized but no failure
     // callback is raised.
     // The response headers are passed to onBeforeFinalizeUpstreamSpan() only when remoteClosed()
     // is true; otherwise nullptr is passed.
     // NOTE(review): the remoteClosed() branch dereferences response_ and so assumes it is non-null
     // whenever the remote side is closed - confirm.
     321           0 : void AsyncRequestSharedImpl::onReset() {
     322           0 :   if (!cancelled_) {
     323             :     // Set "error reason" tag related to reset. The tagging for "error true" is done inside the
     324             :     // Tracing::HttpTracerUtility::finalizeUpstreamSpan.
     325           0 :     child_span_->setTag(Tracing::Tags::get().ErrorReason, "Reset");
     326           0 :   }
     327             : 
     328           0 :   callbacks_.onBeforeFinalizeUpstreamSpan(*child_span_,
     329           0 :                                           remoteClosed() ? &response_->headers() : nullptr);
     330             : 
     331             :   // Finalize the span based on whether we received a response or not.
     332           0 :   Tracing::HttpTracerUtility::finalizeUpstreamSpan(*child_span_, streamInfo(),
     333           0 :                                                    Tracing::EgressConfig::get());
     334             : 
     335           0 :   if (!cancelled_) {
     336             :     // In this case we don't have a valid response so we do need to raise a failure.
     337           0 :     callbacks_.onFailure(*this, AsyncClient::FailureReason::Reset);
     338           0 :   }
     339           0 : }
     340             : 
     // Cancels the request.
     // Sets cancelled_ before calling reset() so that onReset(), which reset() reaches through
     // resetStream(), skips the "Reset" error tag and the onFailure() callback. The span is tagged
     // as canceled first.
     341           0 : void AsyncRequestSharedImpl::cancel() {
     342           0 :   cancelled_ = true;
     343             : 
     344             :   // Add tags about the cancellation.
     345           0 :   child_span_->setTag(Tracing::Tags::get().Canceled, Tracing::Tags::get().True);
     346             : 
     347           0 :   reset();
     348           0 : }
     349             : 
     350             : } // namespace Http
     351             : } // namespace Envoy

Generated by: LCOV version 1.15