1
#include "source/common/quic/envoy_quic_server_stream.h"
2

            
3
#include <openssl/bio.h>
4
#include <openssl/evp.h>
5

            
6
#include <memory>
7
#include <utility>
8

            
9
#include "source/common/buffer/buffer_impl.h"
10
#include "source/common/common/assert.h"
11
#include "source/common/http/header_map_impl.h"
12
#include "source/common/http/header_utility.h"
13
#include "source/common/http/utility.h"
14
#include "source/common/quic/envoy_quic_server_session.h"
15
#include "source/common/quic/envoy_quic_utils.h"
16
#include "source/common/quic/quic_stats_gatherer.h"
17
#include "source/common/runtime/runtime_features.h"
18

            
19
#include "quiche/common/http/http_header_block.h"
20
#include "quiche/quic/core/http/quic_header_list.h"
21
#include "quiche/quic/core/quic_session.h"
22
#include "quiche/quic/core/quic_types.h"
23
#include "quiche_platform_impl/quiche_mem_slice_impl.h"
24

            
25
namespace Envoy {
26
namespace Quic {
27

            
28
EnvoyQuicServerStream::EnvoyQuicServerStream(
29
    quic::QuicStreamId id, quic::QuicSpdySession* session, quic::StreamType type,
30
    Http::Http3::CodecStats& stats,
31
    const envoy::config::core::v3::Http3ProtocolOptions& http3_options,
32
    envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
33
        headers_with_underscores_action)
34
4011
    : quic::QuicSpdyServerStreamBase(id, session, type),
35
4011
      EnvoyQuicStream(
36
4011
          *this, *session,
37
          // Flow control receive window should be larger than 8k to fully utilize congestion
38
          // control window before it reaches the high watermark.
39
4011
          static_cast<uint32_t>(GetReceiveWindow().value()), *filterManagerConnection(),
40
4085
          [this]() { runLowWatermarkCallbacks(); }, [this]() { runHighWatermarkCallbacks(); },
41
4011
          stats, http3_options),
42
4011
      headers_with_underscores_action_(headers_with_underscores_action) {
43
4011
  ASSERT(static_cast<uint32_t>(GetReceiveWindow().value()) > 8 * 1024,
44
4011
         "Send buffer limit should be larger than 8KB.");
45

            
46
4011
  stats_gatherer_ = new QuicStatsGatherer(&filterManagerConnection()->dispatcher().timeSource());
47
4011
  set_ack_listener(stats_gatherer_);
48
4011
  RegisterMetadataVisitor(this);
49
4011
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.validate_http3_pseudo_headers") &&
50
4011
      session->allow_extended_connect()) {
51
1653
    header_validator().SetAllowExtendedConnect();
52
1653
  }
53
4011
}
54

            
55
87
void EnvoyQuicServerStream::encode1xxHeaders(const Http::ResponseHeaderMap& headers) {
56
87
  ASSERT(Http::HeaderUtility::isSpecial1xx(headers));
57
87
  encodeHeaders(headers, false);
58
87
}
59

            
60
2973
void EnvoyQuicServerStream::encodeHeaders(const Http::ResponseHeaderMap& headers, bool end_stream) {
61
2973
  ENVOY_STREAM_LOG(debug, "encodeHeaders (end_stream={}) {}.", *this, end_stream, headers);
62
2973
  if (write_side_closed()) {
63
1
    ENVOY_STREAM_LOG(error, "encodeHeaders is called on write-closed stream. {}", *this,
64
1
                     quicStreamState());
65
1
    return;
66
1
  }
67

            
68
  // In Envoy, both upgrade requests and extended CONNECT requests are
69
  // represented as their HTTP/1 forms, regardless of the HTTP version used.
70
  // Therefore, these need to be transformed into their HTTP/3 form, before
71
  // sending them.
72
2972
  const Http::ResponseHeaderMap* header_map = &headers;
73
2972
  std::unique_ptr<Http::ResponseHeaderMapImpl> modified_headers;
74
2972
#ifndef ENVOY_ENABLE_UHV
75
  // Extended CONNECT to H/1 upgrade transformation has moved to UHV
76
2972
  if (Http::Utility::isUpgrade(headers)) {
77
35
    modified_headers = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers);
78
35
    Http::Utility::transformUpgradeResponseFromH1toH3(*modified_headers);
79
35
    header_map = modified_headers.get();
80
35
  }
81
2972
#endif
82
  // This is counting not serialized bytes in the send buffer.
83
2972
  local_end_stream_ = end_stream;
84
2972
  SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
85
2972
  {
86
2972
    IncrementalBytesSentTracker tracker(*this, *mutableBytesMeter(), true);
87
2972
    quiche::HttpHeaderBlock header_block = envoyHeadersToHttp2HeaderBlock(*header_map);
88
2972
    addDecompressedHeaderBytesSent(header_block);
89
2972
    size_t bytes_sent = WriteHeaders(std::move(header_block), end_stream, nullptr);
90
2972
    stats_gatherer_->addBytesSent(bytes_sent, end_stream);
91
2972
    ENVOY_BUG(bytes_sent != 0, "Failed to encode headers.");
92
2972
  }
93

            
94
2972
  if (local_end_stream_) {
95
1142
    if (codec_callbacks_) {
96
781
      codec_callbacks_->onCodecEncodeComplete();
97
781
    }
98
1142
    onLocalEndStream();
99
1142
  }
100
2972
}
101

            
102
320
void EnvoyQuicServerStream::encodeTrailers(const Http::ResponseTrailerMap& trailers) {
103
320
  if (trailers.empty()) {
104
245
    ENVOY_STREAM_LOG(debug, "skipping submitting empty trailers", *this);
105
    // Instead of submitting empty trailers, we send empty data with end_stream=true instead.
106
245
    Buffer::OwnedImpl empty_buffer;
107
245
    encodeData(empty_buffer, true);
108
245
    return;
109
245
  }
110
75
  ENVOY_STREAM_LOG(debug, "encodeTrailers: {}.", *this, trailers);
111
75
  quiche::HttpHeaderBlock trailer_block = envoyHeadersToHttp2HeaderBlock(trailers);
112
75
  addDecompressedHeaderBytesSent(trailer_block);
113
75
  encodeTrailersImpl(std::move(trailer_block));
114
75
}
115

            
116
384
void EnvoyQuicServerStream::resetStream(Http::StreamResetReason reason) {
117
384
  if (buffer_memory_account_) {
118
1
    buffer_memory_account_->clearDownstream();
119
1
  }
120

            
121
384
#ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
122
384
  if (http_datagram_handler_) {
123
9
    UnregisterHttp3DatagramVisitor();
124
9
  }
125
384
#endif
126

            
127
384
  if (local_end_stream_ && !reading_stopped()) {
128
    // This is after 200 early response. Reset with QUIC_STREAM_NO_ERROR instead
129
    // of propagating original reset reason. In QUICHE if a stream stops reading
130
    // before FIN or RESET received, it resets the steam with QUIC_STREAM_NO_ERROR.
131
133
    StopReading();
132
353
  } else {
133
251
    Reset(envoyResetReasonToQuicRstError(reason));
134
251
  }
135
  // Run reset callbacks once because HCM calls resetStream() without tearing
136
  // down its own ActiveStream. It might be no-op if it has been called already
137
  // in ResetWithError().
138
384
  runResetCallbacks(Http::StreamResetReason::LocalReset, absl::string_view());
139
384
}
140

            
141
456
void EnvoyQuicServerStream::switchStreamBlockState() {
142
  // From when the callback got scheduled till now, readDisable() might have blocked and unblocked
143
  // the stream multiple times, but those actions haven't taken any effect yet, and only the last
144
  // state of read_disable_counter_ determines whether to unblock or block the quic stream.
145
  // Unlike Envoy readDisable() the quic stream gets blocked/unblocked based on the most recent
146
  // call. So a stream will be blocked upon SetBlockedUntilFlush() no matter how many times
147
  // SetUnblocked() was called before, and vice versa.
148
456
  if (read_disable_counter_ > 0) {
149
158
    sequencer()->SetBlockedUntilFlush();
150
298
  } else {
151
298
    sequencer()->SetUnblocked();
152
298
  }
153
456
}
154

            
155
void EnvoyQuicServerStream::OnInitialHeadersComplete(bool fin, size_t frame_len,
156
3971
                                                     const quic::QuicHeaderList& header_list) {
157
3971
  mutableBytesMeter()->addHeaderBytesReceived(frame_len);
158
3971
  addDecompressedHeaderBytesReceived(header_list);
159
  // TODO(danzh) Fix in QUICHE. If the stream has been reset in the call stack,
160
  // OnInitialHeadersComplete() shouldn't be called.
161
3971
  if (read_side_closed()) {
162
4
    return;
163
4
  }
164
3967
  quic::QuicSpdyServerStreamBase::OnInitialHeadersComplete(fin, frame_len, header_list);
165
3967
  if (read_side_closed()) {
166
15
    return;
167
15
  }
168

            
169
3952
  if (!headers_decompressed() || header_list.empty()) {
170
    onStreamError(absl::nullopt);
171
    return;
172
  }
173

            
174
3952
  ENVOY_STREAM_LOG(debug, "Received headers: {}.", *this, header_list.DebugString());
175
3952
  const bool headers_only = fin_received() && highest_received_byte_offset() == NumBytesConsumed();
176
3952
  const bool end_stream =
177
3952
      fin ||
178
3952
      (headers_only && Runtime::runtimeFeatureEnabled("envoy.reloadable_features.quic_signal_"
179
1792
                                                      "headers_only_to_http1_backend"));
180
3952
  ENVOY_STREAM_LOG(debug, "Headers_only: {}, end_stream: {}.", *this, headers_only, end_stream);
181
3952
  if (end_stream) {
182
1800
    end_stream_decoded_ = true;
183
1800
  }
184
3952
  saw_regular_headers_ = false;
185
3952
  quic::QuicRstStreamErrorCode rst = quic::QUIC_STREAM_NO_ERROR;
186
3952
  auto server_session = static_cast<EnvoyQuicServerSession*>(session());
187
3952
  std::unique_ptr<Http::RequestHeaderMapImpl> headers =
188
3952
      quicHeadersToEnvoyHeaders<Http::RequestHeaderMapImpl>(
189
3952
          header_list, *this, server_session->max_inbound_header_list_size(),
190
3952
          filterManagerConnection()->maxIncomingHeadersCount(), details_, rst);
191
3952
  if (headers == nullptr) {
192
647
    onStreamError(close_connection_upon_invalid_header_, rst);
193
647
    return;
194
647
  }
195

            
196
3305
#ifndef ENVOY_ENABLE_UHV
197
  // These checks are now part of UHV
198
3305
  if (Http::HeaderUtility::checkRequiredRequestHeaders(*headers) != Http::okStatus() ||
199
3305
      Http::HeaderUtility::checkValidRequestHeaders(*headers) != Http::okStatus() ||
200
3305
      (headers->Protocol() && !spdy_session()->allow_extended_connect())) {
201
    details_ = Http3ResponseCodeDetailValues::invalid_http_header;
202
    onStreamError(absl::nullopt);
203
    return;
204
  }
205

            
206
  // Extended CONNECT to H/1 upgrade transformation has moved to UHV
207
3305
  if (Http::Utility::isH3UpgradeRequest(*headers)) {
208
    // Transform Request from H3 to H1
209
84
    Http::Utility::transformUpgradeRequestFromH3toH1(*headers);
210
84
  }
211
#else
212
  if (Http::HeaderUtility::checkRequiredRequestHeaders(*headers) != Http::okStatus() ||
213
      (headers->Protocol() && !spdy_session()->allow_extended_connect())) {
214
    details_ = Http3ResponseCodeDetailValues::invalid_http_header;
215
    onStreamError(absl::nullopt);
216
    return;
217
  }
218
#endif
219

            
220
3305
#ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
221
3305
  if (Http::HeaderUtility::isCapsuleProtocol(*headers) ||
222
3305
      Http::HeaderUtility::isConnectUdpRequest(*headers)) {
223
22
    useCapsuleProtocol();
224
    // HTTP/3 Datagrams sent over CONNECT-UDP are already congestion controlled, so make it bypass
225
    // the default Datagram queue.
226
22
    if (Http::HeaderUtility::isConnectUdpRequest(*headers)) {
227
22
      session()->SetForceFlushForDefaultQueue(true);
228
22
    }
229
22
  }
230
3305
#endif
231

            
232
3305
  Http::RequestDecoder* decoder = request_decoder_->get().ptr();
233
3305
  if (decoder != nullptr) {
234
3304
    decoder->decodeHeaders(std::move(headers), /*end_stream=*/end_stream);
235
3304
  }
236
3305
  ConsumeHeaderList();
237
3305
}
238

            
239
147198
void EnvoyQuicServerStream::OnStreamFrame(const quic::QuicStreamFrame& frame) {
240
147198
  uint64_t highest_byte_received = frame.data_length + frame.offset;
241
147198
  if (highest_byte_received > bytesMeter()->wireBytesReceived()) {
242
146754
    mutableBytesMeter()->addWireBytesReceived(highest_byte_received -
243
146754
                                              bytesMeter()->wireBytesReceived());
244
146754
  }
245
147198
  quic::QuicSpdyServerStreamBase::OnStreamFrame(frame);
246
147198
}
247

            
248
107020
void EnvoyQuicServerStream::OnBodyAvailable() {
249
107020
  ASSERT(FinishedReadingHeaders());
250
107020
  if (read_side_closed()) {
251
15
    return;
252
15
  }
253
  // If read has been disabled, QUIC should not deliver any more data upstream to increase the bytes
254
  // buffered/processed.
255
107005
  if (Runtime::runtimeFeatureEnabled(
256
107005
          "envoy.reloadable_features.quic_disable_data_read_immediately") &&
257
107005
      read_disable_counter_ > 0) {
258
    return;
259
  }
260

            
261
107005
  Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
262
  // TODO(danzh): check Envoy per stream buffer limit.
263
  // Currently read out all the data.
264
229165
  while (HasBytesToRead()) {
265
122160
    iovec iov;
266
122160
    int num_regions = GetReadableRegions(&iov, 1);
267
122160
    ASSERT(num_regions > 0);
268
122160
    size_t bytes_read = iov.iov_len;
269
122160
    buffer->add(iov.iov_base, bytes_read);
270
122160
    MarkConsumed(bytes_read);
271
122160
  }
272

            
273
107005
  bool fin_read_and_no_trailers = IsDoneReading();
274
107005
  ENVOY_STREAM_LOG(debug, "Received {} bytes of data {} FIN.", *this, buffer->length(),
275
107005
                   fin_read_and_no_trailers ? "with" : "without");
276
  // If this call is triggered by an empty frame with FIN which is not from peer
277
  // but synthesized by stream itself upon receiving HEADERS with FIN or
278
  // TRAILERS, do not deliver end of stream here. Because either decodeHeaders
279
  // already delivered it or decodeTrailers will be called.
280
107005
  bool skip_decoding = (buffer->length() == 0 && !fin_read_and_no_trailers) || end_stream_decoded_;
281
107005
  if (!skip_decoding) {
282
105778
    if (fin_read_and_no_trailers) {
283
1499
      end_stream_decoded_ = true;
284
1499
    }
285
105778
    updateReceivedContentBytes(buffer->length(), fin_read_and_no_trailers);
286
105778
    if (stream_error() != quic::QUIC_STREAM_NO_ERROR) {
287
      // A stream error has occurred, stop processing.
288
2
      return;
289
2
    }
290
105776
    Http::RequestDecoder* decoder = request_decoder_->get().ptr();
291
105776
    if (decoder != nullptr) {
292
105776
      decoder->decodeData(*buffer, fin_read_and_no_trailers);
293
105776
    }
294
105776
  }
295

            
296
107003
  if (!sequencer()->IsClosed() || read_side_closed()) {
297
104253
    return;
298
104253
  }
299

            
300
  // Trailers may arrived earlier and wait to be consumed after reading all the body. Consume it
301
  // here.
302
2750
  maybeDecodeTrailers();
303

            
304
2750
  OnFinRead();
305
2750
}
306

            
307
void EnvoyQuicServerStream::OnTrailingHeadersComplete(bool fin, size_t frame_len,
308
105
                                                      const quic::QuicHeaderList& header_list) {
309
105
  mutableBytesMeter()->addHeaderBytesReceived(frame_len);
310
105
  addDecompressedHeaderBytesReceived(header_list);
311
105
  ENVOY_STREAM_LOG(debug, "Received trailers: {}.", *this, received_trailers().DebugString());
312
105
  quic::QuicSpdyServerStreamBase::OnTrailingHeadersComplete(fin, frame_len, header_list);
313
105
  if (read_side_closed()) {
314
3
    return;
315
3
  }
316
102
  ASSERT(trailers_decompressed());
317
102
  if (session()->connection()->connected() && !rst_sent()) {
318
102
    maybeDecodeTrailers();
319
102
  }
320
102
}
321

            
322
7
void EnvoyQuicServerStream::OnHeadersTooLarge() {
323
7
  ENVOY_STREAM_LOG(debug, "Headers too large.", *this);
324
7
  details_ = Http3ResponseCodeDetailValues::headers_too_large;
325
7
  quic::QuicSpdyServerStreamBase::OnHeadersTooLarge();
326
7
}
327

            
328
2852
void EnvoyQuicServerStream::maybeDecodeTrailers() {
329
2852
  if (sequencer()->IsClosed() && !FinishedReadingTrailers()) {
330
    // Only decode trailers after finishing decoding body.
331
102
    end_stream_decoded_ = true;
332
102
    updateReceivedContentBytes(0, true);
333
102
    if (stream_error() != quic::QUIC_STREAM_NO_ERROR) {
334
      // A stream error has occurred, stop processing.
335
1
      return;
336
1
    }
337
101
    quic::QuicRstStreamErrorCode rst = quic::QUIC_STREAM_NO_ERROR;
338
101
    auto server_session = static_cast<EnvoyQuicServerSession*>(session());
339
101
    auto trailers = http2HeaderBlockToEnvoyTrailers<Http::RequestTrailerMapImpl>(
340
101
        received_trailers(), server_session->max_inbound_header_list_size(),
341
101
        filterManagerConnection()->maxIncomingHeadersCount(), *this, details_, rst);
342
101
    if (trailers == nullptr) {
343
5
      onStreamError(close_connection_upon_invalid_header_, rst);
344
5
      return;
345
5
    }
346
96
    Http::RequestDecoder* decoder = request_decoder_->get().ptr();
347
96
    if (decoder != nullptr) {
348
96
      decoder->decodeTrailers(std::move(trailers));
349
96
    }
350
96
    MarkTrailersConsumed();
351
96
  }
352
2852
}
353

            
354
289
bool EnvoyQuicServerStream::OnStopSending(quic::QuicResetStreamError error) {
355
  // Only called in IETF Quic to close write side.
356
289
  ENVOY_STREAM_LOG(debug, "received STOP_SENDING with reset code={}", *this,
357
289
                   static_cast<int>(error.internal_code()));
358
289
  stats_.rx_reset_.inc();
359
289
  bool end_stream_encoded = local_end_stream_;
360
  // This call will close write.
361
289
  if (!quic::QuicSpdyServerStreamBase::OnStopSending(error)) {
362
22
    return false;
363
22
  }
364
267
  ASSERT(write_side_closed());
365
  // Also stop reading because the peer already didn't care about the response any more.
366
267
  if (!reading_stopped()) {
367
190
    StopReading();
368
190
  }
369
267
  if (!end_stream_encoded) {
370
    // If both directions are closed but end stream hasn't been encoded yet, notify reset callbacks.
371
    // Treat this as a remote reset, since the stream will be closed in both directions.
372
259
    runResetCallbacks(
373
259
        quicRstErrorToEnvoyRemoteResetReason(error.internal_code()),
374
259
        absl::StrCat(quic::QuicRstStreamErrorCodeToString(error.internal_code()), "|FROM_PEER"));
375
259
  }
376
267
  return true;
377
289
}
378

            
379
214
void EnvoyQuicServerStream::OnStreamReset(const quic::QuicRstStreamFrame& frame) {
380
214
  ENVOY_STREAM_LOG(debug, "received RESET_STREAM with reset code={}", *this,
381
214
                   static_cast<int>(frame.error_code));
382
214
  stats_.rx_reset_.inc();
383
214
  bool end_stream_decoded_and_encoded = read_side_closed() && local_end_stream_;
384
  // This closes read side in IETF Quic, but doesn't close write side.
385
214
  quic::QuicSpdyServerStreamBase::OnStreamReset(frame);
386
214
  ASSERT(read_side_closed());
387
214
  if (write_side_closed() && !end_stream_decoded_and_encoded) {
388
    // If both directions are closed but upstream hasn't received or sent end stream, run reset
389
    // stream callback.
390
211
    runResetCallbacks(
391
211
        quicRstErrorToEnvoyRemoteResetReason(frame.error_code),
392
211
        absl::StrCat(quic::QuicRstStreamErrorCodeToString(frame.error_code), "|FROM_PEER"));
393
211
  }
394
214
}
395

            
396
274
void EnvoyQuicServerStream::ResetWithError(quic::QuicResetStreamError error) {
397
274
  ENVOY_STREAM_LOG(debug, "sending reset code={}", *this, static_cast<int>(error.internal_code()));
398
274
  stats_.tx_reset_.inc();
399
274
  filterManagerConnection()->incrementSentQuicResetStreamErrorStats(error, /*from_self*/ true,
400
274
                                                                    /*is_upstream*/ false);
401
274
  if (!local_end_stream_) {
402
    // Upper layers expect calling resetStream() to immediately raise reset callbacks.
403
270
    runResetCallbacks(
404
270
        quicRstErrorToEnvoyLocalResetReason(error.internal_code()),
405
270
        absl::StrCat(quic::QuicRstStreamErrorCodeToString(error.internal_code()), "|FROM_SELF"));
406
270
  }
407
274
  quic::QuicSpdyServerStreamBase::ResetWithError(error);
408
274
}
409

            
410
void EnvoyQuicServerStream::OnConnectionClosed(const quic::QuicConnectionCloseFrame& frame,
411
1270
                                               quic::ConnectionCloseSource source) {
412
  // Run reset callback before closing the stream so that the watermark change will not trigger
413
  // callbacks.
414
1270
  if (!local_end_stream_) {
415
909
    runResetCallbacks(source == quic::ConnectionCloseSource::FROM_SELF
416
909
                          ? quicErrorCodeToEnvoyLocalResetReason(frame.quic_error_code,
417
826
                                                                 session()->OneRttKeysAvailable())
418
909
                          : quicErrorCodeToEnvoyRemoteResetReason(frame.quic_error_code),
419
909
                      absl::StrCat(quic::QuicErrorCodeToString(frame.quic_error_code), "|",
420
909
                                   quic::ConnectionCloseSourceToString(source), "|",
421
909
                                   frame.error_details));
422
909
  }
423
1270
  quic::QuicSpdyServerStreamBase::OnConnectionClosed(frame, source);
424
1270
}
425

            
426
5163
void EnvoyQuicServerStream::CloseWriteSide() {
427
  // Clear the downstream since the stream should not write additional data
428
  // after this is called, e.g. cannot reset the stream.
429
  // Only the downstream stream should clear the downstream of the
430
  // memory account.
431
  //
432
  // There are cases where a corresponding upstream stream dtor might
433
  // be called, but the downstream stream isn't going to terminate soon
434
  // such as StreamDecoderFilterCallbacks::recreateStream().
435
5163
  if (buffer_memory_account_) {
436
2
    buffer_memory_account_->clearDownstream();
437
2
  }
438
5163
  quic::QuicSpdyServerStreamBase::CloseWriteSide();
439
5163
}
440

            
441
4011
void EnvoyQuicServerStream::OnClose() {
442
4011
  destroy();
443
4011
  quic::QuicSpdyServerStreamBase::OnClose();
444
4011
  if (isDoingWatermarkAccounting()) {
445
2335
    return;
446
2335
  }
447
1676
  clearWatermarkBuffer();
448
1676
  if (stats_gatherer_->notify_ack_listener_before_soon_to_be_destroyed()) {
449
    // Either stats_gatherer_ will do deferred logging upon receiving the last
450
    // ACK, or OnSoonToBeDestroyed() will catch all the cases where the stream
451
    // is destroyed without receiving the last ACK.
452
1676
    return;
453
1676
  }
454
  if (!stats_gatherer_->loggingDone()) {
455
    stats_gatherer_->maybeDoDeferredLog(/* record_ack_timing */ false);
456
  }
457
  stats_gatherer_ = nullptr;
458
}
459

            
460
1676
void EnvoyQuicServerStream::clearWatermarkBuffer() {
461
1676
  if (BufferedDataBytes() > 0) {
462
    // If the stream is closed without sending out all buffered data, regard
463
    // them as sent now and adjust connection buffer book keeping.
464
18
    updateBytesBuffered(BufferedDataBytes(), 0);
465
18
  }
466
1676
}
467

            
468
16685
void EnvoyQuicServerStream::OnCanWrite() {
469
16685
  SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
470
16685
  quic::QuicSpdyServerStreamBase::OnCanWrite();
471
16685
}
472

            
473
43
uint32_t EnvoyQuicServerStream::streamId() { return id(); }
474

            
475
86
Network::Connection* EnvoyQuicServerStream::connection() { return filterManagerConnection(); }
476

            
477
15449
QuicFilterManagerConnectionImpl* EnvoyQuicServerStream::filterManagerConnection() {
478
15449
  return dynamic_cast<QuicFilterManagerConnectionImpl*>(session());
479
15449
}
480

            
481
Http::HeaderUtility::HeaderValidationResult
482
EnvoyQuicServerStream::validateHeader(absl::string_view header_name,
483
178313
                                      absl::string_view header_value) {
484
178313
  Http::HeaderUtility::HeaderValidationResult result =
485
178313
      EnvoyQuicStream::validateHeader(header_name, header_value);
486
178313
  if (result != Http::HeaderUtility::HeaderValidationResult::ACCEPT) {
487
647
    return result;
488
647
  }
489
  // Do request specific checks.
490
177666
  result = Http::HeaderUtility::checkHeaderNameForUnderscores(
491
177666
      header_name, headers_with_underscores_action_, stats_);
492
177666
  if (result != Http::HeaderUtility::HeaderValidationResult::ACCEPT) {
493
8
    details_ = Http3ResponseCodeDetailValues::invalid_underscore;
494
8
    return result;
495
8
  }
496
177658
  ASSERT(!header_name.empty());
497
177658
  if (!Http::HeaderUtility::isPseudoHeader(header_name)) {
498
163117
    if (header_name == "cookie" && header_value.empty()) {
499
2
      return Http::HeaderUtility::HeaderValidationResult::DROP;
500
2
    }
501
163115
    return result;
502
163117
  }
503
14541
  static const absl::flat_hash_set<std::string> known_pseudo_headers{":authority", ":protocol",
504
14541
                                                                     ":path", ":method", ":scheme"};
505
14541
  if (header_name == ":path") {
506
2962
    if (saw_path_) {
507
      // According to RFC9114, :path header should only have one value.
508
1
      return Http::HeaderUtility::HeaderValidationResult::REJECT;
509
1
    }
510
2961
    saw_path_ = true;
511
11579
  } else if (!known_pseudo_headers.contains(header_name)) {
512
    return Http::HeaderUtility::HeaderValidationResult::REJECT;
513
  }
514
14540
  return result;
515
14541
}
516

            
517
void EnvoyQuicServerStream::OnMetadataComplete(size_t /*frame_len*/,
518
592
                                               const quic::QuicHeaderList& header_list) {
519
592
  if (mustRejectMetadata(header_list.uncompressed_header_bytes())) {
520
2
    onStreamError(true, quic::QUIC_HEADERS_TOO_LARGE);
521
2
    return;
522
2
  }
523
590
  if (!header_list.empty()) {
524
590
    Http::RequestDecoder* decoder = request_decoder_->get().ptr();
525
590
    if (decoder != nullptr) {
526
590
      decoder->decodeMetadata(metadataMapFromHeaderList(header_list));
527
590
    }
528
590
  }
529
590
}
530

            
531
void EnvoyQuicServerStream::onStreamError(absl::optional<bool> should_close_connection,
532
672
                                          quic::QuicRstStreamErrorCode rst) {
533
672
  if (details_.empty()) {
534
665
    details_ = Http3ResponseCodeDetailValues::invalid_http_header;
535
665
  }
536

            
537
672
  bool close_connection_upon_invalid_header;
538
672
  if (should_close_connection != absl::nullopt) {
539
657
    close_connection_upon_invalid_header = should_close_connection.value();
540
662
  } else {
541
15
    close_connection_upon_invalid_header =
542
15
        !http3_options_.override_stream_error_on_invalid_http_message().value();
543
15
  }
544
672
  if (close_connection_upon_invalid_header) {
545
659
    stream_delegate()->OnStreamError(quic::QUIC_HTTP_FRAME_ERROR, std::string(details_));
546
668
  } else {
547
13
    Reset(rst);
548
13
  }
549
672
}
550

            
551
2
void EnvoyQuicServerStream::onPendingFlushTimer() {
552
2
  ENVOY_STREAM_LOG(debug, "pending stream flush timeout", *this);
553
2
  Http::MultiplexedStreamImplBase::onPendingFlushTimer();
554
2
  stats_.tx_flush_timeout_.inc();
555
2
  ASSERT(local_end_stream_ && !fin_sent());
556
  // Reset the stream locally. But no reset callbacks will be run because higher layers think the
557
  // stream is already finished.
558
2
  Reset(quic::QUIC_STREAM_CANCELLED);
559
2
}
560

            
561
2573
bool EnvoyQuicServerStream::hasPendingData() {
562
  // Quic stream sends headers and trailers on the same stream, and buffers them in the same sending
563
  // buffer if needed. So checking this buffer is sufficient.
564
2573
  return (!write_side_closed()) && BufferedDataBytes() > 0;
565
2573
}
566

            
567
#ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
568
22
void EnvoyQuicServerStream::useCapsuleProtocol() {
569
22
  http_datagram_handler_ = std::make_unique<HttpDatagramHandler>(*this);
570
22
  ASSERT(request_decoder_->get().has_value());
571
22
  http_datagram_handler_->setStreamDecoder(request_decoder_->get().ptr());
572
22
  RegisterHttp3DatagramVisitor(http_datagram_handler_.get());
573
22
}
574
#endif
575

            
576
15
void EnvoyQuicServerStream::OnInvalidHeaders() { onStreamError(absl::nullopt); }
577

            
578
4011
void EnvoyQuicServerStream::OnSoonToBeDestroyed() {
579
4011
  quic::QuicSpdyServerStreamBase::OnSoonToBeDestroyed();
580
4011
  if (stats_gatherer_ != nullptr &&
581
4011
      stats_gatherer_->notify_ack_listener_before_soon_to_be_destroyed() &&
582
4011
      !stats_gatherer_->loggingDone()) {
583
    // Catch all the cases where the stream is destroyed without receiving the
584
    // last ACK.
585
2539
    stats_gatherer_->maybeDoDeferredLog(/* record_ack_timing */ false);
586
2539
  }
587
4011
}
588

            
589
} // namespace Quic
590
} // namespace Envoy