Line data Source code
1 : #include "source/common/quic/envoy_quic_client_stream.h"
2 :
3 : #include "source/common/buffer/buffer_impl.h"
4 : #include "source/common/common/assert.h"
5 : #include "source/common/common/enum_to_int.h"
6 : #include "source/common/http/codes.h"
7 : #include "source/common/http/header_map_impl.h"
8 : #include "source/common/http/header_utility.h"
9 : #include "source/common/http/utility.h"
10 : #include "source/common/quic/envoy_quic_client_session.h"
11 : #include "source/common/quic/envoy_quic_utils.h"
12 :
13 : #include "quiche/quic/core/http/quic_header_list.h"
14 : #include "quiche/quic/core/quic_session.h"
15 : #include "quiche/spdy/core/http2_header_block.h"
16 :
17 : namespace Envoy {
18 : namespace Quic {
19 :
20 : EnvoyQuicClientStream::EnvoyQuicClientStream(
21 : quic::QuicStreamId id, quic::QuicSpdyClientSession* client_session, quic::StreamType type,
22 : Http::Http3::CodecStats& stats,
23 : const envoy::config::core::v3::Http3ProtocolOptions& http3_options)
24 : : quic::QuicSpdyClientStream(id, client_session, type),
25 : EnvoyQuicStream(
26 : // Flow control receive window should be larger than 8k so that the send buffer can fully
27 : // utilize congestion control window before it reaches the high watermark.
28 : static_cast<uint32_t>(GetReceiveWindow().value()), *filterManagerConnection(),
29 0 : [this]() { runLowWatermarkCallbacks(); }, [this]() { runHighWatermarkCallbacks(); },
30 0 : stats, http3_options) {
31 0 : ASSERT(static_cast<uint32_t>(GetReceiveWindow().value()) > 8 * 1024,
32 0 : "Send buffer limit should be larger than 8KB.");
33 0 : }
34 :
35 : Http::Status EnvoyQuicClientStream::encodeHeaders(const Http::RequestHeaderMap& headers,
36 0 : bool end_stream) {
37 0 : ENVOY_STREAM_LOG(debug, "encodeHeaders: (end_stream={}) {}.", *this, end_stream, headers);
38 0 : #ifndef ENVOY_ENABLE_UHV
39 : // Headers are now validated by UHV before encoding by the codec. Two checks below are not needed
40 : // when UHV is enabled.
41 : //
42 : // Required headers must be present. This can only happen by some erroneous processing after the
43 : // downstream codecs decode.
44 0 : RETURN_IF_ERROR(Http::HeaderUtility::checkRequiredRequestHeaders(headers));
45 : // Verify that a filter hasn't added an invalid header key or value.
46 0 : RETURN_IF_ERROR(Http::HeaderUtility::checkValidRequestHeaders(headers));
47 0 : #endif
48 :
49 0 : if (write_side_closed()) {
50 0 : return absl::CancelledError("encodeHeaders is called on write-closed stream.");
51 0 : }
52 :
53 0 : local_end_stream_ = end_stream;
54 0 : SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
55 0 : spdy::Http2HeaderBlock spdy_headers;
56 0 : #ifndef ENVOY_ENABLE_UHV
57 : // Extended CONNECT to H/1 upgrade transformation has moved to UHV
58 0 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.use_http3_header_normalisation") &&
59 0 : Http::Utility::isUpgrade(headers)) {
60 : // In Envoy, both upgrade requests and extended CONNECT requests are
61 : // represented as their HTTP/1 forms, regardless of the HTTP version used.
62 : // Therefore, these need to be transformed into their HTTP/3 form, before
63 : // sending them.
64 0 : upgrade_protocol_ = std::string(headers.getUpgradeValue());
65 0 : Http::RequestHeaderMapPtr modified_headers =
66 0 : Http::createHeaderMap<Http::RequestHeaderMapImpl>(headers);
67 0 : Http::Utility::transformUpgradeRequestFromH1toH3(*modified_headers);
68 0 : spdy_headers = envoyHeadersToHttp2HeaderBlock(*modified_headers);
69 0 : } else if (headers.Method()) {
70 0 : spdy_headers = envoyHeadersToHttp2HeaderBlock(headers);
71 0 : if (headers.Method()->value() == "CONNECT") {
72 0 : Http::RequestHeaderMapPtr modified_headers =
73 0 : Http::createHeaderMap<Http::RequestHeaderMapImpl>(headers);
74 0 : modified_headers->remove(Http::Headers::get().Scheme);
75 0 : modified_headers->remove(Http::Headers::get().Path);
76 0 : modified_headers->remove(Http::Headers::get().Protocol);
77 0 : spdy_headers = envoyHeadersToHttp2HeaderBlock(*modified_headers);
78 0 : } else if (headers.Method()->value() == "HEAD") {
79 0 : sent_head_request_ = true;
80 0 : }
81 0 : }
82 0 : if (spdy_headers.empty()) {
83 0 : spdy_headers = envoyHeadersToHttp2HeaderBlock(headers);
84 0 : }
85 : #else
86 : spdy_headers = envoyHeadersToHttp2HeaderBlock(headers);
87 : if (headers.Method()->value() == "HEAD") {
88 : sent_head_request_ = true;
89 : }
90 : #endif
91 0 : #ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
92 0 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.enable_connect_udp_support") &&
93 0 : (Http::HeaderUtility::isCapsuleProtocol(headers) ||
94 0 : Http::HeaderUtility::isConnectUdpRequest(headers))) {
95 0 : useCapsuleProtocol();
96 0 : if (Http::HeaderUtility::isConnectUdpRequest(headers)) {
97 : // HTTP/3 Datagrams sent over CONNECT-UDP are already congestion controlled, so make it
98 : // bypass the default Datagram queue.
99 0 : session()->SetForceFlushForDefaultQueue(true);
100 0 : }
101 0 : }
102 0 : #endif
103 0 : {
104 0 : IncrementalBytesSentTracker tracker(*this, *mutableBytesMeter(), true);
105 0 : size_t bytes_sent = WriteHeaders(std::move(spdy_headers), end_stream, nullptr);
106 0 : ENVOY_BUG(bytes_sent != 0, "Failed to encode headers.");
107 0 : }
108 :
109 0 : if (local_end_stream_) {
110 0 : if (codec_callbacks_) {
111 0 : codec_callbacks_->onCodecEncodeComplete();
112 0 : }
113 0 : onLocalEndStream();
114 0 : }
115 0 : return Http::okStatus();
116 0 : }
117 :
118 0 : void EnvoyQuicClientStream::encodeData(Buffer::Instance& data, bool end_stream) {
119 0 : ENVOY_STREAM_LOG(debug, "encodeData (end_stream={}) of {} bytes.", *this, end_stream,
120 0 : data.length());
121 0 : const bool has_data = data.length() > 0;
122 0 : if (!has_data && !end_stream) {
123 0 : return;
124 0 : }
125 0 : if (write_side_closed()) {
126 0 : IS_ENVOY_BUG("encodeData is called on write-closed stream.");
127 0 : return;
128 0 : }
129 0 : ASSERT(!local_end_stream_);
130 0 : local_end_stream_ = end_stream;
131 0 : SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
132 0 : #ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
133 0 : if (http_datagram_handler_) {
134 0 : IncrementalBytesSentTracker tracker(*this, *mutableBytesMeter(), false);
135 0 : if (!http_datagram_handler_->encodeCapsuleFragment(data.toString(), end_stream)) {
136 0 : Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
137 0 : return;
138 0 : }
139 0 : } else {
140 0 : #endif
141 0 : Buffer::RawSliceVector raw_slices = data.getRawSlices();
142 0 : absl::InlinedVector<quiche::QuicheMemSlice, 4> quic_slices;
143 0 : quic_slices.reserve(raw_slices.size());
144 0 : for (auto& slice : raw_slices) {
145 0 : ASSERT(slice.len_ != 0);
146 : // Move each slice into a stand-alone buffer.
147 : // TODO(danzh): investigate the cost of allocating one buffer per slice.
148 : // If it turns out to be expensive, add a new function to free data in the middle in buffer
149 : // interface and re-design QuicheMemSliceImpl.
150 0 : if (!Runtime::runtimeFeatureEnabled(
151 0 : "envoy.reloadable_features.quiche_use_mem_slice_releasor_api")) {
152 0 : quic_slices.emplace_back(quiche::QuicheMemSlice::InPlace(), data, slice.len_);
153 0 : } else {
154 0 : auto single_slice_buffer = std::make_unique<Buffer::OwnedImpl>();
155 0 : single_slice_buffer->move(data, slice.len_);
156 0 : quic_slices.emplace_back(
157 0 : reinterpret_cast<char*>(slice.mem_), slice.len_,
158 0 : [single_slice_buffer = std::move(single_slice_buffer)](const char*) mutable {
159 : // Free this memory explicitly when the callback is invoked.
160 0 : single_slice_buffer = nullptr;
161 0 : });
162 0 : }
163 0 : }
164 0 : quic::QuicConsumedData result{0, false};
165 0 : absl::Span<quiche::QuicheMemSlice> span(quic_slices);
166 0 : {
167 0 : IncrementalBytesSentTracker tracker(*this, *mutableBytesMeter(), false);
168 0 : result = WriteBodySlices(span, end_stream);
169 0 : }
170 : // QUIC stream must take all.
171 0 : if (result.bytes_consumed == 0 && has_data) {
172 0 : IS_ENVOY_BUG(fmt::format("Send buffer didn't take all the data. Stream is write {} with {} "
173 0 : "bytes in send buffer. Current write was rejected.",
174 0 : write_side_closed() ? "closed" : "open", BufferedDataBytes()));
175 0 : Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
176 0 : return;
177 0 : }
178 0 : #ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
179 0 : }
180 0 : #endif
181 0 : if (local_end_stream_) {
182 0 : if (codec_callbacks_) {
183 0 : codec_callbacks_->onCodecEncodeComplete();
184 0 : }
185 0 : onLocalEndStream();
186 0 : }
187 0 : }
188 :
189 0 : void EnvoyQuicClientStream::encodeTrailers(const Http::RequestTrailerMap& trailers) {
190 0 : ENVOY_STREAM_LOG(debug, "encodeTrailers: {}.", *this, trailers);
191 0 : if (write_side_closed()) {
192 0 : IS_ENVOY_BUG("encodeTrailers is called on write-closed stream.");
193 0 : return;
194 0 : }
195 0 : ASSERT(!local_end_stream_);
196 0 : local_end_stream_ = true;
197 0 : ScopedWatermarkBufferUpdater updater(this, this);
198 :
199 0 : {
200 0 : IncrementalBytesSentTracker tracker(*this, *mutableBytesMeter(), true);
201 0 : size_t bytes_sent = WriteTrailers(envoyHeadersToHttp2HeaderBlock(trailers), nullptr);
202 0 : ENVOY_BUG(bytes_sent != 0, "Failed to encode trailers");
203 0 : }
204 :
205 0 : if (codec_callbacks_) {
206 0 : codec_callbacks_->onCodecEncodeComplete();
207 0 : }
208 0 : onLocalEndStream();
209 0 : }
210 :
211 0 : void EnvoyQuicClientStream::encodeMetadata(const Http::MetadataMapVector& /*metadata_map_vector*/) {
212 : // Metadata Frame is not supported in QUICHE.
213 0 : ENVOY_STREAM_LOG(debug, "METADATA is not supported in Http3.", *this);
214 0 : stats_.metadata_not_supported_error_.inc();
215 0 : }
216 :
217 0 : void EnvoyQuicClientStream::resetStream(Http::StreamResetReason reason) {
218 0 : #ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
219 0 : if (http_datagram_handler_) {
220 0 : UnregisterHttp3DatagramVisitor();
221 0 : }
222 0 : #endif
223 0 : Reset(envoyResetReasonToQuicRstError(reason));
224 0 : }
225 :
226 0 : void EnvoyQuicClientStream::switchStreamBlockState() {
227 : // From when the callback got scheduled till now, readDisable() might have blocked and unblocked
228 : // the stream multiple times, but those actions haven't taken any effect yet, and only the last
229 : // state of read_disable_counter_ determines whether to unblock or block the quic stream. Unlike
230 : // Envoy readDisable() the quic stream gets blocked/unblocked based on the most recent call. So a
231 : // stream will be blocked upon SetBlockedUntilFlush() no matter how many times SetUnblocked() was
232 : // called before, and vice versa.
233 0 : if (read_disable_counter_ > 0) {
234 0 : sequencer()->SetBlockedUntilFlush();
235 0 : } else {
236 0 : sequencer()->SetUnblocked();
237 0 : }
238 0 : }
239 :
240 : void EnvoyQuicClientStream::OnInitialHeadersComplete(bool fin, size_t frame_len,
241 0 : const quic::QuicHeaderList& header_list) {
242 0 : mutableBytesMeter()->addHeaderBytesReceived(frame_len);
243 0 : if (read_side_closed()) {
244 0 : return;
245 0 : }
246 0 : quic::QuicSpdyStream::OnInitialHeadersComplete(fin, frame_len, header_list);
247 0 : if (read_side_closed()) {
248 0 : return;
249 0 : }
250 :
251 0 : if (!headers_decompressed() || header_list.empty()) {
252 0 : onStreamError(!http3_options_.override_stream_error_on_invalid_http_message().value(),
253 0 : quic::QUIC_BAD_APPLICATION_PAYLOAD);
254 0 : return;
255 0 : }
256 :
257 0 : ENVOY_STREAM_LOG(debug, "Received headers: {}.", *this, header_list.DebugString());
258 0 : if (fin) {
259 0 : end_stream_decoded_ = true;
260 0 : }
261 0 : saw_regular_headers_ = false;
262 0 : quic::QuicRstStreamErrorCode transform_rst = quic::QUIC_STREAM_NO_ERROR;
263 0 : auto client_session = static_cast<EnvoyQuicClientSession*>(session());
264 0 : std::unique_ptr<Http::ResponseHeaderMapImpl> headers =
265 0 : quicHeadersToEnvoyHeaders<Http::ResponseHeaderMapImpl>(
266 0 : header_list, *this, client_session->max_inbound_header_list_size(),
267 0 : filterManagerConnection()->maxIncomingHeadersCount(), details_, transform_rst);
268 0 : if (headers == nullptr) {
269 0 : onStreamError(close_connection_upon_invalid_header_, transform_rst);
270 0 : return;
271 0 : }
272 :
273 0 : const absl::optional<uint64_t> optional_status =
274 0 : Http::Utility::getResponseStatusOrNullopt(*headers);
275 0 : #ifndef ENVOY_ENABLE_UHV
276 0 : if (!optional_status.has_value()) {
277 0 : details_ = Http3ResponseCodeDetailValues::invalid_http_header;
278 0 : onStreamError(!http3_options_.override_stream_error_on_invalid_http_message().value(),
279 0 : quic::QUIC_BAD_APPLICATION_PAYLOAD);
280 0 : return;
281 0 : }
282 :
283 0 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.use_http3_header_normalisation") &&
284 0 : !upgrade_protocol_.empty()) {
285 0 : Http::Utility::transformUpgradeResponseFromH3toH1(*headers, upgrade_protocol_);
286 0 : }
287 : #else
288 : // Extended CONNECT to H/1 upgrade transformation has moved to UHV
289 : // In Envoy, both upgrade requests and extended CONNECT requests are
290 : // represented as their HTTP/1 forms, regardless of the HTTP version used.
291 : // Therefore, these need to be transformed into their HTTP/1 form.
292 :
293 : // In UHV mode the :status header at this point can be malformed, as it is validated
294 : // later on in the response_decoder_.decodeHeaders() call.
295 : // Account for this here.
296 : if (!optional_status.has_value()) {
297 : // In case the status is invalid or missing, the response_decoder_.decodeHeaders() will fail the
298 : // request
299 : response_decoder_->decodeHeaders(std::move(headers), fin);
300 : ConsumeHeaderList();
301 : return;
302 : }
303 : #endif
304 :
305 0 : const uint64_t status = optional_status.value();
306 : // TODO(#29071) determine how to handle 101, since it is not supported by HTTP/2
307 0 : if (Http::CodeUtility::is1xx(status)) {
308 : // These are Informational 1xx headers, not the actual response headers.
309 0 : set_headers_decompressed(false);
310 0 : }
311 :
312 0 : const bool is_special_1xx = Http::HeaderUtility::isSpecial1xx(*headers);
313 0 : if (is_special_1xx && !decoded_1xx_) {
314 : // This is 100 Continue, only decode it once to support Expect:100-Continue header.
315 0 : decoded_1xx_ = true;
316 0 : response_decoder_->decode1xxHeaders(std::move(headers));
317 0 : } else if (!is_special_1xx) {
318 0 : response_decoder_->decodeHeaders(std::move(headers),
319 0 : /*end_stream=*/fin);
320 0 : if (status == enumToInt(Http::Code::NotModified)) {
321 0 : got_304_response_ = true;
322 0 : }
323 0 : }
324 :
325 0 : ConsumeHeaderList();
326 0 : }
327 :
328 0 : void EnvoyQuicClientStream::OnStreamFrame(const quic::QuicStreamFrame& frame) {
329 0 : uint64_t highest_byte_received = frame.data_length + frame.offset;
330 0 : if (highest_byte_received > bytesMeter()->wireBytesReceived()) {
331 0 : mutableBytesMeter()->addWireBytesReceived(highest_byte_received -
332 0 : bytesMeter()->wireBytesReceived());
333 0 : }
334 0 : quic::QuicSpdyClientStream::OnStreamFrame(frame);
335 0 : }
336 :
337 0 : bool EnvoyQuicClientStream::OnStopSending(quic::QuicResetStreamError error) {
338 : // Only called in IETF Quic to close write side.
339 0 : ENVOY_STREAM_LOG(debug, "received STOP_SENDING with reset code={}", *this, error.internal_code());
340 0 : bool end_stream_encoded = local_end_stream_;
341 : // This call will close write.
342 0 : if (!quic::QuicSpdyClientStream::OnStopSending(error)) {
343 0 : return false;
344 0 : }
345 :
346 0 : stats_.rx_reset_.inc();
347 :
348 0 : if (read_side_closed() && !end_stream_encoded) {
349 : // If both directions are closed but end stream hasn't been encoded yet, notify reset callbacks.
350 : // Treat this as a remote reset, since the stream will be closed in both directions.
351 0 : runResetCallbacks(quicRstErrorToEnvoyRemoteResetReason(error.internal_code()));
352 0 : }
353 0 : return true;
354 0 : }
355 :
356 0 : void EnvoyQuicClientStream::OnBodyAvailable() {
357 0 : ASSERT(FinishedReadingHeaders());
358 0 : if (read_side_closed()) {
359 0 : return;
360 0 : }
361 :
362 0 : Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
363 : // TODO(danzh): check Envoy per stream buffer limit.
364 : // Currently read out all the data.
365 0 : while (HasBytesToRead()) {
366 0 : iovec iov;
367 0 : int num_regions = GetReadableRegions(&iov, 1);
368 0 : ASSERT(num_regions > 0);
369 0 : size_t bytes_read = iov.iov_len;
370 0 : buffer->add(iov.iov_base, bytes_read);
371 0 : MarkConsumed(bytes_read);
372 0 : }
373 0 : ASSERT(buffer->length() == 0 || !end_stream_decoded_);
374 :
375 0 : bool fin_read_and_no_trailers = IsDoneReading();
376 : // If this call is triggered by an empty frame with FIN which is not from peer
377 : // but synthesized by stream itself upon receiving HEADERS with FIN or
378 : // TRAILERS, do not deliver end of stream here. Because either decodeHeaders
379 : // already delivered it or decodeTrailers will be called.
380 0 : bool skip_decoding = (buffer->length() == 0 && !fin_read_and_no_trailers) || end_stream_decoded_;
381 0 : if (!skip_decoding) {
382 0 : if (fin_read_and_no_trailers) {
383 0 : end_stream_decoded_ = true;
384 0 : }
385 0 : updateReceivedContentBytes(buffer->length(), fin_read_and_no_trailers);
386 0 : if (stream_error() != quic::QUIC_STREAM_NO_ERROR) {
387 : // A stream error has occurred, stop processing.
388 0 : return;
389 0 : }
390 0 : response_decoder_->decodeData(*buffer, fin_read_and_no_trailers);
391 0 : }
392 :
393 0 : if (!sequencer()->IsClosed() || read_side_closed()) {
394 0 : return;
395 0 : }
396 :
397 : // Trailers may arrived earlier and wait to be consumed after reading all the body. Consume it
398 : // here.
399 0 : maybeDecodeTrailers();
400 :
401 0 : OnFinRead();
402 0 : }
403 :
404 : void EnvoyQuicClientStream::OnTrailingHeadersComplete(bool fin, size_t frame_len,
405 0 : const quic::QuicHeaderList& header_list) {
406 0 : mutableBytesMeter()->addHeaderBytesReceived(frame_len);
407 0 : if (read_side_closed()) {
408 0 : return;
409 0 : }
410 0 : ENVOY_STREAM_LOG(debug, "Received trailers: {}.", *this, header_list.DebugString());
411 0 : quic::QuicSpdyStream::OnTrailingHeadersComplete(fin, frame_len, header_list);
412 0 : ASSERT(trailers_decompressed());
413 0 : if (session()->connection()->connected() && !rst_sent()) {
414 0 : maybeDecodeTrailers();
415 0 : }
416 0 : }
417 :
418 0 : void EnvoyQuicClientStream::maybeDecodeTrailers() {
419 0 : if (sequencer()->IsClosed() && !FinishedReadingTrailers()) {
420 : // Only decode trailers after finishing decoding body.
421 0 : end_stream_decoded_ = true;
422 0 : updateReceivedContentBytes(0, true);
423 0 : if (stream_error() != quic::QUIC_STREAM_NO_ERROR) {
424 : // A stream error has occurred, stop processing.
425 0 : return;
426 0 : }
427 0 : quic::QuicRstStreamErrorCode transform_rst = quic::QUIC_STREAM_NO_ERROR;
428 0 : auto client_session = static_cast<EnvoyQuicClientSession*>(session());
429 0 : auto trailers = http2HeaderBlockToEnvoyTrailers<Http::ResponseTrailerMapImpl>(
430 0 : received_trailers(), client_session->max_inbound_header_list_size(),
431 0 : filterManagerConnection()->maxIncomingHeadersCount(), *this, details_, transform_rst);
432 0 : if (trailers == nullptr) {
433 0 : onStreamError(close_connection_upon_invalid_header_, transform_rst);
434 0 : return;
435 0 : }
436 0 : response_decoder_->decodeTrailers(std::move(trailers));
437 0 : MarkTrailersConsumed();
438 0 : }
439 0 : }
440 :
441 0 : void EnvoyQuicClientStream::OnStreamReset(const quic::QuicRstStreamFrame& frame) {
442 0 : ENVOY_STREAM_LOG(debug, "received reset code={}", *this, frame.error_code);
443 0 : stats_.rx_reset_.inc();
444 0 : bool end_stream_decoded_and_encoded = read_side_closed() && local_end_stream_;
445 : // This closes read side in IETF Quic, but doesn't close write side.
446 0 : quic::QuicSpdyClientStream::OnStreamReset(frame);
447 0 : ASSERT(read_side_closed());
448 0 : if (write_side_closed() && !end_stream_decoded_and_encoded) {
449 0 : runResetCallbacks(quicRstErrorToEnvoyRemoteResetReason(frame.error_code));
450 0 : }
451 0 : }
452 :
453 0 : void EnvoyQuicClientStream::ResetWithError(quic::QuicResetStreamError error) {
454 0 : ENVOY_STREAM_LOG(debug, "sending reset code={}", *this, error.internal_code());
455 0 : stats_.tx_reset_.inc();
456 : // Upper layers expect calling resetStream() to immediately raise reset callbacks.
457 0 : runResetCallbacks(quicRstErrorToEnvoyLocalResetReason(error.internal_code()));
458 0 : if (session()->connection()->connected()) {
459 0 : quic::QuicSpdyClientStream::ResetWithError(error);
460 0 : }
461 0 : }
462 :
463 : void EnvoyQuicClientStream::OnConnectionClosed(quic::QuicErrorCode error,
464 0 : quic::ConnectionCloseSource source) {
465 0 : if (!end_stream_decoded_) {
466 0 : runResetCallbacks(
467 0 : source == quic::ConnectionCloseSource::FROM_SELF
468 0 : ? quicErrorCodeToEnvoyLocalResetReason(error, session()->OneRttKeysAvailable())
469 0 : : quicErrorCodeToEnvoyRemoteResetReason(error));
470 0 : }
471 0 : quic::QuicSpdyClientStream::OnConnectionClosed(error, source);
472 0 : }
473 :
474 0 : void EnvoyQuicClientStream::OnClose() {
475 0 : destroy();
476 0 : quic::QuicSpdyClientStream::OnClose();
477 0 : if (isDoingWatermarkAccounting()) {
478 : // This is called in the scope of a watermark buffer updater. Clear the
479 : // buffer accounting afterwards so that the updater doesn't override the
480 : // result.
481 0 : return;
482 0 : }
483 0 : clearWatermarkBuffer();
484 0 : }
485 :
486 0 : void EnvoyQuicClientStream::clearWatermarkBuffer() {
487 0 : if (BufferedDataBytes() > 0) {
488 : // If the stream is closed without sending out all buffered data, regard
489 : // them as sent now and adjust connection buffer book keeping.
490 0 : updateBytesBuffered(BufferedDataBytes(), 0);
491 0 : }
492 0 : }
493 :
494 0 : void EnvoyQuicClientStream::OnCanWrite() {
495 0 : SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
496 0 : quic::QuicSpdyClientStream::OnCanWrite();
497 0 : }
498 :
499 0 : uint32_t EnvoyQuicClientStream::streamId() { return id(); }
500 :
501 0 : Network::Connection* EnvoyQuicClientStream::connection() { return filterManagerConnection(); }
502 :
503 0 : QuicFilterManagerConnectionImpl* EnvoyQuicClientStream::filterManagerConnection() {
504 0 : return dynamic_cast<QuicFilterManagerConnectionImpl*>(session());
505 0 : }
506 :
507 : void EnvoyQuicClientStream::onStreamError(absl::optional<bool> should_close_connection,
508 0 : quic::QuicRstStreamErrorCode rst_code) {
509 0 : if (details_.empty()) {
510 0 : details_ = Http3ResponseCodeDetailValues::invalid_http_header;
511 0 : }
512 0 : bool close_connection_upon_invalid_header;
513 0 : if (should_close_connection != absl::nullopt) {
514 0 : close_connection_upon_invalid_header = should_close_connection.value();
515 0 : } else {
516 0 : close_connection_upon_invalid_header =
517 0 : !http3_options_.override_stream_error_on_invalid_http_message().value();
518 0 : }
519 0 : if (close_connection_upon_invalid_header) {
520 0 : stream_delegate()->OnStreamError(quic::QUIC_HTTP_FRAME_ERROR, "Invalid headers");
521 0 : } else {
522 0 : Reset(rst_code);
523 0 : }
524 0 : }
525 :
526 0 : bool EnvoyQuicClientStream::hasPendingData() { return BufferedDataBytes() > 0; }
527 :
528 : #ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
529 : // TODO(https://github.com/envoyproxy/envoy/issues/23564): Make the stream use Capsule Protocol
530 : // for CONNECT-UDP support when the headers contain "Capsule-Protocol: ?1" or "Upgrade:
531 : // connect-udp".
532 0 : void EnvoyQuicClientStream::useCapsuleProtocol() {
533 0 : http_datagram_handler_ = std::make_unique<HttpDatagramHandler>(*this);
534 0 : http_datagram_handler_->setStreamDecoder(response_decoder_);
535 0 : RegisterHttp3DatagramVisitor(http_datagram_handler_.get());
536 0 : }
537 : #endif
538 :
539 0 : void EnvoyQuicClientStream::OnInvalidHeaders() {
540 0 : onStreamError(absl::nullopt, quic::QUIC_BAD_APPLICATION_PAYLOAD);
541 0 : }
542 :
543 : } // namespace Quic
544 : } // namespace Envoy
|