1
#pragma once
2

            
3
#include <cstdint>
4
#include <memory>
5

            
6
#include "envoy/buffer/buffer.h"
7
#include "envoy/config/core/v3/protocol.pb.h"
8
#include "envoy/event/dispatcher.h"
9
#include "envoy/http/codec.h"
10

            
11
#include "source/common/http/codec_helper.h"
12
#include "source/common/quic/envoy_quic_simulated_watermark_buffer.h"
13
#include "source/common/quic/envoy_quic_utils.h"
14

            
15
#ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
16
#include "source/common/quic/http_datagram_handler.h"
17
#endif
18
#include "source/common/quic/quic_filter_manager_connection_impl.h"
19
#include "source/common/quic/quic_stats_gatherer.h"
20
#include "source/common/quic/send_buffer_monitor.h"
21

            
22
#include "quiche/http2/adapter/header_validator.h"
23
#include "quiche/quic/core/http/quic_spdy_stream.h"
24

            
25
namespace Envoy {
26
namespace Quic {
27

            
28
// Base class for EnvoyQuicServer|ClientStream.
29
class EnvoyQuicStream : public virtual Http::StreamEncoder,
30
                        public Http::MultiplexedStreamImplBase,
31
                        public SendBufferMonitor,
32
                        public HeaderValidator,
33
                        protected Logger::Loggable<Logger::Id::quic_stream> {
34
public:
35
  // |buffer_limit| is the high watermark of the stream send buffer, and the low
36
  // watermark will be half of it.
37
  EnvoyQuicStream(quic::QuicSpdyStream& quic_stream, quic::QuicSession& quic_session,
38
                  uint32_t buffer_limit, QuicFilterManagerConnectionImpl& filter_manager_connection,
39
                  std::function<void()> below_low_watermark,
40
                  std::function<void()> above_high_watermark, Http::Http3::CodecStats& stats,
41
                  const envoy::config::core::v3::Http3ProtocolOptions& http3_options)
42
8069
      : Http::MultiplexedStreamImplBase(filter_manager_connection.dispatcher()), stats_(stats),
43
8069
        http3_options_(http3_options), quic_stream_(quic_stream), quic_session_(quic_session),
44
8069
        send_buffer_simulation_(buffer_limit / 2, buffer_limit, std::move(below_low_watermark),
45
8069
                                std::move(above_high_watermark), ENVOY_LOGGER()),
46
8069
        filter_manager_connection_(filter_manager_connection),
47
        async_stream_blockage_change_(
48
8069
            filter_manager_connection.dispatcher().createSchedulableCallback(
49
9816
                [this]() { switchStreamBlockState(); })) {
50
8069
    if (http3_options_.disable_connection_flow_control_for_streams()) {
51
1
      quic_stream_.DisableConnectionFlowControlForThisStream();
52
1
    }
53
8069
  }
54

            
55
8069
  ~EnvoyQuicStream() override = default;
56

            
57
  // Http::StreamEncoder
58
74419
  Stream& getStream() override { return *this; }
59
  void encodeData(Buffer::Instance& data, bool end_stream) override;
60
  void encodeMetadata(const Http::MetadataMapVector& metadata_map_vector) override;
61

            
62
  // Http::Stream
63
28807
  void readDisable(bool disable) override {
64
28807
    bool status_changed{false};
65
28807
    if (disable) {
66
14426
      ++read_disable_counter_;
67
14426
      if (read_disable_counter_ == 1) {
68
1512
        status_changed = true;
69
1512
      }
70
14426
    } else {
71
14381
      ASSERT(read_disable_counter_ > 0);
72
14381
      --read_disable_counter_;
73
14381
      if (read_disable_counter_ == 0) {
74
1467
        status_changed = true;
75
1467
      }
76
14381
    }
77

            
78
28807
    if (!status_changed) {
79
25828
      return;
80
25828
    }
81

            
82
    // If the status transiently changed from unblocked to blocked and then unblocked, the quic
83
    // stream will be spuriously unblocked and call OnDataAvailable(). This call shouldn't take any
84
    // effect because any available data should have been processed already upon arrival or they
85
    // were blocked by some condition other than flow control, i.e. Qpack decoding.
86
2979
    async_stream_blockage_change_->scheduleCallbackNextIteration();
87
2979
  }
88

            
89
12033
  void addCallbacks(Http::StreamCallbacks& callbacks) override {
90
12033
    ASSERT(!local_end_stream_);
91
12033
    addCallbacksHelper(callbacks);
92
12033
  }
93
5227
  void removeCallbacks(Http::StreamCallbacks& callbacks) override {
94
5227
    removeCallbacksHelper(callbacks);
95
5227
  }
96
2527
  uint32_t bufferLimit() const override { return send_buffer_simulation_.highWatermark(); }
97
1416
  const Network::ConnectionInfoProvider& connectionInfoProvider() override {
98
1416
    return connection()->connectionInfoProvider();
99
1416
  }
100

            
101
2527
  Buffer::BufferMemoryAccountSharedPtr account() const override { return buffer_memory_account_; }
102

            
103
3868
  void setAccount(Buffer::BufferMemoryAccountSharedPtr account) override {
104
3868
    buffer_memory_account_ = account;
105
3868
  }
106

            
107
  // SendBufferMonitor
108
71335
  void updateBytesBuffered(uint64_t old_buffered_bytes, uint64_t new_buffered_bytes) override {
109
71335
    if (new_buffered_bytes == old_buffered_bytes) {
110
15093
      return;
111
15093
    }
112
    // If buffered bytes changed, update stream and session's watermark book
113
    // keeping.
114
56242
    if (new_buffered_bytes > old_buffered_bytes) {
115
6395
      send_buffer_simulation_.checkHighWatermark(new_buffered_bytes);
116
50490
    } else {
117
49847
      send_buffer_simulation_.checkLowWatermark(new_buffered_bytes);
118
49847
      ENVOY_BUG(
119
49847
          old_buffered_bytes - new_buffered_bytes <= reported_buffered_bytes_,
120
49847
          fmt::format("Quic stream {} previously reported {} bytes buffered to connection, which "
121
49847
                      "is insufficient to be subtracted from for the current drain of {} bytes.",
122
49847
                      quic_stream_.id(), reported_buffered_bytes_,
123
49847
                      (old_buffered_bytes - new_buffered_bytes)));
124
49847
    }
125
    // This value can momentarily be inconsistent with new_buffered_bytes when
126
    // the buffer goes below low watermark and triggers a write in the
127
    // onBelowWriteBufferLowWatermark() callstack. In this case, any buffered data from the nested
128
    // write will increase reported_buffered_bytes_ and the connection level bookkeeping before the
129
    // reduction of the value in the nesting call to be reported.
130
56242
    reported_buffered_bytes_ += (new_buffered_bytes - old_buffered_bytes);
131
56242
    filter_manager_connection_.updateBytesBuffered(old_buffered_bytes, new_buffered_bytes);
132
56242
  }
133

            
134
  Http::HeaderUtility::HeaderValidationResult
135
188648
  validateHeader(absl::string_view header_name, absl::string_view header_value) override {
136
188648
    bool override_stream_error_on_invalid_http_message =
137
188648
        http3_options_.override_stream_error_on_invalid_http_message().value();
138
188648
    if (header_validator_.ValidateSingleHeader(header_name, header_value) !=
139
188648
        http2::adapter::HeaderValidator::HEADER_OK) {
140
654
      close_connection_upon_invalid_header_ = !override_stream_error_on_invalid_http_message;
141
654
      return Http::HeaderUtility::HeaderValidationResult::REJECT;
142
654
    }
143
187994
    if (header_name == "content-length") {
144
508
      size_t content_length = 0;
145
508
      Http::HeaderUtility::HeaderValidationResult result =
146
508
          Http::HeaderUtility::validateContentLength(
147
508
              header_value, override_stream_error_on_invalid_http_message,
148
508
              close_connection_upon_invalid_header_, content_length);
149
508
      content_length_ = content_length;
150
508
      return result;
151
508
    }
152
187486
    ASSERT(!header_name.empty());
153
187486
    if (Http::HeaderUtility::isPseudoHeader(header_name) && saw_regular_headers_) {
154
      // If any regular header appears before pseudo headers, the request or response is malformed.
155
1
      return Http::HeaderUtility::HeaderValidationResult::REJECT;
156
1
    }
157
187485
    if (!Http::HeaderUtility::isPseudoHeader(header_name)) {
158
169982
      saw_regular_headers_ = true;
159
169982
    }
160
187485
    return Http::HeaderUtility::HeaderValidationResult::ACCEPT;
161
187486
  }
162

            
163
7096
  void startHeaderBlock() override {
164
7096
    if (!Runtime::runtimeFeatureEnabled("envoy.restart_features.validate_http3_pseudo_headers")) {
165
2
      return;
166
2
    }
167
7094
    header_validator_.StartHeaderBlock();
168
7094
  }
169

            
170
6434
  bool finishHeaderBlock(bool is_trailing_headers) override {
171
6434
    if (!Runtime::runtimeFeatureEnabled("envoy.restart_features.validate_http3_pseudo_headers")) {
172
2
      return true;
173
2
    }
174
6432
    if (is_trailing_headers) {
175
169
      return header_validator_.FinishHeaderBlock(quic_session_.perspective() ==
176
169
                                                         quic::Perspective::IS_CLIENT
177
169
                                                     ? http2::adapter::HeaderType::RESPONSE_TRAILER
178
169
                                                     : http2::adapter::HeaderType::REQUEST_TRAILER);
179
169
    }
180
6263
    return header_validator_.FinishHeaderBlock(quic_session_.perspective() ==
181
6263
                                                       quic::Perspective::IS_CLIENT
182
6263
                                                   ? http2::adapter::HeaderType::RESPONSE
183
6263
                                                   : http2::adapter::HeaderType::REQUEST);
184
6432
  }
185

            
186
1002
  absl::string_view responseDetails() override { return details_; }
187

            
188
789746
  const StreamInfo::BytesMeterSharedPtr& bytesMeter() override { return bytes_meter_; }
189

            
190
2
  QuicStatsGatherer* statsGatherer() { return stats_gatherer_.get(); }
191

            
192
protected:
193
  virtual void switchStreamBlockState() PURE;
194

            
195
  // Needed for ENVOY_STREAM_LOG.
196
  virtual uint32_t streamId() PURE;
197
  virtual Network::Connection* connection() PURE;
198
  // Either reset the stream or close the connection according to
199
  // should_close_connection and configured http3 options.
200
  virtual void
201
  onStreamError(absl::optional<bool> should_close_connection,
202
                quic::QuicRstStreamErrorCode rst = quic::QUIC_BAD_APPLICATION_PAYLOAD) PURE;
203

            
204
  // TODO(danzh) remove this once QUICHE enforces content-length consistency.
205
473384
  void updateReceivedContentBytes(size_t payload_length, bool end_stream) {
206
473384
    received_content_bytes_ += payload_length;
207
473384
    if (!content_length_.has_value()) {
208
419550
      return;
209
419550
    }
210
53834
    if (received_content_bytes_ > content_length_.value() ||
211
53834
        (end_stream && received_content_bytes_ != content_length_.value() &&
212
53830
         !(got_304_response_ && received_content_bytes_ == 0) && !(sent_head_request_))) {
213
6
      details_ = Http3ResponseCodeDetailValues::inconsistent_content_length;
214
      // Reset instead of closing the connection to align with nghttp2.
215
6
      onStreamError(false);
216
6
    }
217
53834
  }
218

            
219
266795
  StreamInfo::BytesMeterSharedPtr& mutableBytesMeter() { return bytes_meter_; }
220

            
221
7193
  void addDecompressedHeaderBytesSent(const quiche::HttpHeaderBlock& headers) {
222
7193
    bytes_meter_->addDecompressedHeaderBytesSent(headers.TotalBytesUsed());
223
7193
  }
224

            
225
7133
  void addDecompressedHeaderBytesReceived(const quic::QuicHeaderList& header_list) {
226
7133
    bytes_meter_->addDecompressedHeaderBytesReceived(header_list.uncompressed_header_bytes());
227
7133
  }
228

            
229
  void encodeTrailersImpl(quiche::HttpHeaderBlock&& trailers);
230

            
231
  // Converts `header_list` into a new `Http::MetadataMap`.
232
  std::unique_ptr<Http::MetadataMap>
233
  metadataMapFromHeaderList(const quic::QuicHeaderList& header_list);
234

            
235
  // Returns true if the cumulative limit on METADATA headers has been reached
236
  // after adding `bytes`.
237
1010
  bool mustRejectMetadata(size_t bytes) {
238
1010
    received_metadata_bytes_ += bytes;
239
1010
    return received_metadata_bytes_ > 1 << 20;
240
1010
  }
241

            
242
  std::string quicStreamState();
243

            
244
1653
  http2::adapter::HeaderValidator& header_validator() { return header_validator_; }
245

            
246
#ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
247
  // Setting |http_datagram_handler_| enables HTTP Datagram support.
248
  std::unique_ptr<HttpDatagramHandler> http_datagram_handler_;
249
#endif
250
  quiche::QuicheReferenceCountedPointer<QuicStatsGatherer> stats_gatherer_;
251

            
252
  // True once end of stream is propagated to Envoy. Envoy doesn't expect to be
253
  // notified more than once about end of stream. So once this is true, no need
254
  // to set it in the callback to Envoy stream any more.
255
  bool end_stream_decoded_{false};
256
  // The latest state a QUIC stream blockage state change callback should look at. As
257
  // more readDisable() calls may happen between the callback is posted and it's
258
  // executed, the stream might be unblocked and blocked several times. If this
259
  // counter is 0, the callback should unblock the stream. Otherwise it should
260
  // block the stream.
261
  uint32_t read_disable_counter_{0u};
262

            
263
  Http::Http3::CodecStats& stats_;
264
  const envoy::config::core::v3::Http3ProtocolOptions& http3_options_;
265
  bool close_connection_upon_invalid_header_{false};
266
  absl::string_view details_;
267
  // TODO(kbaichoo): bind the account to the QUIC buffers to enable tracking of
268
  // memory allocated within QUIC buffers.
269
  Buffer::BufferMemoryAccountSharedPtr buffer_memory_account_ = nullptr;
270
  bool got_304_response_{false};
271
  bool sent_head_request_{false};
272
  // True if a regular (non-pseudo) HTTP header has been seen before.
273
  bool saw_regular_headers_{false};
274

            
275
private:
276
  // QUIC stream and session that this EnvoyQuicStream wraps.
277
  quic::QuicSpdyStream& quic_stream_;
278
  quic::QuicSession& quic_session_;
279

            
280
  // Keeps track of bytes buffered in the stream send buffer in QUICHE and reacts
281
  // upon crossing high and low watermarks.
282
  // Its high watermark is also the buffer limit of stream read/write filters in
283
  // HCM.
284
  // There is no receive buffer simulation because Quic stream's
285
  // OnBodyDataAvailable() hands all the ready-to-use request data from stream sequencer to HCM
286
  // directly and buffers them in filters if needed. Itself doesn't buffer request data.
287
  EnvoyQuicSimulatedWatermarkBuffer send_buffer_simulation_;
288

            
289
  QuicFilterManagerConnectionImpl& filter_manager_connection_;
290
  // Used to block or unblock stream in the next event loop. QUICHE doesn't like stream blockage
291
  // state change in its own call stack. And Envoy upstream doesn't like quic stream to be unblocked
292
  // in its callstack either because the stream will push data right away.
293
  Event::SchedulableCallbackPtr async_stream_blockage_change_;
294

            
295
  StreamInfo::BytesMeterSharedPtr bytes_meter_{std::make_shared<StreamInfo::BytesMeter>()};
296
  absl::optional<size_t> content_length_;
297
  size_t received_content_bytes_{0};
298
  http2::adapter::HeaderValidator header_validator_;
299
  size_t received_metadata_bytes_{0};
300
  // Track the buffered bytes reported to connection in the
301
  // most recent call of updateBytesBuffered().
302
  uint64_t reported_buffered_bytes_{0u};
303
};
304

            
305
// Object used for updating a BytesMeter to track bytes sent on a QuicStream since this object was
306
// constructed.
307
class IncrementalBytesSentTracker {
308
public:
309
  IncrementalBytesSentTracker(const quic::QuicStream& stream, StreamInfo::BytesMeter& bytes_meter,
310
                              bool update_header_bytes)
311
21243
      : stream_(stream), bytes_meter_(bytes_meter), update_header_bytes_(update_header_bytes),
312
21243
        initial_bytes_sent_(totalStreamBytesWritten()) {}
313

            
314
21243
  ~IncrementalBytesSentTracker() {
315
21243
    if (update_header_bytes_) {
316
7191
      bytes_meter_.addHeaderBytesSent(incrementalBytesWritten());
317
7191
    }
318
21243
    bytes_meter_.addWireBytesSent(incrementalBytesWritten());
319
21243
  }
320

            
321
private:
322
  // Returns the number of newly sent bytes since the tracker was constructed.
323
28434
  uint64_t incrementalBytesWritten() {
324
28434
    ASSERT(totalStreamBytesWritten() >= initial_bytes_sent_);
325
28434
    return totalStreamBytesWritten() - initial_bytes_sent_;
326
28434
  }
327

            
328
  // Returns total number of stream bytes written, including buffered bytes.
329
49677
  uint64_t totalStreamBytesWritten() const {
330
49677
    return stream_.stream_bytes_written() + stream_.BufferedDataBytes();
331
49677
  }
332

            
333
  const quic::QuicStream& stream_;
334
  StreamInfo::BytesMeter& bytes_meter_;
335
  bool update_header_bytes_;
336
  uint64_t initial_bytes_sent_;
337
};
338

            
339
} // namespace Quic
340
} // namespace Envoy