#include "source/common/quic/envoy_quic_client_stream.h"

#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/assert.h"
#include "source/common/common/enum_to_int.h"
#include "source/common/http/codes.h"
#include "source/common/http/header_map_impl.h"
#include "source/common/http/header_utility.h"
#include "source/common/http/utility.h"
#include "source/common/quic/envoy_quic_client_session.h"
#include "source/common/quic/envoy_quic_utils.h"
#include "source/common/runtime/runtime_features.h"

#include "quiche/common/http/http_header_block.h"
#include "quiche/quic/core/http/quic_header_list.h"
#include "quiche/quic/core/quic_session.h"
#include "quiche/quic/core/quic_types.h"

namespace Envoy {
namespace Quic {

22
EnvoyQuicClientStream::EnvoyQuicClientStream(
23
    quic::QuicStreamId id, quic::QuicSpdyClientSession* client_session, quic::StreamType type,
24
    Http::Http3::CodecStats& stats,
25
    const envoy::config::core::v3::Http3ProtocolOptions& http3_options)
26
4058
    : quic::QuicSpdyClientStream(id, client_session, type),
27
4058
      EnvoyQuicStream(
28
4058
          *this, *client_session,
29
          // Flow control receive window should be larger than 8k so that the send buffer can fully
30
          // utilize congestion control window before it reaches the high watermark.
31
4058
          static_cast<uint32_t>(GetReceiveWindow().value()), *filterManagerConnection(),
32
4058
          [this]() { runLowWatermarkCallbacks(); }, [this]() { runHighWatermarkCallbacks(); },
33
4058
          stats, http3_options) {
34
4058
  ASSERT(static_cast<uint32_t>(GetReceiveWindow().value()) > 8 * 1024,
35
4058
         "Send buffer limit should be larger than 8KB.");
36
4058
  RegisterMetadataVisitor(this);
37
4058
}
38

            
39
4062
void EnvoyQuicClientStream::setResponseDecoder(Http::ResponseDecoder& decoder) {
40
4062
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.use_response_decoder_handle")) {
41
4061
    response_decoder_handle_ = decoder.createResponseDecoderHandle();
42
4061
  }
43
4062
  response_decoder_ = &decoder;
44
4062
}
45

            
46
Http::Status EnvoyQuicClientStream::encodeHeaders(const Http::RequestHeaderMap& headers,
47
4051
                                                  bool end_stream) {
48
4051
  ENVOY_STREAM_LOG(debug, "encodeHeaders: (end_stream={}) {}.", *this, end_stream, headers);
49
4051
#ifndef ENVOY_ENABLE_UHV
50
  // Headers are now validated by UHV before encoding by the codec. Two checks below are not needed
51
  // when UHV is enabled.
52
  //
53
  // Required headers must be present. This can only happen by some erroneous processing after the
54
  // downstream codecs decode.
55
4051
  RETURN_IF_ERROR(Http::HeaderUtility::checkRequiredRequestHeaders(headers));
56
  // Verify that a filter hasn't added an invalid header key or value.
57
4042
  RETURN_IF_ERROR(Http::HeaderUtility::checkValidRequestHeaders(headers));
58
4040
#endif
59

            
60
4040
  if (write_side_closed()) {
61
1
    return absl::CancelledError("encodeHeaders is called on write-closed stream.");
62
1
  }
63

            
64
4039
  local_end_stream_ = end_stream;
65
4039
  SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
66
4039
  quiche::HttpHeaderBlock spdy_headers;
67
4039
#ifndef ENVOY_ENABLE_UHV
68
  // Extended CONNECT to H/1 upgrade transformation has moved to UHV
69
4039
  if (Http::Utility::isUpgrade(headers)) {
70
    // In Envoy, both upgrade requests and extended CONNECT requests are
71
    // represented as their HTTP/1 forms, regardless of the HTTP version used.
72
    // Therefore, these need to be transformed into their HTTP/3 form, before
73
    // sending them.
74
83
    upgrade_protocol_ = std::string(headers.getUpgradeValue());
75
83
    Http::RequestHeaderMapPtr modified_headers =
76
83
        Http::createHeaderMap<Http::RequestHeaderMapImpl>(headers);
77
83
    Http::Utility::transformUpgradeRequestFromH1toH3(*modified_headers);
78
83
    spdy_headers = envoyHeadersToHttp2HeaderBlock(*modified_headers);
79
4032
  } else if (headers.Method()) {
80
3956
    spdy_headers = envoyHeadersToHttp2HeaderBlock(headers);
81
3956
    if (headers.Method()->value() == "CONNECT") {
82
355
      Http::RequestHeaderMapPtr modified_headers =
83
355
          Http::createHeaderMap<Http::RequestHeaderMapImpl>(headers);
84
355
      modified_headers->remove(Http::Headers::get().Scheme);
85
355
      modified_headers->remove(Http::Headers::get().Path);
86
355
      modified_headers->remove(Http::Headers::get().Protocol);
87
355
      spdy_headers = envoyHeadersToHttp2HeaderBlock(*modified_headers);
88
3943
    } else if (headers.Method()->value() == "HEAD") {
89
28
      sent_head_request_ = true;
90
28
    }
91
3956
  }
92
4039
  if (spdy_headers.empty()) {
93
    spdy_headers = envoyHeadersToHttp2HeaderBlock(headers);
94
  }
95
#else
96
  spdy_headers = envoyHeadersToHttp2HeaderBlock(headers);
97
  if (headers.Method()->value() == "HEAD") {
98
    sent_head_request_ = true;
99
  }
100
#endif
101
4039
#ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
102
4039
  if (Http::HeaderUtility::isCapsuleProtocol(headers) ||
103
4039
      Http::HeaderUtility::isConnectUdpRequest(headers)) {
104
23
    useCapsuleProtocol();
105
23
    if (Http::HeaderUtility::isConnectUdpRequest(headers)) {
106
      // HTTP/3 Datagrams sent over CONNECT-UDP are already congestion controlled, so make it
107
      // bypass the default Datagram queue.
108
20
      session()->SetForceFlushForDefaultQueue(true);
109
20
    }
110
23
  }
111
4039
#endif
112
4039
  addDecompressedHeaderBytesSent(spdy_headers);
113
4039
  {
114
4039
    IncrementalBytesSentTracker tracker(*this, *mutableBytesMeter(), true);
115
4039
    size_t bytes_sent = WriteHeaders(std::move(spdy_headers), end_stream, nullptr);
116
4039
    ENVOY_BUG(bytes_sent != 0, "Failed to encode headers.");
117
4039
  }
118

            
119
4039
  if (local_end_stream_) {
120
1887
    if (codec_callbacks_) {
121
      codec_callbacks_->onCodecEncodeComplete();
122
    }
123
1887
    onLocalEndStream();
124
1887
  }
125
4039
  return Http::okStatus();
126
4040
}
127

            
128
107
void EnvoyQuicClientStream::encodeTrailers(const Http::RequestTrailerMap& trailers) {
129
107
  ENVOY_STREAM_LOG(debug, "encodeTrailers: {}.", *this, trailers);
130
107
  quiche::HttpHeaderBlock trailer_block = envoyHeadersToHttp2HeaderBlock(trailers);
131
107
  addDecompressedHeaderBytesSent(trailer_block);
132
107
  encodeTrailersImpl(std::move(trailer_block));
133
107
}
134

            
135
362
void EnvoyQuicClientStream::resetStream(Http::StreamResetReason reason) {
136
362
#ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
137
362
  if (http_datagram_handler_) {
138
1
    UnregisterHttp3DatagramVisitor();
139
1
  }
140
362
#endif
141
362
  Reset(envoyResetReasonToQuicRstError(reason));
142
362
}
143

            
144
1730
void EnvoyQuicClientStream::switchStreamBlockState() {
145
  // From when the callback got scheduled till now, readDisable() might have blocked and unblocked
146
  // the stream multiple times, but those actions haven't taken any effect yet, and only the last
147
  // state of read_disable_counter_ determines whether to unblock or block the quic stream. Unlike
148
  // Envoy readDisable() the quic stream gets blocked/unblocked based on the most recent call. So a
149
  // stream will be blocked upon SetBlockedUntilFlush() no matter how many times SetUnblocked() was
150
  // called before, and vice versa.
151
1730
  if (read_disable_counter_ > 0) {
152
856
    sequencer()->SetBlockedUntilFlush();
153
908
  } else {
154
874
    sequencer()->SetUnblocked();
155
874
  }
156
1730
}
157

            
158
void EnvoyQuicClientStream::OnInitialHeadersComplete(bool fin, size_t frame_len,
159
2983
                                                     const quic::QuicHeaderList& header_list) {
160
2983
  mutableBytesMeter()->addHeaderBytesReceived(frame_len);
161
2983
  addDecompressedHeaderBytesReceived(header_list);
162
2983
  if (read_side_closed()) {
163
1
    return;
164
1
  }
165
2982
  quic::QuicSpdyStream::OnInitialHeadersComplete(fin, frame_len, header_list);
166
2982
  if (read_side_closed()) {
167
12
    return;
168
12
  }
169

            
170
2970
  if (!headers_decompressed() || header_list.empty()) {
171
    onStreamError(!http3_options_.override_stream_error_on_invalid_http_message().value(),
172
                  quic::QUIC_BAD_APPLICATION_PAYLOAD);
173
    return;
174
  }
175

            
176
2970
  ENVOY_STREAM_LOG(debug, "Received headers: {}.", *this, header_list.DebugString());
177
2970
  if (fin) {
178
4
    end_stream_decoded_ = true;
179
4
  }
180
2970
  saw_regular_headers_ = false;
181
2970
  quic::QuicRstStreamErrorCode transform_rst = quic::QUIC_STREAM_NO_ERROR;
182
2970
  auto client_session = static_cast<EnvoyQuicClientSession*>(session());
183
2970
  std::unique_ptr<Http::ResponseHeaderMapImpl> headers =
184
2970
      quicHeadersToEnvoyHeaders<Http::ResponseHeaderMapImpl>(
185
2970
          header_list, *this, client_session->max_inbound_header_list_size(),
186
2970
          filterManagerConnection()->maxIncomingHeadersCount(), details_, transform_rst);
187
2970
  if (headers == nullptr) {
188
10
    onStreamError(close_connection_upon_invalid_header_, transform_rst);
189
10
    return;
190
10
  }
191

            
192
2960
  const absl::optional<uint64_t> optional_status =
193
2960
      Http::Utility::getResponseStatusOrNullopt(*headers);
194
2960
#ifndef ENVOY_ENABLE_UHV
195
2960
  if (!optional_status.has_value()) {
196
    details_ = Http3ResponseCodeDetailValues::invalid_http_header;
197
    onStreamError(!http3_options_.override_stream_error_on_invalid_http_message().value(),
198
                  quic::QUIC_BAD_APPLICATION_PAYLOAD);
199
    return;
200
  }
201

            
202
2960
  if (!upgrade_protocol_.empty()) {
203
74
    Http::Utility::transformUpgradeResponseFromH3toH1(*headers, upgrade_protocol_);
204
74
  }
205
#else
206
  // Extended CONNECT to H/1 upgrade transformation has moved to UHV
207
  // In Envoy, both upgrade requests and extended CONNECT requests are
208
  // represented as their HTTP/1 forms, regardless of the HTTP version used.
209
  // Therefore, these need to be transformed into their HTTP/1 form.
210

            
211
  // In UHV mode the :status header at this point can be malformed, as it is validated
212
  // later on in the response_decoder_.decodeHeaders() call.
213
  // Account for this here.
214
  if (!optional_status.has_value()) {
215
    // In case the status is invalid or missing, the response_decoder_.decodeHeaders() will fail the
216
    // request
217
    if (Http::ResponseDecoder* decoder = getResponseDecoder()) {
218
      decoder->decodeHeaders(std::move(headers), fin);
219
    } else {
220
      onResponseDecoderDead();
221
    }
222
    ConsumeHeaderList();
223
    return;
224
  }
225
#endif
226

            
227
2960
  const uint64_t status = optional_status.value();
228
  // TODO(#29071) determine how to handle 101, since it is not supported by HTTP/2
229
2960
  if (Http::CodeUtility::is1xx(status)) {
230
    // These are Informational 1xx headers, not the actual response headers.
231
97
    set_headers_decompressed(false);
232
97
  }
233

            
234
2960
  const bool is_special_1xx = Http::HeaderUtility::isSpecial1xx(*headers);
235
2960
  if (is_special_1xx && !decoded_1xx_) {
236
    // This is 100 Continue, only decode it once to support Expect:100-Continue header.
237
77
    decoded_1xx_ = true;
238
77
    if (Http::ResponseDecoder* decoder = getResponseDecoder()) {
239
76
      decoder->decode1xxHeaders(std::move(headers));
240
76
    } else {
241
1
      onResponseDecoderDead();
242
1
    }
243
2883
  } else if (!is_special_1xx) {
244
2864
    if (Http::ResponseDecoder* decoder = getResponseDecoder()) {
245
2863
      decoder->decodeHeaders(std::move(headers),
246
2863
                             /*end_stream=*/fin);
247
2863
    } else {
248
1
      onResponseDecoderDead();
249
1
    }
250
2864
    if (status == enumToInt(Http::Code::NotModified)) {
251
30
      got_304_response_ = true;
252
30
    }
253
2864
  }
254

            
255
2960
  ConsumeHeaderList();
256
2960
}
257

            
258
400112
void EnvoyQuicClientStream::OnStreamFrame(const quic::QuicStreamFrame& frame) {
259
400112
  uint64_t highest_byte_received = frame.data_length + frame.offset;
260
400112
  if (highest_byte_received > bytesMeter()->wireBytesReceived()) {
261
91669
    mutableBytesMeter()->addWireBytesReceived(highest_byte_received -
262
91669
                                              bytesMeter()->wireBytesReceived());
263
91669
  }
264
400112
  quic::QuicSpdyClientStream::OnStreamFrame(frame);
265
400112
}
266

            
267
361
bool EnvoyQuicClientStream::OnStopSending(quic::QuicResetStreamError error) {
268
  // Only called in IETF Quic to close write side.
269
361
  ENVOY_STREAM_LOG(debug, "received STOP_SENDING with reset code={}", *this,
270
361
                   static_cast<int>(error.internal_code()));
271
361
  bool end_stream_encoded = local_end_stream_;
272
  // This call will close write.
273
361
  if (!quic::QuicSpdyClientStream::OnStopSending(error)) {
274
217
    return false;
275
217
  }
276

            
277
144
  stats_.rx_reset_.inc();
278

            
279
144
  if (read_side_closed() && !end_stream_encoded) {
280
    // If both directions are closed but end stream hasn't been encoded yet, notify reset callbacks.
281
    // Treat this as a remote reset, since the stream will be closed in both directions.
282
106
    runResetCallbacks(
283
106
        quicRstErrorToEnvoyRemoteResetReason(error.internal_code()),
284
106
        absl::StrCat(quic::QuicRstStreamErrorCodeToString(error.internal_code()), "|FROM_PEER"));
285
106
  }
286
144
  return true;
287
361
}
288

            
289
367513
void EnvoyQuicClientStream::OnBodyAvailable() {
290
367513
  ASSERT(FinishedReadingHeaders());
291
367513
  if (read_side_closed()) {
292
24
    return;
293
24
  }
294

            
295
367489
  Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
296
  // TODO(danzh): check Envoy per stream buffer limit.
297
  // Currently read out all the data.
298
788653
  while (HasBytesToRead()) {
299
421164
    iovec iov;
300
421164
    int num_regions = GetReadableRegions(&iov, 1);
301
421164
    ASSERT(num_regions > 0);
302
421164
    size_t bytes_read = iov.iov_len;
303
421164
    buffer->add(iov.iov_base, bytes_read);
304
421164
    MarkConsumed(bytes_read);
305
421164
  }
306
367489
  ASSERT(buffer->length() == 0 || !end_stream_decoded_);
307

            
308
367489
  bool fin_read_and_no_trailers = IsDoneReading();
309
  // If this call is triggered by an empty frame with FIN which is not from peer
310
  // but synthesized by stream itself upon receiving HEADERS with FIN or
311
  // TRAILERS, do not deliver end of stream here. Because either decodeHeaders
312
  // already delivered it or decodeTrailers will be called.
313
367489
  bool skip_decoding = (buffer->length() == 0 && !fin_read_and_no_trailers) || end_stream_decoded_;
314
367489
  if (!skip_decoding) {
315
367431
    if (fin_read_and_no_trailers) {
316
2442
      end_stream_decoded_ = true;
317
2442
    }
318
367431
    updateReceivedContentBytes(buffer->length(), fin_read_and_no_trailers);
319
367431
    if (stream_error() != quic::QUIC_STREAM_NO_ERROR) {
320
      // A stream error has occurred, stop processing.
321
3
      return;
322
3
    }
323
367428
    if (Http::ResponseDecoder* decoder = getResponseDecoder()) {
324
367427
      decoder->decodeData(*buffer, fin_read_and_no_trailers);
325
367427
    } else {
326
1
      onResponseDecoderDead();
327
1
    }
328
367428
  }
329

            
330
367486
  if (!sequencer()->IsClosed() || read_side_closed()) {
331
365000
    return;
332
365000
  }
333

            
334
  // Trailers may arrived earlier and wait to be consumed after reading all the body. Consume it
335
  // here.
336
2486
  maybeDecodeTrailers();
337

            
338
2486
  OnFinRead();
339
2486
}
340

            
341
void EnvoyQuicClientStream::OnTrailingHeadersComplete(bool fin, size_t frame_len,
342
74
                                                      const quic::QuicHeaderList& header_list) {
343
74
  mutableBytesMeter()->addHeaderBytesReceived(frame_len);
344
74
  addDecompressedHeaderBytesReceived(header_list);
345
74
  if (read_side_closed()) {
346
    return;
347
  }
348
74
  ENVOY_STREAM_LOG(debug, "Received trailers: {}.", *this, header_list.DebugString());
349
74
  quic::QuicSpdyStream::OnTrailingHeadersComplete(fin, frame_len, header_list);
350
74
  ASSERT(trailers_decompressed());
351
74
  if (session()->connection()->connected() && !rst_sent()) {
352
74
    maybeDecodeTrailers();
353
74
  }
354
74
}
355

            
356
2560
void EnvoyQuicClientStream::maybeDecodeTrailers() {
357
2560
  if (sequencer()->IsClosed() && !FinishedReadingTrailers()) {
358
    // Only decode trailers after finishing decoding body.
359
73
    end_stream_decoded_ = true;
360
73
    updateReceivedContentBytes(0, true);
361
73
    if (stream_error() != quic::QUIC_STREAM_NO_ERROR) {
362
      // A stream error has occurred, stop processing.
363
      return;
364
    }
365
73
    quic::QuicRstStreamErrorCode transform_rst = quic::QUIC_STREAM_NO_ERROR;
366
73
    auto client_session = static_cast<EnvoyQuicClientSession*>(session());
367
73
    auto trailers = http2HeaderBlockToEnvoyTrailers<Http::ResponseTrailerMapImpl>(
368
73
        received_trailers(), client_session->max_inbound_header_list_size(),
369
73
        filterManagerConnection()->maxIncomingHeadersCount(), *this, details_, transform_rst);
370
73
    if (trailers == nullptr) {
371
      onStreamError(close_connection_upon_invalid_header_, transform_rst);
372
      return;
373
    }
374
73
    if (Http::ResponseDecoder* decoder = getResponseDecoder()) {
375
71
      decoder->decodeTrailers(std::move(trailers));
376
71
    } else {
377
2
      onResponseDecoderDead();
378
2
    }
379
73
    MarkTrailersConsumed();
380
73
  }
381
2560
}
382

            
383
249
void EnvoyQuicClientStream::OnStreamReset(const quic::QuicRstStreamFrame& frame) {
384
249
  ENVOY_STREAM_LOG(debug, "received reset code={}", *this, static_cast<int>(frame.error_code));
385
249
  stats_.rx_reset_.inc();
386
249
  bool end_stream_decoded_and_encoded = read_side_closed() && local_end_stream_;
387
  // This closes read side in IETF Quic, but doesn't close write side.
388
249
  quic::QuicSpdyClientStream::OnStreamReset(frame);
389
249
  ASSERT(read_side_closed());
390
249
  if (write_side_closed() && !end_stream_decoded_and_encoded) {
391
248
    runResetCallbacks(
392
248
        quicRstErrorToEnvoyRemoteResetReason(frame.error_code),
393
248
        absl::StrCat(quic::QuicRstStreamErrorCodeToString(frame.error_code), "|FROM_PEER"));
394
248
  }
395
249
}
396

            
397
381
void EnvoyQuicClientStream::ResetWithError(quic::QuicResetStreamError error) {
398
381
  ENVOY_STREAM_LOG(debug, "sending reset code={}", *this, static_cast<int>(error.internal_code()));
399
381
  stats_.tx_reset_.inc();
400
381
  filterManagerConnection()->incrementSentQuicResetStreamErrorStats(error, /*from_self*/ true,
401
381
                                                                    /*is_upstream*/ true);
402
  // Upper layers expect calling resetStream() to immediately raise reset callbacks.
403
381
  runResetCallbacks(
404
381
      quicRstErrorToEnvoyLocalResetReason(error.internal_code()),
405
381
      absl::StrCat(quic::QuicRstStreamErrorCodeToString(error.internal_code()), "|FROM_SELF"));
406
381
  if (session()->connection()->connected()) {
407
373
    quic::QuicSpdyClientStream::ResetWithError(error);
408
373
  }
409
381
}
410

            
411
void EnvoyQuicClientStream::OnConnectionClosed(const quic::QuicConnectionCloseFrame& frame,
412
987
                                               quic::ConnectionCloseSource source) {
413
987
  if (!end_stream_decoded_) {
414
938
    runResetCallbacks(source == quic::ConnectionCloseSource::FROM_SELF
415
938
                          ? quicErrorCodeToEnvoyLocalResetReason(frame.quic_error_code,
416
145
                                                                 session()->OneRttKeysAvailable())
417
938
                          : quicErrorCodeToEnvoyRemoteResetReason(frame.quic_error_code),
418
938
                      absl::StrCat(quic::QuicErrorCodeToString(frame.quic_error_code), "|",
419
938
                                   quic::ConnectionCloseSourceToString(source), "|",
420
938
                                   frame.error_details));
421
938
  }
422
987
  quic::QuicSpdyClientStream::OnConnectionClosed(frame, source);
423
987
}
424

            
425
4058
void EnvoyQuicClientStream::OnClose() {
426
4058
  destroy();
427
4058
  quic::QuicSpdyClientStream::OnClose();
428
4058
  if (isDoingWatermarkAccounting()) {
429
    // This is called in the scope of a watermark buffer updater. Clear the
430
    // buffer accounting afterwards so that the updater doesn't override the
431
    // result.
432
39
    return;
433
39
  }
434
4019
  clearWatermarkBuffer();
435
4019
}
436

            
437
4019
void EnvoyQuicClientStream::clearWatermarkBuffer() {
438
4019
  if (BufferedDataBytes() > 0) {
439
    // If the stream is closed without sending out all buffered data, regard
440
    // them as sent now and adjust connection buffer book keeping.
441
11
    updateBytesBuffered(BufferedDataBytes(), 0);
442
11
  }
443
4019
}
444

            
445
33378
void EnvoyQuicClientStream::OnCanWrite() {
446
33378
  SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
447
33378
  quic::QuicSpdyClientStream::OnCanWrite();
448
33378
}
449

            
450
27
uint32_t EnvoyQuicClientStream::streamId() { return id(); }
451

            
452
1470
Network::Connection* EnvoyQuicClientStream::connection() { return filterManagerConnection(); }
453

            
454
8952
QuicFilterManagerConnectionImpl* EnvoyQuicClientStream::filterManagerConnection() {
455
8952
  return dynamic_cast<QuicFilterManagerConnectionImpl*>(session());
456
8952
}
457

            
458
void EnvoyQuicClientStream::OnMetadataComplete(size_t /*frame_len*/,
459
418
                                               const quic::QuicHeaderList& header_list) {
460
418
  if (mustRejectMetadata(header_list.uncompressed_header_bytes())) {
461
2
    onStreamError(true, quic::QUIC_HEADERS_TOO_LARGE);
462

            
463
2
    return;
464
2
  }
465
416
  if (!header_list.empty()) {
466
416
    if (Http::ResponseDecoder* decoder = getResponseDecoder()) {
467
416
      decoder->decodeMetadata(metadataMapFromHeaderList(header_list));
468
416
    } else {
469
      onResponseDecoderDead();
470
    }
471
416
  }
472
416
}
473

            
474
void EnvoyQuicClientStream::onStreamError(absl::optional<bool> should_close_connection,
475
27
                                          quic::QuicRstStreamErrorCode rst_code) {
476
27
  if (details_.empty()) {
477
22
    details_ = Http3ResponseCodeDetailValues::invalid_http_header;
478
22
  }
479
27
  bool close_connection_upon_invalid_header;
480
27
  if (should_close_connection != absl::nullopt) {
481
15
    close_connection_upon_invalid_header = should_close_connection.value();
482
21
  } else {
483
12
    close_connection_upon_invalid_header =
484
12
        !http3_options_.override_stream_error_on_invalid_http_message().value();
485
12
  }
486
27
  if (close_connection_upon_invalid_header) {
487
13
    stream_delegate()->OnStreamError(quic::QUIC_HTTP_FRAME_ERROR, "Invalid headers");
488
20
  } else {
489
14
    Reset(rst_code);
490
14
  }
491
27
}
492

            
493
3505
bool EnvoyQuicClientStream::hasPendingData() { return BufferedDataBytes() > 0; }
#ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
// TODO(https://github.com/envoyproxy/envoy/issues/23564): Make the stream use Capsule Protocol
// for CONNECT-UDP support when the headers contain "Capsule-Protocol: ?1" or "Upgrade:
// connect-udp".
//
// Creates the HTTP datagram handler for this stream, points it at the current response decoder
// and registers it as the HTTP/3 datagram visitor.
void EnvoyQuicClientStream::useCapsuleProtocol() {
  http_datagram_handler_ = std::make_unique<HttpDatagramHandler>(*this);
  auto* const datagram_handler = http_datagram_handler_.get();
  datagram_handler->setStreamDecoder(getResponseDecoder());
  RegisterHttp3DatagramVisitor(datagram_handler);
}
#endif
505

            
506
12
void EnvoyQuicClientStream::OnInvalidHeaders() {
507
12
  onStreamError(absl::nullopt, quic::QUIC_BAD_APPLICATION_PAYLOAD);
508
12
}
509

            
510
5
void EnvoyQuicClientStream::onResponseDecoderDead() const {
511
5
  const std::string error_msg = "response_decoder_ use after free detected.";
512
5
  IS_ENVOY_BUG(error_msg);
513
5
  RELEASE_ASSERT(!Runtime::runtimeFeatureEnabled(
514
5
                     "envoy.reloadable_features.abort_when_accessing_dead_decoder"),
515
5
                 error_msg);
516
5
}
517

            
518
370881
Http::ResponseDecoder* EnvoyQuicClientStream::getResponseDecoder() {
519
370881
  if (response_decoder_handle_ == nullptr) {
520
2
    return response_decoder_;
521
2
  }
522
370879
  if (response_decoder_handle_) {
523
370879
    if (OptRef<Http::ResponseDecoder> decoder = response_decoder_handle_->get();
524
370879
        decoder.has_value()) {
525
370874
      return &decoder.value().get();
526
370874
    }
527
370879
  }
528
5
  return nullptr;
529
370879
}

} // namespace Quic
} // namespace Envoy