LCOV - code coverage report
Current view: top level - source/common/http - async_client_impl.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 31 141 22.0 %
Date: 2024-01-05 06:35:25 Functions: 19 65 29.2 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <chrono>
       4             : #include <cstdint>
       5             : #include <functional>
       6             : #include <list>
       7             : #include <map>
       8             : #include <memory>
       9             : #include <string>
      10             : #include <vector>
      11             : 
      12             : #include "envoy/buffer/buffer.h"
      13             : #include "envoy/common/random_generator.h"
      14             : #include "envoy/common/scope_tracker.h"
      15             : #include "envoy/config/core/v3/base.pb.h"
      16             : #include "envoy/config/route/v3/route_components.pb.h"
      17             : #include "envoy/config/typed_metadata.h"
      18             : #include "envoy/event/dispatcher.h"
      19             : #include "envoy/http/async_client.h"
      20             : #include "envoy/http/codec.h"
      21             : #include "envoy/http/context.h"
      22             : #include "envoy/http/filter.h"
      23             : #include "envoy/http/header_map.h"
      24             : #include "envoy/http/message.h"
      25             : #include "envoy/router/context.h"
      26             : #include "envoy/router/router.h"
      27             : #include "envoy/router/router_ratelimit.h"
      28             : #include "envoy/router/shadow_writer.h"
      29             : #include "envoy/server/filter_config.h"
      30             : #include "envoy/ssl/connection.h"
      31             : #include "envoy/tracing/tracer.h"
      32             : #include "envoy/type/v3/percent.pb.h"
      33             : #include "envoy/upstream/load_balancer.h"
      34             : #include "envoy/upstream/upstream.h"
      35             : 
      36             : #include "source/common/common/assert.h"
      37             : #include "source/common/common/empty_string.h"
      38             : #include "source/common/common/linked_object.h"
      39             : #include "source/common/http/message_impl.h"
      40             : #include "source/common/http/null_route_impl.h"
      41             : #include "source/common/router/config_impl.h"
      42             : #include "source/common/router/router.h"
      43             : #include "source/common/stream_info/stream_info_impl.h"
      44             : #include "source/common/tracing/http_tracer_impl.h"
      45             : #include "source/common/upstream/retry_factory.h"
      46             : #include "source/extensions/early_data/default_early_data_policy.h"
      47             : 
      48             : namespace Envoy {
      49             : namespace Http {
      50             : namespace {
      51             : // Limit the size of buffer for data used for retries.
      52             : // This is currently fixed to 64KB.
      53             : constexpr uint64_t kBufferLimitForRetry = 1 << 16;
      54             : } // namespace
      55             : 
      56             : class AsyncStreamImpl;
      57             : class AsyncRequestSharedImpl;
      58             : 
      59             : class AsyncClientImpl final : public AsyncClient {
      60             : public:
      61             :   AsyncClientImpl(Upstream::ClusterInfoConstSharedPtr cluster, Stats::Store& stats_store,
      62             :                   Event::Dispatcher& dispatcher, const LocalInfo::LocalInfo& local_info,
      63             :                   Upstream::ClusterManager& cm, Runtime::Loader& runtime,
      64             :                   Random::RandomGenerator& random, Router::ShadowWriterPtr&& shadow_writer,
      65             :                   Http::Context& http_context, Router::Context& router_context);
      66             :   ~AsyncClientImpl() override;
      67             : 
      68             :   // Http::AsyncClient
      69             :   Request* send(RequestMessagePtr&& request, Callbacks& callbacks,
      70             :                 const AsyncClient::RequestOptions& options) override;
      71             :   Stream* start(StreamCallbacks& callbacks, const AsyncClient::StreamOptions& options) override;
      72             :   OngoingRequest* startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks,
      73             :                                const AsyncClient::RequestOptions& options) override;
      74             :   Singleton::Manager& singleton_manager_;
      75             :   Upstream::ClusterInfoConstSharedPtr cluster_;
      76         136 :   Event::Dispatcher& dispatcher() override { return dispatcher_; }
      77             : 
      78             : private:
      79             :   template <typename T> T* internalStartRequest(T* async_request);
      80             :   Router::FilterConfig config_;
      81             :   Event::Dispatcher& dispatcher_;
      82             :   std::list<std::unique_ptr<AsyncStreamImpl>> active_streams_;
      83             : 
      84             :   friend class AsyncStreamImpl;
      85             :   friend class AsyncRequestSharedImpl;
      86             : };
      87             : 
      88             : /**
      89             :  * Implementation of AsyncRequest. This implementation is capable of sending HTTP requests to a
      90             :  * ConnectionPool asynchronously.
      91             :  */
      92             : class AsyncStreamImpl : public virtual AsyncClient::Stream,
      93             :                         public StreamDecoderFilterCallbacks,
      94             :                         public Event::DeferredDeletable,
      95             :                         Logger::Loggable<Logger::Id::http>,
      96             :                         public LinkedObject<AsyncStreamImpl>,
      97             :                         public ScopeTrackedObject {
      98             : public:
      99             :   AsyncStreamImpl(AsyncClientImpl& parent, AsyncClient::StreamCallbacks& callbacks,
     100             :                   const AsyncClient::StreamOptions& options);
     101          68 :   ~AsyncStreamImpl() override {
     102          68 :     router_.onDestroy();
     103             :     // UpstreamRequest::cleanUp() is guaranteed to reset the high watermark calls.
     104          68 :     ENVOY_BUG(high_watermark_calls_ == 0, "Excess high watermark calls after async stream ended.");
     105          68 :     if (destructor_callback_.has_value()) {
     106           0 :       (*destructor_callback_)();
     107           0 :     }
     108          68 :   }
     109             : 
     110           0 :   void setDestructorCallback(AsyncClient::StreamDestructorCallbacks callback) override {
     111           0 :     ASSERT(!destructor_callback_);
     112           0 :     destructor_callback_.emplace(callback);
     113           0 :   }
     114             : 
     115           0 :   void removeDestructorCallback() override {
     116           0 :     ASSERT(destructor_callback_);
     117           0 :     destructor_callback_.reset();
     118           0 :   }
     119             : 
     120           0 :   void setWatermarkCallbacks(DecoderFilterWatermarkCallbacks& callbacks) override {
     121           0 :     ASSERT(!watermark_callbacks_);
     122           0 :     watermark_callbacks_.emplace(callbacks);
     123           0 :     for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
     124           0 :       watermark_callbacks_->get().onDecoderFilterAboveWriteBufferHighWatermark();
     125           0 :     }
     126           0 :   }
     127             : 
     128           0 :   void removeWatermarkCallbacks() override {
     129           0 :     ASSERT(watermark_callbacks_);
     130           0 :     for (uint32_t i = 0; i < high_watermark_calls_; ++i) {
     131           0 :       watermark_callbacks_->get().onDecoderFilterBelowWriteBufferLowWatermark();
     132           0 :     }
     133           0 :     watermark_callbacks_.reset();
     134           0 :   }
     135             : 
     136             :   // Http::AsyncClient::Stream
     137             :   void sendHeaders(RequestHeaderMap& headers, bool end_stream) override;
     138             :   void sendData(Buffer::Instance& data, bool end_stream) override;
     139             :   void sendTrailers(RequestTrailerMap& trailers) override;
     140             :   void reset() override;
     141           0 :   bool isAboveWriteBufferHighWatermark() const override { return high_watermark_calls_ > 0; }
     142         122 :   const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; }
     143             : 
     144             : protected:
     145           0 :   bool remoteClosed() { return remote_closed_; }
     146             :   void closeLocal(bool end_stream);
     147        1277 :   StreamInfo::StreamInfoImpl& streamInfo() override { return stream_info_; }
     148             : 
     149             :   AsyncClientImpl& parent_;
     150             :   // Callback to listen for stream destruction.
     151             :   absl::optional<AsyncClient::StreamDestructorCallbacks> destructor_callback_;
     152             :   // Callback to listen for low/high/overflow watermark events.
     153             :   absl::optional<std::reference_wrapper<DecoderFilterWatermarkCallbacks>> watermark_callbacks_;
     154             : 
     155             : private:
     156             :   void cleanup();
     157             :   void closeRemote(bool end_stream);
     158        1073 :   bool complete() { return local_closed_ && remote_closed_; }
     159             : 
     160             :   // Http::StreamDecoderFilterCallbacks
     161         204 :   OptRef<const Network::Connection> connection() override { return {}; }
     162        1007 :   Event::Dispatcher& dispatcher() override { return parent_.dispatcher_; }
     163             :   void resetStream(Http::StreamResetReason reset_reason = Http::StreamResetReason::LocalReset,
     164             :                    absl::string_view transport_failure_reason = "") override;
     165         204 :   Router::RouteConstSharedPtr route() override { return route_; }
     166           0 :   Upstream::ClusterInfoConstSharedPtr clusterInfo() override { return parent_.cluster_; }
     167          68 :   uint64_t streamId() const override { return stream_id_; }
     168             :   // TODO(kbaichoo): Plumb account from owning request filter.
     169         136 :   Buffer::BufferMemoryAccountSharedPtr account() const override { return account_; }
     170          68 :   Tracing::Span& activeSpan() override { return active_span_; }
     171          68 :   OptRef<const Tracing::Config> tracingConfig() const override {
     172          68 :     return makeOptRef<const Tracing::Config>(tracing_config_);
     173          68 :   }
     174           0 :   void continueDecoding() override {}
     175           0 :   RequestTrailerMap& addDecodedTrailers() override { PANIC("not implemented"); }
     176           0 :   void addDecodedData(Buffer::Instance&, bool) override {
     177             :     // This should only be called if the user has set up buffering. The request is already fully
     178             :     // buffered. Note that this is only called via the async client's internal use of the router
     179             :     // filter which uses this function for buffering.
     180           0 :     ASSERT(buffered_body_ != nullptr);
     181           0 :   }
     182           0 :   MetadataMapVector& addDecodedMetadata() override { PANIC("not implemented"); }
     183           0 :   void injectDecodedDataToFilterChain(Buffer::Instance&, bool) override {}
     184           0 :   const Buffer::Instance* decodingBuffer() override { return buffered_body_.get(); }
     185           0 :   void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {}
     186             :   void sendLocalReply(Code code, absl::string_view body,
     187             :                       std::function<void(ResponseHeaderMap& headers)> modify_headers,
     188             :                       const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
     189          28 :                       absl::string_view details) override {
     190          28 :     if (encoded_response_headers_) {
     191          28 :       resetStream();
     192          28 :       return;
     193          28 :     }
     194           0 :     Utility::sendLocalReply(
     195           0 :         remote_closed_,
     196           0 :         Utility::EncodeFunctions{nullptr, nullptr,
     197           0 :                                  [this, modify_headers, &details](ResponseHeaderMapPtr&& headers,
     198           0 :                                                                   bool end_stream) -> void {
     199           0 :                                    if (modify_headers != nullptr) {
     200           0 :                                      modify_headers(*headers);
     201           0 :                                    }
     202           0 :                                    encodeHeaders(std::move(headers), end_stream, details);
     203           0 :                                  },
     204           0 :                                  [this](Buffer::Instance& data, bool end_stream) -> void {
     205           0 :                                    encodeData(data, end_stream);
     206           0 :                                  }},
     207           0 :         Utility::LocalReplyData{is_grpc_request_, code, body, grpc_status, is_head_request_});
     208           0 :   }
     209             :   // The async client won't pause if sending 1xx headers so simply swallow any.
     210           0 :   void encode1xxHeaders(ResponseHeaderMapPtr&&) override {}
     211             :   void encodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream,
     212             :                      absl::string_view details) override;
     213             :   void encodeData(Buffer::Instance& data, bool end_stream) override;
     214             :   void encodeTrailers(ResponseTrailerMapPtr&& trailers) override;
     215           0 :   void encodeMetadata(MetadataMapPtr&&) override {}
     216           0 :   void onDecoderFilterAboveWriteBufferHighWatermark() override {
     217           0 :     ++high_watermark_calls_;
     218           0 :     if (watermark_callbacks_.has_value()) {
     219           0 :       watermark_callbacks_->get().onDecoderFilterAboveWriteBufferHighWatermark();
     220           0 :     }
     221           0 :   }
     222           0 :   void onDecoderFilterBelowWriteBufferLowWatermark() override {
     223           0 :     ASSERT(high_watermark_calls_ != 0);
     224           0 :     --high_watermark_calls_;
     225           0 :     if (watermark_callbacks_.has_value()) {
     226           0 :       watermark_callbacks_->get().onDecoderFilterBelowWriteBufferLowWatermark();
     227           0 :     }
     228           0 :   }
     229          68 :   void addDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
     230          68 :   void removeDownstreamWatermarkCallbacks(DownstreamWatermarkCallbacks&) override {}
     231           0 :   void setDecoderBufferLimit(uint32_t) override {
     232           0 :     IS_ENVOY_BUG("decoder buffer limits should not be overridden on async streams.");
     233           0 :   }
     234         136 :   uint32_t decoderBufferLimit() override { return buffer_limit_.value_or(0); }
     235           0 :   bool recreateStream(const ResponseHeaderMap*) override { return false; }
     236         423 :   const ScopeTrackedObject& scope() override { return *this; }
     237           0 :   void restoreContextOnContinue(ScopeTrackedObjectStack& tracked_object_stack) override {
     238           0 :     tracked_object_stack.add(*this);
     239           0 :   }
     240           0 :   void addUpstreamSocketOptions(const Network::Socket::OptionsSharedPtr&) override {}
     241          68 :   Network::Socket::OptionsSharedPtr getUpstreamSocketOptions() const override { return {}; }
     242           0 :   const Router::RouteSpecificFilterConfig* mostSpecificPerFilterConfig() const override {
     243           0 :     return nullptr;
     244           0 :   }
     245             :   void traversePerFilterConfig(
     246           0 :       std::function<void(const Router::RouteSpecificFilterConfig&)>) const override {}
     247           0 :   Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() override { return {}; }
     248           0 :   OptRef<DownstreamStreamFilterCallbacks> downstreamCallbacks() override { return {}; }
     249           0 :   OptRef<UpstreamStreamFilterCallbacks> upstreamCallbacks() override { return {}; }
     250           0 :   void resetIdleTimer() override {}
     251           0 :   void setUpstreamOverrideHost(Upstream::LoadBalancerContext::OverrideHost) override {}
     252             :   absl::optional<Upstream::LoadBalancerContext::OverrideHost>
     253         136 :   upstreamOverrideHost() const override {
     254         136 :     return absl::nullopt;
     255         136 :   }
     256           0 :   absl::string_view filterConfigName() const override { return ""; }
     257           0 :   RequestHeaderMapOptRef requestHeaders() override { return makeOptRefFromPtr(request_headers_); }
     258           0 :   RequestTrailerMapOptRef requestTrailers() override {
     259           0 :     return makeOptRefFromPtr(request_trailers_);
     260           0 :   }
     261           0 :   ResponseHeaderMapOptRef informationalHeaders() override { return {}; }
     262           0 :   ResponseHeaderMapOptRef responseHeaders() override { return {}; }
     263           0 :   ResponseTrailerMapOptRef responseTrailers() override { return {}; }
     264             : 
     265             :   // ScopeTrackedObject
     266           0 :   void dumpState(std::ostream& os, int indent_level) const override {
     267           0 :     const char* spaces = spacesForLevel(indent_level);
     268           0 :     os << spaces << "AsyncClient " << this << DUMP_MEMBER(stream_id_) << "\n";
     269           0 :     DUMP_DETAILS(&stream_info_);
     270           0 :   }
     271             : 
     272             :   AsyncClient::StreamCallbacks& stream_callbacks_;
     273             :   const uint64_t stream_id_;
     274             :   Router::ProdFilter router_;
     275             :   StreamInfo::StreamInfoImpl stream_info_;
     276             :   Tracing::NullSpan active_span_;
     277             :   const Tracing::Config& tracing_config_;
     278             :   std::shared_ptr<NullRouteImpl> route_;
     279             :   uint32_t high_watermark_calls_{};
     280             :   bool local_closed_{};
     281             :   bool remote_closed_{};
     282             :   Buffer::InstancePtr buffered_body_;
     283             :   Buffer::BufferMemoryAccountSharedPtr account_{nullptr};
     284             :   absl::optional<uint32_t> buffer_limit_{absl::nullopt};
     285             :   RequestHeaderMap* request_headers_{};
     286             :   RequestTrailerMap* request_trailers_{};
     287             :   bool encoded_response_headers_{};
     288             :   bool is_grpc_request_{};
     289             :   bool is_head_request_{false};
     290             :   bool send_xff_{true};
     291             : 
     292             :   friend class AsyncClientImpl;
     293             :   friend class AsyncClientImplUnitTest;
     294             : };
     295             : 
     296             : class AsyncRequestSharedImpl : public virtual AsyncClient::Request,
     297             :                                protected AsyncStreamImpl,
     298             :                                protected AsyncClient::StreamCallbacks {
     299             : public:
     300             :   void cancel() final;
     301             : 
     302             : protected:
     303             :   AsyncRequestSharedImpl(AsyncClientImpl& parent, AsyncClient::Callbacks& callbacks,
     304             :                          const AsyncClient::RequestOptions& options);
     305             :   void onHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) final;
     306             :   void onData(Buffer::Instance& data, bool end_stream) final;
     307             :   void onTrailers(ResponseTrailerMapPtr&& trailers) final;
     308             :   void onComplete() final;
     309             :   void onReset() final;
     310             : 
     311             :   AsyncClient::Callbacks& callbacks_;
     312             :   Tracing::SpanPtr child_span_;
     313             :   std::unique_ptr<ResponseMessageImpl> response_;
     314             :   bool cancelled_{};
     315             : };
     316             : 
     317             : class AsyncOngoingRequestImpl final : public AsyncClient::OngoingRequest,
     318             :                                       public AsyncRequestSharedImpl {
     319             : public:
     320             :   AsyncOngoingRequestImpl(RequestHeaderMapPtr&& request_headers, AsyncClientImpl& parent,
     321             :                           AsyncClient::Callbacks& callbacks,
     322             :                           const AsyncClient::RequestOptions& options)
     323             :       : AsyncRequestSharedImpl(parent, callbacks, options),
     324           0 :         request_headers_(std::move(request_headers)) {
     325           0 :     ASSERT(request_headers_);
     326           0 :   }
     327           0 :   void captureAndSendTrailers(RequestTrailerMapPtr&& trailers) override {
     328           0 :     request_trailers_ = std::move(trailers);
     329           0 :     sendTrailers(*request_trailers_);
     330           0 :   }
     331             : 
     332             : private:
     333             :   void initialize();
     334             : 
     335             :   RequestHeaderMapPtr request_headers_;
     336             :   RequestTrailerMapPtr request_trailers_;
     337             : 
     338             :   friend class AsyncClientImpl;
     339             : };
     340             : 
     341             : class AsyncRequestImpl final : public AsyncRequestSharedImpl {
     342             : public:
     343             :   AsyncRequestImpl(RequestMessagePtr&& request, AsyncClientImpl& parent,
     344             :                    AsyncClient::Callbacks& callbacks, const AsyncClient::RequestOptions& options)
     345           0 :       : AsyncRequestSharedImpl(parent, callbacks, options), request_(std::move(request)) {}
     346             : 
     347             : private:
     348             :   void initialize();
     349             : 
     350             :   // Http::StreamDecoderFilterCallbacks
     351           0 :   void addDecodedData(Buffer::Instance&, bool) override {
     352           0 :     // The request is already fully buffered. Note that this is only called via the async client's
     353           0 :     // internal use of the router filter which uses this function for buffering.
     354           0 :   }
     355           0 :   const Buffer::Instance* decodingBuffer() override { return &request_->body(); }
     356           0 :   void modifyDecodingBuffer(std::function<void(Buffer::Instance&)>) override {}
     357             : 
     358             :   RequestMessagePtr request_;
     359             : 
     360             :   friend class AsyncClientImpl;
     361             : };
     362             : 
     363             : } // namespace Http
     364             : } // namespace Envoy

Generated by: LCOV version 1.15