LCOV - code coverage report
Current view: top level - source/common/http - codec_client.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 70 92 76.1 %
Date: 2024-01-05 06:35:25 Functions: 26 36 72.2 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <cstdint>
       4             : #include <list>
       5             : #include <memory>
       6             : 
       7             : #include "envoy/common/random_generator.h"
       8             : #include "envoy/event/deferred_deletable.h"
       9             : #include "envoy/event/timer.h"
      10             : #include "envoy/http/codec.h"
      11             : #include "envoy/http/header_validator.h"
      12             : #include "envoy/network/connection.h"
      13             : #include "envoy/network/filter.h"
      14             : #include "envoy/upstream/upstream.h"
      15             : 
      16             : #include "source/common/common/assert.h"
      17             : #include "source/common/common/linked_object.h"
      18             : #include "source/common/common/logger.h"
      19             : #include "source/common/http/codec_wrappers.h"
      20             : #include "source/common/network/filter_impl.h"
      21             : #include "source/common/runtime/runtime_features.h"
      22             : 
      23             : namespace Envoy {
      24             : namespace Http {
      25             : 
      26             : /**
      27             :  * Callbacks specific to a codec client.
      28             :  */
      29             : class CodecClientCallbacks {
      30             : public:
      31         145 :   virtual ~CodecClientCallbacks() = default;
      32             : 
      33             :   // Called in onPreDecodeComplete
      34         105 :   virtual void onStreamPreDecodeComplete() {}
      35             : 
      36             :   /**
      37             :    * Called every time an owned stream is destroyed, whether complete or not.
      38             :    */
      39             :   virtual void onStreamDestroy() PURE;
      40             : 
      41             :   /**
      42             :    * Called when a stream is reset by the client.
      43             :    * @param reason supplies the reset reason.
      44             :    */
      45             :   virtual void onStreamReset(StreamResetReason reason) PURE;
      46             : };
      47             : 
      48             : /**
      49             :  * This is an HTTP client that multiple stream management and underlying connection management
      50             :  * across multiple HTTP codec types.
      51             :  */
      52             : class CodecClient : protected Logger::Loggable<Logger::Id::client>,
      53             :                     public Http::ConnectionCallbacks,
      54             :                     public Network::ConnectionCallbacks,
      55             :                     public Event::DeferredDeletable {
      56             : public:
      57             :   /**
      58             :    * Type of HTTP codec to use.
      59             :    */
      60             :   // This is a legacy alias.
      61             :   using Type = Envoy::Http::CodecType;
      62             : 
      63             :   /**
      64             :    * Add a connection callback to the underlying network connection.
      65             :    */
      66         227 :   void addConnectionCallbacks(Network::ConnectionCallbacks& cb) {
      67         227 :     connection_->addConnectionCallbacks(cb);
      68         227 :   }
      69             : 
      70             :   /**
      71             :    * Return if half-close semantics are enabled on the underlying connection.
      72             :    */
      73           0 :   bool isHalfCloseEnabled() { return connection_->isHalfCloseEnabled(); }
      74             : 
      75             :   /**
      76             :    * Initialize all of the installed read filters on the underlying connection.
      77             :    * This effectively calls onNewConnection() on each of them.
      78             :    */
      79         165 :   void initializeReadFilters() { connection_->initializeReadFilters(); }
      80             : 
      81             :   /**
      82             :    * Close the underlying network connection. This is immediate and will not attempt to flush any
      83             :    * pending write data.
      84             :    */
      85             :   void close(Network::ConnectionCloseType type = Network::ConnectionCloseType::NoFlush);
      86             : 
      87             :   /**
      88             :    * Send a codec level go away indication to the peer.
      89             :    */
      90           0 :   void goAway() { codec_->goAway(); }
      91             : 
      92             :   /**
      93             :    * @return the underlying connection ID.
      94             :    */
      95           0 :   uint64_t id() const { return connection_->id(); }
      96             : 
      97             :   /**
      98             :    * @return the underlying codec protocol.
      99             :    */
     100         649 :   Protocol protocol() { return codec_->protocol(); }
     101             : 
     102             :   /**
     103             :    * @return the underlying connection error.
     104             :    */
     105         338 :   absl::string_view connectionFailureReason() { return connection_->transportFailureReason(); }
     106             : 
     107             :   /**
     108             :    * @return size_t the number of outstanding requests that have not completed or been reset.
     109             :    */
     110        1365 :   size_t numActiveRequests() { return active_requests_.size(); }
     111             : 
     112             :   /**
     113             :    * Create a new stream. Note: The CodecClient will NOT buffer multiple requests for HTTP1
     114             :    * connections. Thus, calling newStream() before the previous request has been fully encoded
     115             :    * is an error. Pipelining is supported however.
     116             :    * @param response_decoder supplies the decoder to use for response callbacks.
     117             :    * @return StreamEncoder& the encoder to use for encoding the request.
     118             :    */
     119             :   RequestEncoder& newStream(ResponseDecoder& response_decoder);
     120             : 
     121         173 :   void setConnectionStats(const Network::Connection::ConnectionStats& stats) {
     122         173 :     connection_->setConnectionStats(stats);
     123         173 :   }
     124             : 
     125         145 :   void setCodecClientCallbacks(CodecClientCallbacks& callbacks) {
     126         145 :     codec_client_callbacks_ = &callbacks;
     127         145 :   }
     128             : 
     129         199 :   void setCodecConnectionCallbacks(Http::ConnectionCallbacks& callbacks) {
     130         199 :     codec_callbacks_ = &callbacks;
     131         199 :   }
     132             : 
     133          33 :   bool remoteClosed() const { return remote_closed_; }
     134             : 
     135          40 :   CodecType type() const { return type_; }
     136             : 
     137             :   // Note this is the L4 stream info, not L7.
     138         202 :   StreamInfo::StreamInfo& streamInfo() { return connection_->streamInfo(); }
     139             : 
     140             :   /**
     141             :    * Connect to the host.
     142             :    * Needs to be called after codec_ is instantiated.
     143             :    */
     144             :   void connect();
     145             : 
     146             : protected:
     147             :   /**
     148             :    * Create a codec client and connect to a remote host/port.
     149             :    * @param type supplies the codec type.
     150             :    * @param connection supplies the connection to communicate on.
     151             :    * @param host supplies the owning host.
     152             :    */
     153             :   CodecClient(CodecType type, Network::ClientConnectionPtr&& connection,
     154             :               Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher);
     155             : 
     156             :   // Http::ConnectionCallbacks
     157          11 :   void onGoAway(GoAwayErrorCode error_code) override {
     158          11 :     if (codec_callbacks_) {
     159          11 :       codec_callbacks_->onGoAway(error_code);
     160          11 :     }
     161          11 :   }
     162         108 :   void onSettings(ReceivedSettings& settings) override {
     163         108 :     if (codec_callbacks_) {
     164         108 :       codec_callbacks_->onSettings(settings);
     165         108 :     }
     166         108 :   }
     167           0 :   void onMaxStreamsChanged(uint32_t num_streams) override {
     168           0 :     if (codec_callbacks_) {
     169           0 :       codec_callbacks_->onMaxStreamsChanged(num_streams);
     170           0 :     }
     171           0 :   }
     172             : 
     173           0 :   void onIdleTimeout() {
     174           0 :     host_->cluster().trafficStats()->upstream_cx_idle_timeout_.inc();
     175           0 :     close();
     176           0 :   }
     177             : 
     178         763 :   void disableIdleTimer() {
     179         763 :     if (idle_timer_ != nullptr) {
     180         375 :       idle_timer_->disableTimer();
     181         375 :     }
     182         763 :   }
     183             : 
     184         571 :   void enableIdleTimer() {
     185         571 :     if (idle_timer_ != nullptr) {
     186         295 :       idle_timer_->enableTimer(idle_timeout_.value());
     187         295 :     }
     188         571 :   }
     189             : 
     190             :   const CodecType type_;
     191             :   // The order of host_, connection_, and codec_ matter as during destruction each can refer to
     192             :   // the previous, at least in tests.
     193             :   Upstream::HostDescriptionConstSharedPtr host_;
     194             :   Network::ClientConnectionPtr connection_;
     195             :   ClientConnectionPtr codec_;
     196             :   Event::TimerPtr idle_timer_;
     197             :   const absl::optional<std::chrono::milliseconds> idle_timeout_;
     198             : 
     199             : private:
     200             :   /**
     201             :    * Wrapper read filter to drive incoming connection data into the codec. We could potentially
     202             :    * support other filters in the future.
     203             :    */
     204             :   struct CodecReadFilter : public Network::ReadFilterBaseImpl {
     205         365 :     CodecReadFilter(CodecClient& parent) : parent_(parent) {}
     206             : 
     207             :     // Network::ReadFilter
     208         675 :     Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override {
     209         675 :       parent_.onData(data);
     210         675 :       if (end_stream && parent_.isHalfCloseEnabled()) {
     211             :         // Note that this results in the connection closed as if it was closed
     212             :         // locally, it would be more correct to convey the end stream to the
     213             :         // response decoder, but it would require some refactoring.
     214           0 :         parent_.close();
     215           0 :       }
     216         675 :       return Network::FilterStatus::StopIteration;
     217         675 :     }
     218             : 
     219             :     CodecClient& parent_;
     220             :   };
     221             : 
     222             :   struct ActiveRequest;
     223             : 
     224             :   /**
     225             :    * Wrapper for an outstanding request. Designed for handling stream multiplexing.
     226             :    */
     227             :   struct ActiveRequest : LinkedObject<ActiveRequest>,
     228             :                          public Event::DeferredDeletable,
     229             :                          public StreamCallbacks,
     230             :                          public ResponseDecoderWrapper,
     231             :                          public RequestEncoderWrapper {
     232             :     ActiveRequest(CodecClient& parent, ResponseDecoder& inner)
     233             :         : ResponseDecoderWrapper(inner), RequestEncoderWrapper(nullptr), parent_(parent),
     234             :           header_validator_(
     235         398 :               parent.host_->cluster().makeHeaderValidator(parent.codec_->protocol())) {
     236         398 :       switch (parent.protocol()) {
     237          26 :       case Protocol::Http10:
     238         210 :       case Protocol::Http11:
     239             :         // HTTP/1.1 codec does not support half-close on the response completion.
     240         210 :         wait_encode_complete_ = false;
     241         210 :         break;
     242         188 :       case Protocol::Http2:
     243         188 :       case Protocol::Http3:
     244         188 :         wait_encode_complete_ = true;
     245         188 :         break;
     246         398 :       }
     247         398 :     }
     248             : 
     249             :     void decodeHeaders(ResponseHeaderMapPtr&& headers, bool end_stream) override;
     250             : 
     251             :     // StreamCallbacks
     252         163 :     void onResetStream(StreamResetReason reason, absl::string_view) override {
     253         163 :       parent_.onReset(*this, reason);
     254         163 :     }
     255           0 :     void onAboveWriteBufferHighWatermark() override {}
     256           0 :     void onBelowWriteBufferLowWatermark() override {}
     257             : 
     258             :     // StreamDecoderWrapper
     259         260 :     void onPreDecodeComplete() override { parent_.responsePreDecodeComplete(*this); }
     260         260 :     void onDecodeComplete() override {}
     261             : 
     262             :     // RequestEncoderWrapper
     263         343 :     void onEncodeComplete() override { parent_.requestEncodeComplete(*this); }
     264             : 
     265             :     // RequestEncoder
     266             :     Status encodeHeaders(const RequestHeaderMap& headers, bool end_stream) override;
     267             : 
     268         398 :     void setEncoder(RequestEncoder& encoder) {
     269         398 :       inner_encoder_ = &encoder;
     270         398 :       inner_encoder_->getStream().addCallbacks(*this);
     271         398 :     }
     272             : 
     273         235 :     void removeEncoderCallbacks() { inner_encoder_->getStream().removeCallbacks(*this); }
     274             : 
     275             :     CodecClient& parent_;
     276             :     Http::ClientHeaderValidatorPtr header_validator_;
     277             :     bool wait_encode_complete_{true};
     278             :     bool encode_complete_{false};
     279             :     bool decode_complete_{false};
     280             :   };
     281             : 
     282             :   using ActiveRequestPtr = std::unique_ptr<ActiveRequest>;
     283             : 
     284             :   /**
     285             :    * Called when a response finishes decoding. This is called *before* forwarding on to the
     286             :    * wrapped decoder.
     287             :    */
     288             :   void responsePreDecodeComplete(ActiveRequest& request);
     289             :   void requestEncodeComplete(ActiveRequest& request);
     290             :   void completeRequest(ActiveRequest& request);
     291             : 
     292             :   void deleteRequest(ActiveRequest& request);
     293             :   void onReset(ActiveRequest& request, StreamResetReason reason);
     294             :   void onData(Buffer::Instance& data);
     295             : 
     296             :   // Network::ConnectionCallbacks
     297             :   void onEvent(Network::ConnectionEvent event) override;
     298             :   // Pass watermark events from the connection on to the codec which will pass it to the underlying
     299             :   // streams.
     300           0 :   void onAboveWriteBufferHighWatermark() override {
     301           0 :     codec_->onUnderlyingConnectionAboveWriteBufferHighWatermark();
     302           0 :   }
     303           0 :   void onBelowWriteBufferLowWatermark() override {
     304           0 :     codec_->onUnderlyingConnectionBelowWriteBufferLowWatermark();
     305           0 :   }
     306             : 
     307             :   std::list<ActiveRequestPtr> active_requests_;
     308             :   Http::ConnectionCallbacks* codec_callbacks_{};
     309             :   CodecClientCallbacks* codec_client_callbacks_{};
     310             :   bool connected_{};
     311             :   bool remote_closed_{};
     312             :   bool protocol_error_{false};
     313             :   bool connect_called_{false};
     314             : };
     315             : 
     316             : using CodecClientPtr = std::unique_ptr<CodecClient>;
     317             : 
     318             : /**
     319             :  * Production implementation that installs a real codec without automatically connecting.
     320             :  * TODO(danzh) deprecate this class and make CodecClientProd to have the option to defer connect
     321             :  * once "envoy.reloadable_features.postpone_h3_client_connect_to_next_loop" is deprecated.
     322             :  */
     323             : class NoConnectCodecClientProd : public CodecClient {
     324             : public:
     325             :   NoConnectCodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection,
     326             :                            Upstream::HostDescriptionConstSharedPtr host,
     327             :                            Event::Dispatcher& dispatcher, Random::RandomGenerator& random_generator,
     328             :                            const Network::TransportSocketOptionsConstSharedPtr& options);
     329             : };
     330             : 
     331             : /**
     332             :  * Production implementation that installs a real codec.
     333             :  */
     334             : class CodecClientProd : public NoConnectCodecClientProd {
     335             : public:
     336             :   CodecClientProd(CodecType type, Network::ClientConnectionPtr&& connection,
     337             :                   Upstream::HostDescriptionConstSharedPtr host, Event::Dispatcher& dispatcher,
     338             :                   Random::RandomGenerator& random_generator,
     339             :                   const Network::TransportSocketOptionsConstSharedPtr& options);
     340             : };
     341             : 
     342             : } // namespace Http
     343             : } // namespace Envoy

Generated by: LCOV version 1.15