Line data Source code
1 : #pragma once 2 : 3 : #include <memory> 4 : 5 : #include "envoy/buffer/buffer.h" 6 : #include "envoy/config/core/v3/protocol.pb.h" 7 : #include "envoy/event/dispatcher.h" 8 : #include "envoy/http/codec.h" 9 : 10 : #include "source/common/http/codec_helper.h" 11 : #include "source/common/quic/envoy_quic_simulated_watermark_buffer.h" 12 : #include "source/common/quic/envoy_quic_utils.h" 13 : #include "source/common/quic/quic_filter_manager_connection_impl.h" 14 : #include "source/common/quic/send_buffer_monitor.h" 15 : 16 : #include "quiche/http2/adapter/header_validator.h" 17 : 18 : namespace Envoy { 19 : namespace Quic { 20 : 21 : // Base class for EnvoyQuicServer|ClientStream. 22 : class EnvoyQuicStream : public virtual Http::StreamEncoder, 23 : public Http::MultiplexedStreamImplBase, 24 : public SendBufferMonitor, 25 : public HeaderValidator, 26 : protected Logger::Loggable<Logger::Id::quic_stream> { 27 : public: 28 : // |buffer_limit| is the high watermark of the stream send buffer, and the low 29 : // watermark will be half of it. 30 : EnvoyQuicStream(uint32_t buffer_limit, QuicFilterManagerConnectionImpl& filter_manager_connection, 31 : std::function<void()> below_low_watermark, 32 : std::function<void()> above_high_watermark, Http::Http3::CodecStats& stats, 33 : const envoy::config::core::v3::Http3ProtocolOptions& http3_options) 34 : : Http::MultiplexedStreamImplBase(filter_manager_connection.dispatcher()), stats_(stats), 35 : http3_options_(http3_options), 36 : send_buffer_simulation_(buffer_limit / 2, buffer_limit, std::move(below_low_watermark), 37 : std::move(above_high_watermark), ENVOY_LOGGER()), 38 : filter_manager_connection_(filter_manager_connection), 39 : async_stream_blockage_change_( 40 : filter_manager_connection.dispatcher().createSchedulableCallback( 41 232 : [this]() { switchStreamBlockState(); })) {} 42 : 43 232 : ~EnvoyQuicStream() override = default; 44 : 45 : // Http::StreamEncoder 46 0 : Stream& getStream() override { return *this; } 47 : 48 : // Http::Stream 49 0 : void readDisable(bool disable) override { 50 0 : bool status_changed{false}; 51 0 : if (disable) { 52 0 : ++read_disable_counter_; 53 0 : if (read_disable_counter_ == 1) { 54 0 : status_changed = true; 55 0 : } 56 0 : } else { 57 0 : ASSERT(read_disable_counter_ > 0); 58 0 : --read_disable_counter_; 59 0 : if (read_disable_counter_ == 0) { 60 0 : status_changed = true; 61 0 : } 62 0 : } 63 : 64 0 : if (!status_changed) { 65 0 : return; 66 0 : } 67 : 68 : // If the status transiently changed from unblocked to blocked and then unblocked, the quic 69 : // stream will be spuriously unblocked and call OnDataAvailable(). This call shouldn't take any 70 : // effect because any available data should have been processed already upon arrival or they 71 : // were blocked by some condition other than flow control, i.e. Qpack decoding. 72 0 : async_stream_blockage_change_->scheduleCallbackNextIteration(); 73 0 : } 74 : 75 0 : void addCallbacks(Http::StreamCallbacks& callbacks) override { 76 0 : ASSERT(!local_end_stream_); 77 0 : addCallbacksHelper(callbacks); 78 0 : } 79 0 : void removeCallbacks(Http::StreamCallbacks& callbacks) override { 80 0 : removeCallbacksHelper(callbacks); 81 0 : } 82 0 : uint32_t bufferLimit() const override { return send_buffer_simulation_.highWatermark(); } 83 0 : const Network::ConnectionInfoProvider& connectionInfoProvider() override { 84 0 : return connection()->connectionInfoProvider(); 85 0 : } 86 : 87 0 : Buffer::BufferMemoryAccountSharedPtr account() const override { return buffer_memory_account_; } 88 : 89 0 : void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override { 90 0 : buffer_memory_account_ = account; 91 0 : } 92 : 93 : // SendBufferMonitor 94 0 : void updateBytesBuffered(uint64_t old_buffered_bytes, uint64_t new_buffered_bytes) override { 95 0 : if (new_buffered_bytes == old_buffered_bytes) { 96 0 : return; 97 0 : } 98 : // If buffered bytes changed, update stream and session's watermark book 99 : // keeping. 100 0 : if (new_buffered_bytes > old_buffered_bytes) { 101 0 : send_buffer_simulation_.checkHighWatermark(new_buffered_bytes); 102 0 : } else { 103 0 : send_buffer_simulation_.checkLowWatermark(new_buffered_bytes); 104 0 : } 105 0 : filter_manager_connection_.updateBytesBuffered(old_buffered_bytes, new_buffered_bytes); 106 0 : } 107 : 108 : Http::HeaderUtility::HeaderValidationResult 109 20 : validateHeader(absl::string_view header_name, absl::string_view header_value) override { 110 20 : bool override_stream_error_on_invalid_http_message = 111 20 : http3_options_.override_stream_error_on_invalid_http_message().value(); 112 20 : if (header_validator_.ValidateSingleHeader(header_name, header_value) != 113 20 : http2::adapter::HeaderValidator::HEADER_OK) { 114 0 : close_connection_upon_invalid_header_ = !override_stream_error_on_invalid_http_message; 115 0 : return Http::HeaderUtility::HeaderValidationResult::REJECT; 116 0 : } 117 20 : if (header_name == "content-length") { 118 0 : size_t content_length = 0; 119 0 : Http::HeaderUtility::HeaderValidationResult result = 120 0 : Http::HeaderUtility::validateContentLength( 121 0 : header_value, override_stream_error_on_invalid_http_message, 122 0 : close_connection_upon_invalid_header_, content_length); 123 0 : content_length_ = content_length; 124 0 : return result; 125 0 : } 126 20 : ASSERT(!header_name.empty()); 127 20 : if (Http::HeaderUtility::isPseudoHeader(header_name) && saw_regular_headers_) { 128 : // If any regular header appears before pseudo headers, the request or response is malformed. 129 2 : return Http::HeaderUtility::HeaderValidationResult::REJECT; 130 2 : } 131 18 : if (!Http::HeaderUtility::isPseudoHeader(header_name)) { 132 12 : saw_regular_headers_ = true; 133 12 : } 134 18 : return Http::HeaderUtility::HeaderValidationResult::ACCEPT; 135 20 : } 136 : 137 0 : absl::string_view responseDetails() override { return details_; } 138 : 139 420 : const StreamInfo::BytesMeterSharedPtr& bytesMeter() override { return bytes_meter_; } 140 : 141 : protected: 142 : virtual void switchStreamBlockState() PURE; 143 : 144 : // Needed for ENVOY_STREAM_LOG. 145 : virtual uint32_t streamId() PURE; 146 : virtual Network::Connection* connection() PURE; 147 : // Either reset the stream or close the connection according to 148 : // should_close_connection and configured http3 options. 149 : virtual void 150 : onStreamError(absl::optional<bool> should_close_connection, 151 : quic::QuicRstStreamErrorCode rst = quic::QUIC_BAD_APPLICATION_PAYLOAD) PURE; 152 : 153 : // TODO(danzh) remove this once QUICHE enforces content-length consistency. 154 0 : void updateReceivedContentBytes(size_t payload_length, bool end_stream) { 155 0 : received_content_bytes_ += payload_length; 156 0 : if (!content_length_.has_value()) { 157 0 : return; 158 0 : } 159 0 : if (received_content_bytes_ > content_length_.value() || 160 0 : (end_stream && received_content_bytes_ != content_length_.value() && 161 0 : !(got_304_response_ && received_content_bytes_ == 0) && !(sent_head_request_))) { 162 0 : details_ = Http3ResponseCodeDetailValues::inconsistent_content_length; 163 : // Reset instead of closing the connection to align with nghttp2. 164 0 : onStreamError(false); 165 0 : } 166 0 : } 167 : 168 375 : StreamInfo::BytesMeterSharedPtr& mutableBytesMeter() { return bytes_meter_; } 169 : 170 : // True once end of stream is propagated to Envoy. Envoy doesn't expect to be 171 : // notified more than once about end of stream. So once this is true, no need 172 : // to set it in the callback to Envoy stream any more. 173 : bool end_stream_decoded_{false}; 174 : // The latest state a QUIC stream blockage state change callback should look at. As 175 : // more readDisable() calls may happen between the callback is posted and it's 176 : // executed, the stream might be unblocked and blocked several times. If this 177 : // counter is 0, the callback should unblock the stream. Otherwise it should 178 : // block the stream. 179 : uint32_t read_disable_counter_{0u}; 180 : 181 : Http::Http3::CodecStats& stats_; 182 : const envoy::config::core::v3::Http3ProtocolOptions& http3_options_; 183 : bool close_connection_upon_invalid_header_{false}; 184 : absl::string_view details_; 185 : // TODO(kbaichoo): bind the account to the QUIC buffers to enable tracking of 186 : // memory allocated within QUIC buffers. 187 : Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_ = nullptr; 188 : bool got_304_response_{false}; 189 : bool sent_head_request_{false}; 190 : // True if a regular (non-pseudo) HTTP header has been seen before. 191 : bool saw_regular_headers_{false}; 192 : 193 : private: 194 : // Keeps track of bytes buffered in the stream send buffer in QUICHE and reacts 195 : // upon crossing high and low watermarks. 196 : // Its high watermark is also the buffer limit of stream read/write filters in 197 : // HCM. 198 : // There is no receive buffer simulation because Quic stream's 199 : // OnBodyDataAvailable() hands all the ready-to-use request data from stream sequencer to HCM 200 : // directly and buffers them in filters if needed. Itself doesn't buffer request data. 201 : EnvoyQuicSimulatedWatermarkBuffer send_buffer_simulation_; 202 : 203 : QuicFilterManagerConnectionImpl& filter_manager_connection_; 204 : // Used to block or unblock stream in the next event loop. QUICHE doesn't like stream blockage 205 : // state change in its own call stack. And Envoy upstream doesn't like quic stream to be unblocked 206 : // in its callstack either because the stream will push data right away. 207 : Event::SchedulableCallbackPtr async_stream_blockage_change_; 208 : 209 : StreamInfo::BytesMeterSharedPtr bytes_meter_{std::make_shared<StreamInfo::BytesMeter>()}; 210 : absl::optional<size_t> content_length_; 211 : size_t received_content_bytes_{0}; 212 : http2::adapter::HeaderValidator header_validator_; 213 : }; 214 : 215 : // Object used for updating a BytesMeter to track bytes sent on a QuicStream since this object was 216 : // constructed. 217 : class IncrementalBytesSentTracker { 218 : public: 219 : IncrementalBytesSentTracker(const quic::QuicStream& stream, StreamInfo::BytesMeter& bytes_meter, 220 : bool update_header_bytes) 221 : : stream_(stream), bytes_meter_(bytes_meter), update_header_bytes_(update_header_bytes), 222 0 : initial_bytes_sent_(totalStreamBytesWritten()) {} 223 : 224 0 : ~IncrementalBytesSentTracker() { 225 0 : if (update_header_bytes_) { 226 0 : bytes_meter_.addHeaderBytesSent(incrementalBytesWritten()); 227 0 : } 228 0 : bytes_meter_.addWireBytesSent(incrementalBytesWritten()); 229 0 : } 230 : 231 : private: 232 : // Returns the number of newly sent bytes since the tracker was constructed. 233 0 : uint64_t incrementalBytesWritten() { 234 0 : ASSERT(totalStreamBytesWritten() >= initial_bytes_sent_); 235 0 : return totalStreamBytesWritten() - initial_bytes_sent_; 236 0 : } 237 : 238 : // Returns total number of stream bytes written, including buffered bytes. 239 0 : uint64_t totalStreamBytesWritten() const { 240 0 : return stream_.stream_bytes_written() + stream_.BufferedDataBytes(); 241 0 : } 242 : 243 : const quic::QuicStream& stream_; 244 : StreamInfo::BytesMeter& bytes_meter_; 245 : bool update_header_bytes_; 246 : uint64_t initial_bytes_sent_; 247 : }; 248 : 249 : } // namespace Quic 250 : } // namespace Envoy