LCOV - code coverage report
Current view: top level - envoy/http - async_client.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 29 117 24.8 %
Date: 2024-01-05 06:35:25 Functions: 11 33 33.3 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <chrono>
       4             : #include <memory>
       5             : 
       6             : #include "envoy/buffer/buffer.h"
       7             : #include "envoy/config/route/v3/route_components.pb.h"
       8             : #include "envoy/event/dispatcher.h"
       9             : #include "envoy/http/filter.h"
      10             : #include "envoy/http/header_map.h"
      11             : #include "envoy/http/message.h"
      12             : #include "envoy/stream_info/stream_info.h"
      13             : #include "envoy/tracing/tracer.h"
      14             : 
      15             : #include "source/common/protobuf/protobuf.h"
      16             : 
      17             : #include "absl/types/optional.h"
      18             : 
      19             : namespace Envoy {
      20             : namespace Router {
      21             : class FilterConfig;
      22             : }
      23             : namespace Http {
      24             : 
      25             : /**
      26             :  * Supports sending an HTTP request message and receiving a response asynchronously.
      27             :  */
      28             : class AsyncClient {
      29             : public:
      30             :   /**
      31             :    * An in-flight HTTP request.
      32             :    */
      33             :   class Request {
      34             :   public:
      35          23 :     virtual ~Request() = default;
      36             : 
      37             :     /**
      38             :      * Signals that the request should be cancelled.
      39             :      */
      40             :     virtual void cancel() PURE;
      41             :   };
      42             : 
      43             :   /**
      44             :    * Async Client failure reasons.
      45             :    */
      46             :   enum class FailureReason {
      47             :     // The stream has been reset.
      48             :     Reset
      49             :   };
      50             : 
      51             :   /**
      52             :    * Notifies caller of async HTTP request status.
      53             :    *
      54             :    * To support a use case where a caller makes multiple requests in parallel,
      55             :    * individual callback methods provide request context corresponding to that response.
      56             :    */
      57             :   class Callbacks {
      58             :   public:
      59         178 :     virtual ~Callbacks() = default;
      60             : 
      61             :     /**
      62             :      * Called when the async HTTP request succeeds.
      63             :      * @param request  request handle.
      64             :      *                 NOTE: request handle is passed for correlation purposes only, e.g.
      65             :      *                 for client code to be able to exclude that handle from a list of
      66             :      *                 requests in progress.
      67             :      * @param response the HTTP response
      68             :      */
      69             :     virtual void onSuccess(const Request& request, ResponseMessagePtr&& response) PURE;
      70             : 
      71             :     /**
      72             :      * Called when the async HTTP request fails.
      73             :      * @param request request handle.
      74             :      *                NOTE: request handle is passed for correlation purposes only, e.g.
      75             :      *                for client code to be able to exclude that handle from a list of
      76             :      *                requests in progress.
      77             :      * @param reason  failure reason
      78             :      */
      79             :     virtual void onFailure(const Request& request, FailureReason reason) PURE;
      80             : 
      81             :     /**
      82             :      * Called before finalizing upstream span when the request is complete or reset.
      83             :      * @param span a tracing span to fill with extra tags.
      84             :      * @param response_headers the response headers.
      85             :      */
      86             :     virtual void onBeforeFinalizeUpstreamSpan(Envoy::Tracing::Span& span,
      87             :                                               const Http::ResponseHeaderMap* response_headers) PURE;
      88             :   };
      89             : 
      90             :   /**
      91             :    * Notifies caller of async HTTP stream status.
      92             :    * Note the HTTP stream is full-duplex, even if the local to remote stream has been ended
      93             :    * by Stream.sendHeaders/sendData with end_stream=true or sendTrailers,
      94             :    * StreamCallbacks can continue to receive events until the remote to local stream is closed,
      95             :    * and vice versa.
      96             :    */
      97             :   class StreamCallbacks {
      98             :   public:
      99          68 :     virtual ~StreamCallbacks() = default;
     100             : 
     101             :     /**
     102             :      * Called when all headers get received on the async HTTP stream.
     103             :      * @param headers the headers received
     104             :      * @param end_stream whether the response is header only
     105             :      */
     106             :     virtual void onHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) PURE;
     107             : 
     108             :     /**
     109             :      * Called when a data frame get received on the async HTTP stream.
     110             :      * This can be invoked multiple times if the data get streamed.
     111             :      * @param data the data received
     112             :      * @param end_stream whether the data is the last data frame
     113             :      */
     114             :     virtual void onData(Buffer::Instance& data, bool end_stream) PURE;
     115             : 
     116             :     /**
     117             :      * Called when all trailers get received on the async HTTP stream.
     118             :      * @param trailers the trailers received.
     119             :      */
     120             :     virtual void onTrailers(ResponseTrailerMapPtr&& trailers) PURE;
     121             : 
     122             :     /**
     123             :      * Called when both the local and remote have gracefully closed the stream.
     124             :      * Useful for asymmetric cases where end_stream may not be bidirectionally observable.
     125             :      * Note this is NOT called on stream reset.
     126             :      */
     127             :     virtual void onComplete() PURE;
     128             : 
     129             :     /**
     130             :      * Called when the async HTTP stream is reset.
     131             :      */
     132             :     virtual void onReset() PURE;
     133             :   };
     134             : 
     135             :   using StreamDestructorCallbacks = std::function<void()>;
     136             : 
     137             :   /**
     138             :    * An in-flight HTTP stream.
     139             :    */
     140             :   class Stream {
     141             :   public:
     142          68 :     virtual ~Stream() = default;
     143             : 
     144             :     /***
     145             :      * Send headers to the stream. This method cannot be invoked more than once and
     146             :      * need to be called before sendData.
     147             :      * @param headers supplies the headers to send.
     148             :      * @param end_stream supplies whether this is a header only request.
     149             :      */
     150             :     virtual void sendHeaders(RequestHeaderMap& headers, bool end_stream) PURE;
     151             : 
     152             :     /***
     153             :      * Send data to the stream. This method can be invoked multiple times if it get streamed.
     154             :      * To end the stream without data, call this method with empty buffer.
     155             :      * @param data supplies the data to send.
     156             :      * @param end_stream supplies whether this is the last data.
     157             :      */
     158             :     virtual void sendData(Buffer::Instance& data, bool end_stream) PURE;
     159             : 
     160             :     /***
     161             :      * Send trailers. This method cannot be invoked more than once, and implicitly ends the stream.
     162             :      * @param trailers supplies the trailers to send.
     163             :      */
     164             :     virtual void sendTrailers(RequestTrailerMap& trailers) PURE;
     165             : 
     166             :     /***
     167             :      * Reset the stream.
     168             :      */
     169             :     virtual void reset() PURE;
     170             : 
     171             :     /***
     172             :      * Register callback to be called on stream destruction. This callback must persist beyond the
     173             :      * lifetime of the stream or be unregistered via removeDestructorCallback. If there's already a
     174             :      * destructor callback registered, this method will ASSERT-fail.
     175             :      */
     176             :     virtual void setDestructorCallback(StreamDestructorCallbacks callback) PURE;
     177             : 
     178             :     /***
     179             :      * Remove previously set destructor callback. Calling this without having previously set a
     180             :      * Destructor callback will ASSERT-fail.
     181             :      */
     182             :     virtual void removeDestructorCallback() PURE;
     183             : 
     184             :     /***
     185             :      * Register a callback to be called when high/low write buffer watermark events occur on the
     186             :      * stream. This callback must persist beyond the lifetime of the stream or be unregistered via
     187             :      * removeWatermarkCallbacks. If there's already a watermark callback registered, this method
     188             :      * will ASSERT-fail.
     189             :      */
     190             :     virtual void setWatermarkCallbacks(DecoderFilterWatermarkCallbacks& callbacks) PURE;
     191             : 
     192             :     /***
     193             :      * Remove previously set watermark callbacks.
     194             :      */
     195             :     virtual void removeWatermarkCallbacks() PURE;
     196             : 
     197             :     /***
     198             :      * @returns if the stream has enough buffered outbound data to be over the configured buffer
     199             :      * limits
     200             :      */
     201             :     virtual bool isAboveWriteBufferHighWatermark() const PURE;
     202             : 
     203             :     /***
     204             :      * @returns the stream info object associated with the stream.
     205             :      */
     206             :     virtual const StreamInfo::StreamInfo& streamInfo() const PURE;
     207             :   };
     208             : 
     209             :   /***
     210             :    * An in-flight HTTP request to which additional data and trailers can be sent, as well as resets
     211             :    * and other stream-cancelling events. Must be terminated by sending trailers or data with
     212             :    * end_stream.
     213             :    */
     214             :   class OngoingRequest : public virtual Request, public virtual Stream {
     215             :   public:
     216             :     /***
     217             :      * Takes ownership of trailers, and sends it to the underlying stream.
     218             :      * @param trailers owned trailers to pass to upstream.
     219             :      */
     220             :     virtual void captureAndSendTrailers(RequestTrailerMapPtr&& trailers) PURE;
     221             :   };
     222             : 
     223         241 :   virtual ~AsyncClient() = default;
     224             : 
     225             :   /**
     226             :    * A context from the caller of an async client.
     227             :    */
     228             :   struct ParentContext {
     229             :     const StreamInfo::StreamInfo* stream_info{nullptr};
     230             :   };
     231             : 
     232             :   /**
     233             :    * A structure to hold the options for AsyncStream object.
     234             :    */
     235             :   struct StreamOptions {
     236           0 :     StreamOptions& setTimeout(const absl::optional<std::chrono::milliseconds>& v) {
     237           0 :       timeout = v;
     238           0 :       return *this;
     239           0 :     }
     240           3 :     StreamOptions& setTimeout(const std::chrono::milliseconds& v) {
     241           3 :       timeout = v;
     242           3 :       return *this;
     243           3 :     }
     244          68 :     StreamOptions& setBufferBodyForRetry(bool v) {
     245          68 :       buffer_body_for_retry = v;
     246          68 :       return *this;
     247          68 :     }
     248           0 :     StreamOptions& setSendXff(bool v) {
     249           0 :       send_xff = v;
     250           0 :       return *this;
     251           0 :     }
     252             :     StreamOptions& setHashPolicy(
     253           0 :         const Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy>& v) {
     254           0 :       hash_policy = v;
     255           0 :       return *this;
     256           0 :     }
     257          40 :     StreamOptions& setParentContext(const ParentContext& v) {
     258          40 :       parent_context = v;
     259          40 :       return *this;
     260          40 :     }
     261             :     // Set dynamic metadata of async stream. If a metadata record with filter name 'envoy.lb' is
     262             :     // provided, metadata match criteria of async stream route will be overridden by the metadata
     263             :     // and then used by the subset load balancer.
     264           0 :     StreamOptions& setMetadata(const envoy::config::core::v3::Metadata& m) {
     265           0 :       metadata = m;
     266           0 :       return *this;
     267           0 :     }
     268             : 
     269             :     // Set buffer restriction and accounting for the stream.
     270           0 :     StreamOptions& setBufferAccount(const Buffer::BufferMemoryAccountSharedPtr& account) {
     271           0 :       account_ = account;
     272           0 :       return *this;
     273           0 :     }
     274           0 :     StreamOptions& setBufferLimit(uint32_t limit) {
     275           0 :       buffer_limit_ = limit;
     276           0 :       return *this;
     277           0 :     }
     278             : 
     279             :     // this should be done with setBufferedBodyForRetry=true ?
     280           0 :     StreamOptions& setRetryPolicy(const envoy::config::route::v3::RetryPolicy& p) {
     281           0 :       retry_policy = p;
     282           0 :       return *this;
     283           0 :     }
     284           0 :     StreamOptions& setFilterConfig(Router::FilterConfig& config) {
     285           0 :       filter_config_ = config;
     286           0 :       return *this;
     287           0 :     }
     288             : 
     289           0 :     StreamOptions& setIsShadow(bool s) {
     290           0 :       is_shadow = s;
     291           0 :       return *this;
     292           0 :     }
     293             : 
     294             :     // For gmock test
     295           0 :     bool operator==(const StreamOptions& src) const {
     296           0 :       return timeout == src.timeout && buffer_body_for_retry == src.buffer_body_for_retry &&
     297           0 :              send_xff == src.send_xff;
     298           0 :     }
     299             : 
     300             :     // The timeout supplies the stream timeout, measured since when the frame with
     301             :     // end_stream flag is sent until when the first frame is received.
     302             :     absl::optional<std::chrono::milliseconds> timeout;
     303             : 
     304             :     // The buffer_body_for_retry specifies whether the streamed body will be buffered so that
     305             :     // it can be retried. In general, this should be set to false for a true stream. However,
     306             :     // streaming is also used in certain cases such as gRPC unary calls, where retry can
     307             :     // still be useful.
     308             :     bool buffer_body_for_retry{false};
     309             : 
     310             :     // If true, x-forwarded-for header will be added.
     311             :     bool send_xff{true};
     312             : 
     313             :     // Provides the hash policy for hashing load balancing strategies.
     314             :     Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy> hash_policy;
     315             : 
     316             :     // Provides parent context. Currently, this holds stream info from the caller.
     317             :     ParentContext parent_context;
     318             : 
     319             :     envoy::config::core::v3::Metadata metadata;
     320             : 
     321             :     // Buffer memory account for tracking bytes.
     322             :     Buffer::BufferMemoryAccountSharedPtr account_{nullptr};
     323             : 
     324             :     absl::optional<uint32_t> buffer_limit_;
     325             : 
     326             :     absl::optional<envoy::config::route::v3::RetryPolicy> retry_policy;
     327             : 
     328             :     OptRef<Router::FilterConfig> filter_config_;
     329             : 
     330             :     bool is_shadow{false};
     331             :   };
     332             : 
     333             :   /**
     334             :    * A structure to hold the options for AsyncRequest object.
     335             :    */
     336             :   struct RequestOptions : public StreamOptions {
     337           0 :     RequestOptions& setTimeout(const absl::optional<std::chrono::milliseconds>& v) {
     338           0 :       StreamOptions::setTimeout(v);
     339           0 :       return *this;
     340           0 :     }
     341           3 :     RequestOptions& setTimeout(const std::chrono::milliseconds& v) {
     342           3 :       StreamOptions::setTimeout(v);
     343           3 :       return *this;
     344           3 :     }
     345           0 :     RequestOptions& setBufferBodyForRetry(bool v) {
     346           0 :       StreamOptions::setBufferBodyForRetry(v);
     347           0 :       return *this;
     348           0 :     }
     349           0 :     RequestOptions& setSendXff(bool v) {
     350           0 :       StreamOptions::setSendXff(v);
     351           0 :       return *this;
     352           0 :     }
     353             :     RequestOptions& setHashPolicy(
     354           0 :         const Protobuf::RepeatedPtrField<envoy::config::route::v3::RouteAction::HashPolicy>& v) {
     355           0 :       StreamOptions::setHashPolicy(v);
     356           0 :       return *this;
     357           0 :     }
     358           0 :     RequestOptions& setParentContext(const ParentContext& v) {
     359           0 :       StreamOptions::setParentContext(v);
     360           0 :       return *this;
     361           0 :     }
     362           0 :     RequestOptions& setMetadata(const envoy::config::core::v3::Metadata& m) {
     363           0 :       StreamOptions::setMetadata(m);
     364           0 :       return *this;
     365           0 :     }
     366           0 :     RequestOptions& setRetryPolicy(const envoy::config::route::v3::RetryPolicy& p) {
     367           0 :       StreamOptions::setRetryPolicy(p);
     368           0 :       return *this;
     369           0 :     }
     370           0 :     RequestOptions& setIsShadow(bool s) {
     371           0 :       StreamOptions::setIsShadow(s);
     372           0 :       return *this;
     373           0 :     }
     374           3 :     RequestOptions& setParentSpan(Tracing::Span& parent_span) {
     375           3 :       parent_span_ = &parent_span;
     376           3 :       return *this;
     377           3 :     }
     378           3 :     RequestOptions& setChildSpanName(const std::string& child_span_name) {
     379           3 :       child_span_name_ = child_span_name;
     380           3 :       return *this;
     381           3 :     }
     382           0 :     RequestOptions& setSampled(absl::optional<bool> sampled) {
     383           0 :       sampled_ = sampled;
     384           0 :       return *this;
     385           0 :     }
     386           0 :     RequestOptions& setBufferAccount(const Buffer::BufferMemoryAccountSharedPtr& account) {
     387           0 :       account_ = account;
     388           0 :       return *this;
     389           0 :     }
     390           0 :     RequestOptions& setBufferLimit(uint32_t limit) {
     391           0 :       buffer_limit_ = limit;
     392           0 :       return *this;
     393           0 :     }
     394             : 
     395             :     // For gmock test
     396           0 :     bool operator==(const RequestOptions& src) const {
     397           0 :       return StreamOptions::operator==(src) && parent_span_ == src.parent_span_ &&
     398           0 :              child_span_name_ == src.child_span_name_ && sampled_ == src.sampled_;
     399           0 :     }
     400             : 
     401             :     // The parent span that child spans are created under to trace egress requests/responses.
     402             :     // If not set, requests will not be traced.
     403             :     Tracing::Span* parent_span_{nullptr};
     404             :     // The name to give to the child span that represents the async http request.
     405             :     // If left empty and parent_span_ is set, then the default name will have the cluster name.
     406             :     // Only used if parent_span_ is set.
     407             :     std::string child_span_name_{""};
     408             :     // Sampling decision for the tracing span. The span is sampled by default.
     409             :     absl::optional<bool> sampled_{true};
     410             :   };
     411             : 
     412             :   /**
     413             :    * Send an HTTP request asynchronously
     414             :    * @param request the request to send.
     415             :    * @param callbacks the callbacks to be notified of request status.
     416             :    * @param options the data struct to control the request sending.
     417             :    * @return a request handle or nullptr if no request could be created. NOTE: In this case
     418             :    *         onFailure() has already been called inline. The client owns the request and the
     419             :    *         handle should just be used to cancel.
     420             :    */
     421             : 
     422             :   virtual Request* send(RequestMessagePtr&& request, Callbacks& callbacks,
     423             :                         const RequestOptions& options) PURE;
     424             : 
     425             :   /**
     426             :    * Starts a new OngoingRequest asynchronously with the given headers.
     427             :    *
     428             :    * @param request_headers headers to send.
     429             :    * @param callbacks the callbacks to be notified of request status.
     430             :    * @param options the data struct to control the request sending.
     431             :    * @return a request handle or nullptr if no request could be created. See note attached to
     432             :    * `send`. Calling startRequest will not trigger end stream. For header-only requests, `send`
     433             :    * should be called instead.
     434             :    */
     435             :   virtual OngoingRequest* startRequest(RequestHeaderMapPtr&& request_headers, Callbacks& callbacks,
     436             :                                        const RequestOptions& options) PURE;
     437             : 
     438             :   /**
     439             :    * Start an HTTP stream asynchronously, without an associated HTTP request.
     440             :    * @param callbacks the callbacks to be notified of stream status.
     441             :    * @param options the data struct to control the stream.
     442             :    * @return a stream handle or nullptr if no stream could be started. NOTE: In this case
     443             :    *         onResetStream() has already been called inline. The client owns the stream and
     444             :    *         the handle can be used to send more messages or close the stream.
     445             :    */
     446             :   virtual Stream* start(StreamCallbacks& callbacks, const StreamOptions& options) PURE;
     447             : 
     448             :   /**
     449             :    * @return Event::Dispatcher& the dispatcher backing this client.
     450             :    */
     451             :   virtual Event::Dispatcher& dispatcher() PURE;
     452             : };
     453             : 
     454             : using AsyncClientPtr = std::unique_ptr<AsyncClient>;
     455             : 
     456             : } // namespace Http
     457             : } // namespace Envoy

Generated by: LCOV version 1.15