LCOV - code coverage report
Current view: top level - source/common/quic - envoy_quic_stream.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 19 103 18.4 %
Date: 2024-01-05 06:35:25 Functions: 5 21 23.8 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <memory>
       4             : 
       5             : #include "envoy/buffer/buffer.h"
       6             : #include "envoy/config/core/v3/protocol.pb.h"
       7             : #include "envoy/event/dispatcher.h"
       8             : #include "envoy/http/codec.h"
       9             : 
      10             : #include "source/common/http/codec_helper.h"
      11             : #include "source/common/quic/envoy_quic_simulated_watermark_buffer.h"
      12             : #include "source/common/quic/envoy_quic_utils.h"
      13             : #include "source/common/quic/quic_filter_manager_connection_impl.h"
      14             : #include "source/common/quic/send_buffer_monitor.h"
      15             : 
      16             : #include "quiche/http2/adapter/header_validator.h"
      17             : 
      18             : namespace Envoy {
      19             : namespace Quic {
      20             : 
      21             : // Base class for EnvoyQuicServer|ClientStream.
      22             : class EnvoyQuicStream : public virtual Http::StreamEncoder,
      23             :                         public Http::MultiplexedStreamImplBase,
      24             :                         public SendBufferMonitor,
      25             :                         public HeaderValidator,
      26             :                         protected Logger::Loggable<Logger::Id::quic_stream> {
      27             : public:
      28             :   // |buffer_limit| is the high watermark of the stream send buffer, and the low
      29             :   // watermark will be half of it.
      30             :   EnvoyQuicStream(uint32_t buffer_limit, QuicFilterManagerConnectionImpl& filter_manager_connection,
      31             :                   std::function<void()> below_low_watermark,
      32             :                   std::function<void()> above_high_watermark, Http::Http3::CodecStats& stats,
      33             :                   const envoy::config::core::v3::Http3ProtocolOptions& http3_options)
      34             :       : Http::MultiplexedStreamImplBase(filter_manager_connection.dispatcher()), stats_(stats),
      35             :         http3_options_(http3_options),
      36             :         send_buffer_simulation_(buffer_limit / 2, buffer_limit, std::move(below_low_watermark),
      37             :                                 std::move(above_high_watermark), ENVOY_LOGGER()),
      38             :         filter_manager_connection_(filter_manager_connection),
      39             :         async_stream_blockage_change_(
      40             :             filter_manager_connection.dispatcher().createSchedulableCallback(
      41         232 :                 [this]() { switchStreamBlockState(); })) {}
      42             : 
      43         232 :   ~EnvoyQuicStream() override = default;
      44             : 
      45             :   // Http::StreamEncoder
      46           0 :   Stream& getStream() override { return *this; }
      47             : 
      48             :   // Http::Stream
      49           0 :   void readDisable(bool disable) override {
      50           0 :     bool status_changed{false};
      51           0 :     if (disable) {
      52           0 :       ++read_disable_counter_;
      53           0 :       if (read_disable_counter_ == 1) {
      54           0 :         status_changed = true;
      55           0 :       }
      56           0 :     } else {
      57           0 :       ASSERT(read_disable_counter_ > 0);
      58           0 :       --read_disable_counter_;
      59           0 :       if (read_disable_counter_ == 0) {
      60           0 :         status_changed = true;
      61           0 :       }
      62           0 :     }
      63             : 
      64           0 :     if (!status_changed) {
      65           0 :       return;
      66           0 :     }
      67             : 
      68             :     // If the status transiently changed from unblocked to blocked and then unblocked, the quic
      69             :     // stream will be spuriously unblocked and call OnDataAvailable(). This call shouldn't take any
      70             :     // effect because any available data should have been processed already upon arrival or they
      71             :     // were blocked by some condition other than flow control, i.e. Qpack decoding.
      72           0 :     async_stream_blockage_change_->scheduleCallbackNextIteration();
      73           0 :   }
      74             : 
      75           0 :   void addCallbacks(Http::StreamCallbacks& callbacks) override {
      76           0 :     ASSERT(!local_end_stream_);
      77           0 :     addCallbacksHelper(callbacks);
      78           0 :   }
      79           0 :   void removeCallbacks(Http::StreamCallbacks& callbacks) override {
      80           0 :     removeCallbacksHelper(callbacks);
      81           0 :   }
      82           0 :   uint32_t bufferLimit() const override { return send_buffer_simulation_.highWatermark(); }
      83           0 :   const Network::ConnectionInfoProvider& connectionInfoProvider() override {
      84           0 :     return connection()->connectionInfoProvider();
      85           0 :   }
      86             : 
      87           0 :   Buffer::BufferMemoryAccountSharedPtr account() const override { return buffer_memory_account_; }
      88             : 
      89           0 :   void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override {
      90           0 :     buffer_memory_account_ = account;
      91           0 :   }
      92             : 
      93             :   // SendBufferMonitor
      94           0 :   void updateBytesBuffered(uint64_t old_buffered_bytes, uint64_t new_buffered_bytes) override {
      95           0 :     if (new_buffered_bytes == old_buffered_bytes) {
      96           0 :       return;
      97           0 :     }
      98             :     // If buffered bytes changed, update stream and session's watermark book
      99             :     // keeping.
     100           0 :     if (new_buffered_bytes > old_buffered_bytes) {
     101           0 :       send_buffer_simulation_.checkHighWatermark(new_buffered_bytes);
     102           0 :     } else {
     103           0 :       send_buffer_simulation_.checkLowWatermark(new_buffered_bytes);
     104           0 :     }
     105           0 :     filter_manager_connection_.updateBytesBuffered(old_buffered_bytes, new_buffered_bytes);
     106           0 :   }
     107             : 
     108             :   Http::HeaderUtility::HeaderValidationResult
     109          20 :   validateHeader(absl::string_view header_name, absl::string_view header_value) override {
     110          20 :     bool override_stream_error_on_invalid_http_message =
     111          20 :         http3_options_.override_stream_error_on_invalid_http_message().value();
     112          20 :     if (header_validator_.ValidateSingleHeader(header_name, header_value) !=
     113          20 :         http2::adapter::HeaderValidator::HEADER_OK) {
     114           0 :       close_connection_upon_invalid_header_ = !override_stream_error_on_invalid_http_message;
     115           0 :       return Http::HeaderUtility::HeaderValidationResult::REJECT;
     116           0 :     }
     117          20 :     if (header_name == "content-length") {
     118           0 :       size_t content_length = 0;
     119           0 :       Http::HeaderUtility::HeaderValidationResult result =
     120           0 :           Http::HeaderUtility::validateContentLength(
     121           0 :               header_value, override_stream_error_on_invalid_http_message,
     122           0 :               close_connection_upon_invalid_header_, content_length);
     123           0 :       content_length_ = content_length;
     124           0 :       return result;
     125           0 :     }
     126          20 :     ASSERT(!header_name.empty());
     127          20 :     if (Http::HeaderUtility::isPseudoHeader(header_name) && saw_regular_headers_) {
     128             :       // If any regular header appears before pseudo headers, the request or response is malformed.
     129           2 :       return Http::HeaderUtility::HeaderValidationResult::REJECT;
     130           2 :     }
     131          18 :     if (!Http::HeaderUtility::isPseudoHeader(header_name)) {
     132          12 :       saw_regular_headers_ = true;
     133          12 :     }
     134          18 :     return Http::HeaderUtility::HeaderValidationResult::ACCEPT;
     135          20 :   }
     136             : 
     137           0 :   absl::string_view responseDetails() override { return details_; }
     138             : 
     139         420 :   const StreamInfo::BytesMeterSharedPtr& bytesMeter() override { return bytes_meter_; }
     140             : 
     141             : protected:
     142             :   virtual void switchStreamBlockState() PURE;
     143             : 
     144             :   // Needed for ENVOY_STREAM_LOG.
     145             :   virtual uint32_t streamId() PURE;
     146             :   virtual Network::Connection* connection() PURE;
     147             :   // Either reset the stream or close the connection according to
     148             :   // should_close_connection and configured http3 options.
     149             :   virtual void
     150             :   onStreamError(absl::optional<bool> should_close_connection,
     151             :                 quic::QuicRstStreamErrorCode rst = quic::QUIC_BAD_APPLICATION_PAYLOAD) PURE;
     152             : 
     153             :   // TODO(danzh) remove this once QUICHE enforces content-length consistency.
     154           0 :   void updateReceivedContentBytes(size_t payload_length, bool end_stream) {
     155           0 :     received_content_bytes_ += payload_length;
     156           0 :     if (!content_length_.has_value()) {
     157           0 :       return;
     158           0 :     }
     159           0 :     if (received_content_bytes_ > content_length_.value() ||
     160           0 :         (end_stream && received_content_bytes_ != content_length_.value() &&
     161           0 :          !(got_304_response_ && received_content_bytes_ == 0) && !(sent_head_request_))) {
     162           0 :       details_ = Http3ResponseCodeDetailValues::inconsistent_content_length;
     163             :       // Reset instead of closing the connection to align with nghttp2.
     164           0 :       onStreamError(false);
     165           0 :     }
     166           0 :   }
     167             : 
     168         375 :   StreamInfo::BytesMeterSharedPtr& mutableBytesMeter() { return bytes_meter_; }
     169             : 
     170             :   // True once end of stream is propagated to Envoy. Envoy doesn't expect to be
     171             :   // notified more than once about end of stream. So once this is true, no need
     172             :   // to set it in the callback to Envoy stream any more.
     173             :   bool end_stream_decoded_{false};
     174             :   // The latest state a QUIC stream blockage state change callback should look at. As
     175             :   // more readDisable() calls may happen between the callback is posted and it's
     176             :   // executed, the stream might be unblocked and blocked several times. If this
     177             :   // counter is 0, the callback should unblock the stream. Otherwise it should
     178             :   // block the stream.
     179             :   uint32_t read_disable_counter_{0u};
     180             : 
     181             :   Http::Http3::CodecStats& stats_;
     182             :   const envoy::config::core::v3::Http3ProtocolOptions& http3_options_;
     183             :   bool close_connection_upon_invalid_header_{false};
     184             :   absl::string_view details_;
     185             :   // TODO(kbaichoo): bind the account to the QUIC buffers to enable tracking of
     186             :   // memory allocated within QUIC buffers.
     187             :   Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_ = nullptr;
     188             :   bool got_304_response_{false};
     189             :   bool sent_head_request_{false};
     190             :   // True if a regular (non-pseudo) HTTP header has been seen before.
     191             :   bool saw_regular_headers_{false};
     192             : 
     193             : private:
     194             :   // Keeps track of bytes buffered in the stream send buffer in QUICHE and reacts
     195             :   // upon crossing high and low watermarks.
     196             :   // Its high watermark is also the buffer limit of stream read/write filters in
     197             :   // HCM.
     198             :   // There is no receive buffer simulation because Quic stream's
     199             :   // OnBodyDataAvailable() hands all the ready-to-use request data from stream sequencer to HCM
     200             :   // directly and buffers them in filters if needed. Itself doesn't buffer request data.
     201             :   EnvoyQuicSimulatedWatermarkBuffer send_buffer_simulation_;
     202             : 
     203             :   QuicFilterManagerConnectionImpl& filter_manager_connection_;
     204             :   // Used to block or unblock stream in the next event loop. QUICHE doesn't like stream blockage
     205             :   // state change in its own call stack. And Envoy upstream doesn't like quic stream to be unblocked
     206             :   // in its callstack either because the stream will push data right away.
     207             :   Event::SchedulableCallbackPtr async_stream_blockage_change_;
     208             : 
     209             :   StreamInfo::BytesMeterSharedPtr bytes_meter_{std::make_shared<StreamInfo::BytesMeter>()};
     210             :   absl::optional<size_t> content_length_;
     211             :   size_t received_content_bytes_{0};
     212             :   http2::adapter::HeaderValidator header_validator_;
     213             : };
     214             : 
     215             : // Object used for updating a BytesMeter to track bytes sent on a QuicStream since this object was
     216             : // constructed.
     217             : class IncrementalBytesSentTracker {
     218             : public:
     219             :   IncrementalBytesSentTracker(const quic::QuicStream& stream, StreamInfo::BytesMeter& bytes_meter,
     220             :                               bool update_header_bytes)
     221             :       : stream_(stream), bytes_meter_(bytes_meter), update_header_bytes_(update_header_bytes),
     222           0 :         initial_bytes_sent_(totalStreamBytesWritten()) {}
     223             : 
     224           0 :   ~IncrementalBytesSentTracker() {
     225           0 :     if (update_header_bytes_) {
     226           0 :       bytes_meter_.addHeaderBytesSent(incrementalBytesWritten());
     227           0 :     }
     228           0 :     bytes_meter_.addWireBytesSent(incrementalBytesWritten());
     229           0 :   }
     230             : 
     231             : private:
     232             :   // Returns the number of newly sent bytes since the tracker was constructed.
     233           0 :   uint64_t incrementalBytesWritten() {
     234           0 :     ASSERT(totalStreamBytesWritten() >= initial_bytes_sent_);
     235           0 :     return totalStreamBytesWritten() - initial_bytes_sent_;
     236           0 :   }
     237             : 
     238             :   // Returns total number of stream bytes written, including buffered bytes.
     239           0 :   uint64_t totalStreamBytesWritten() const {
     240           0 :     return stream_.stream_bytes_written() + stream_.BufferedDataBytes();
     241           0 :   }
     242             : 
     243             :   const quic::QuicStream& stream_;
     244             :   StreamInfo::BytesMeter& bytes_meter_;
     245             :   bool update_header_bytes_;
     246             :   uint64_t initial_bytes_sent_;
     247             : };
     248             : 
     249             : } // namespace Quic
     250             : } // namespace Envoy

Generated by: LCOV version 1.15