Line data Source code
1 : #include "source/common/quic/envoy_quic_server_stream.h"
2 :
3 : #include <openssl/bio.h>
4 : #include <openssl/evp.h>
5 :
6 : #include <memory>
7 :
8 : #include "source/common/buffer/buffer_impl.h"
9 : #include "source/common/common/assert.h"
10 : #include "source/common/http/header_map_impl.h"
11 : #include "source/common/http/header_utility.h"
12 : #include "source/common/http/utility.h"
13 : #include "source/common/quic/envoy_quic_server_session.h"
14 : #include "source/common/quic/envoy_quic_utils.h"
15 : #include "source/common/quic/quic_stats_gatherer.h"
16 :
17 : #include "quiche/quic/core/http/quic_header_list.h"
18 : #include "quiche/quic/core/quic_session.h"
19 : #include "quiche/spdy/core/http2_header_block.h"
20 : #include "quiche_platform_impl/quiche_mem_slice_impl.h"
21 :
22 : namespace Envoy {
23 : namespace Quic {
24 :
25 : EnvoyQuicServerStream::EnvoyQuicServerStream(
26 : quic::QuicStreamId id, quic::QuicSpdySession* session, quic::StreamType type,
27 : Http::Http3::CodecStats& stats,
28 : const envoy::config::core::v3::Http3ProtocolOptions& http3_options,
29 : envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
30 : headers_with_underscores_action)
31 : : quic::QuicSpdyServerStreamBase(id, session, type),
32 : EnvoyQuicStream(
33 : // Flow control receive window should be larger than 8k to fully utilize congestion
34 : // control window before it reaches the high watermark.
35 : static_cast<uint32_t>(GetReceiveWindow().value()), *filterManagerConnection(),
36 0 : [this]() { runLowWatermarkCallbacks(); }, [this]() { runHighWatermarkCallbacks(); },
37 : stats, http3_options),
38 232 : headers_with_underscores_action_(headers_with_underscores_action) {
39 232 : ASSERT(static_cast<uint32_t>(GetReceiveWindow().value()) > 8 * 1024,
40 232 : "Send buffer limit should be larger than 8KB.");
41 :
42 232 : stats_gatherer_ = new QuicStatsGatherer(&filterManagerConnection()->dispatcher().timeSource());
43 232 : set_ack_listener(stats_gatherer_);
44 232 : }
45 :
46 0 : void EnvoyQuicServerStream::encode1xxHeaders(const Http::ResponseHeaderMap& headers) {
47 0 : ASSERT(Http::HeaderUtility::isSpecial1xx(headers));
48 0 : encodeHeaders(headers, false);
49 0 : }
50 :
51 0 : void EnvoyQuicServerStream::encodeHeaders(const Http::ResponseHeaderMap& headers, bool end_stream) {
52 0 : ENVOY_STREAM_LOG(debug, "encodeHeaders (end_stream={}) {}.", *this, end_stream, headers);
53 0 : if (write_side_closed()) {
54 0 : IS_ENVOY_BUG("encodeHeaders is called on write-closed stream.");
55 0 : return;
56 0 : }
57 :
58 : // In Envoy, both upgrade requests and extended CONNECT requests are
59 : // represented as their HTTP/1 forms, regardless of the HTTP version used.
60 : // Therefore, these need to be transformed into their HTTP/3 form, before
61 : // sending them.
62 0 : const Http::ResponseHeaderMap* header_map = &headers;
63 0 : std::unique_ptr<Http::ResponseHeaderMapImpl> modified_headers;
64 0 : #ifndef ENVOY_ENABLE_UHV
65 : // Extended CONNECT to H/1 upgrade transformation has moved to UHV
66 0 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.use_http3_header_normalisation") &&
67 0 : Http::Utility::isUpgrade(headers)) {
68 0 : modified_headers = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers);
69 0 : Http::Utility::transformUpgradeResponseFromH1toH3(*modified_headers);
70 0 : header_map = modified_headers.get();
71 0 : }
72 0 : #endif
73 : // This is counting not serialized bytes in the send buffer.
74 0 : local_end_stream_ = end_stream;
75 0 : SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
76 0 : {
77 0 : IncrementalBytesSentTracker tracker(*this, *mutableBytesMeter(), true);
78 0 : size_t bytes_sent =
79 0 : WriteHeaders(envoyHeadersToHttp2HeaderBlock(*header_map), end_stream, nullptr);
80 0 : stats_gatherer_->addBytesSent(bytes_sent, end_stream);
81 0 : ENVOY_BUG(bytes_sent != 0, "Failed to encode headers.");
82 0 : }
83 :
84 0 : if (local_end_stream_) {
85 0 : if (codec_callbacks_) {
86 0 : codec_callbacks_->onCodecEncodeComplete();
87 0 : }
88 0 : onLocalEndStream();
89 0 : }
90 0 : }
91 :
92 0 : void EnvoyQuicServerStream::encodeData(Buffer::Instance& data, bool end_stream) {
93 0 : ENVOY_STREAM_LOG(debug, "encodeData (end_stream={}) of {} bytes.", *this, end_stream,
94 0 : data.length());
95 0 : const bool has_data = data.length() > 0;
96 0 : if (!has_data && !end_stream) {
97 0 : return;
98 0 : }
99 0 : if (write_side_closed()) {
100 0 : IS_ENVOY_BUG("encodeData is called on write-closed stream.");
101 0 : return;
102 0 : }
103 0 : ASSERT(!local_end_stream_);
104 0 : local_end_stream_ = end_stream;
105 0 : SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
106 0 : #ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
107 0 : if (http_datagram_handler_) {
108 0 : IncrementalBytesSentTracker tracker(*this, *mutableBytesMeter(), false);
109 0 : if (!http_datagram_handler_->encodeCapsuleFragment(data.toString(), end_stream)) {
110 0 : Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
111 0 : return;
112 0 : }
113 0 : } else {
114 0 : #endif
115 0 : Buffer::RawSliceVector raw_slices = data.getRawSlices();
116 0 : absl::InlinedVector<quiche::QuicheMemSlice, 4> quic_slices;
117 0 : quic_slices.reserve(raw_slices.size());
118 0 : for (auto& slice : raw_slices) {
119 0 : ASSERT(slice.len_ != 0);
120 : // Move each slice into a stand-alone buffer.
121 : // TODO(danzh): investigate the cost of allocating one buffer per slice.
122 : // If it turns out to be expensive, add a new function to free data in the middle in buffer
123 : // interface and re-design QuicheMemSliceImpl.
124 0 : if (!Runtime::runtimeFeatureEnabled(
125 0 : "envoy.reloadable_features.quiche_use_mem_slice_releasor_api")) {
126 0 : quic_slices.emplace_back(quiche::QuicheMemSlice::InPlace(), data, slice.len_);
127 0 : } else {
128 0 : auto single_slice_buffer = std::make_unique<Buffer::OwnedImpl>();
129 0 : single_slice_buffer->move(data, slice.len_);
130 0 : quic_slices.emplace_back(
131 0 : reinterpret_cast<char*>(slice.mem_), slice.len_,
132 0 : [single_slice_buffer = std::move(single_slice_buffer)](const char*) mutable {
133 : // Free this memory explicitly when the callback is invoked.
134 0 : single_slice_buffer = nullptr;
135 0 : });
136 0 : }
137 0 : }
138 0 : quic::QuicConsumedData result{0, false};
139 0 : absl::Span<quiche::QuicheMemSlice> span(quic_slices);
140 0 : {
141 0 : IncrementalBytesSentTracker tracker(*this, *mutableBytesMeter(), false);
142 0 : result = WriteBodySlices(span, end_stream);
143 0 : stats_gatherer_->addBytesSent(result.bytes_consumed, end_stream);
144 0 : }
145 : // QUIC stream must take all.
146 0 : if (result.bytes_consumed == 0 && has_data) {
147 0 : IS_ENVOY_BUG(fmt::format("Send buffer didn't take all the data. Stream is write {} with {} "
148 0 : "bytes in send buffer. Current write was rejected.",
149 0 : write_side_closed() ? "closed" : "open", BufferedDataBytes()));
150 0 : Reset(quic::QUIC_BAD_APPLICATION_PAYLOAD);
151 0 : return;
152 0 : }
153 0 : #ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
154 0 : }
155 0 : #endif
156 0 : if (local_end_stream_) {
157 0 : if (codec_callbacks_) {
158 0 : codec_callbacks_->onCodecEncodeComplete();
159 0 : }
160 0 : onLocalEndStream();
161 0 : }
162 0 : }
163 :
164 0 : void EnvoyQuicServerStream::encodeTrailers(const Http::ResponseTrailerMap& trailers) {
165 0 : ENVOY_STREAM_LOG(debug, "encodeTrailers: {}.", *this, trailers);
166 0 : if (write_side_closed()) {
167 0 : IS_ENVOY_BUG("encodeTrailers is called on write-closed stream.");
168 0 : return;
169 0 : }
170 0 : ASSERT(!local_end_stream_);
171 0 : local_end_stream_ = true;
172 :
173 0 : SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
174 :
175 0 : {
176 0 : IncrementalBytesSentTracker tracker(*this, *mutableBytesMeter(), true);
177 0 : size_t bytes_sent = WriteTrailers(envoyHeadersToHttp2HeaderBlock(trailers), nullptr);
178 0 : ENVOY_BUG(bytes_sent != 0, "Failed to encode trailers.");
179 0 : stats_gatherer_->addBytesSent(bytes_sent, true);
180 0 : }
181 0 : if (codec_callbacks_) {
182 0 : codec_callbacks_->onCodecEncodeComplete();
183 0 : }
184 0 : onLocalEndStream();
185 0 : }
186 :
187 0 : void EnvoyQuicServerStream::encodeMetadata(const Http::MetadataMapVector& /*metadata_map_vector*/) {
188 : // Metadata Frame is not supported in QUIC.
189 0 : ENVOY_STREAM_LOG(debug, "METADATA is not supported in Http3.", *this);
190 0 : stats_.metadata_not_supported_error_.inc();
191 0 : }
192 :
193 0 : void EnvoyQuicServerStream::resetStream(Http::StreamResetReason reason) {
194 0 : if (buffer_memory_account_) {
195 0 : buffer_memory_account_->clearDownstream();
196 0 : }
197 :
198 0 : #ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
199 0 : if (http_datagram_handler_) {
200 0 : UnregisterHttp3DatagramVisitor();
201 0 : }
202 0 : #endif
203 :
204 0 : if (local_end_stream_ && !reading_stopped()) {
205 : // This is after 200 early response. Reset with QUIC_STREAM_NO_ERROR instead
206 : // of propagating original reset reason. In QUICHE if a stream stops reading
207 : // before FIN or RESET received, it resets the steam with QUIC_STREAM_NO_ERROR.
208 0 : StopReading();
209 0 : runResetCallbacks(Http::StreamResetReason::LocalReset);
210 0 : } else {
211 0 : Reset(envoyResetReasonToQuicRstError(reason));
212 0 : }
213 0 : }
214 :
215 0 : void EnvoyQuicServerStream::switchStreamBlockState() {
216 : // From when the callback got scheduled till now, readDisable() might have blocked and unblocked
217 : // the stream multiple times, but those actions haven't taken any effect yet, and only the last
218 : // state of read_disable_counter_ determines whether to unblock or block the quic stream.
219 : // Unlike Envoy readDisable() the quic stream gets blocked/unblocked based on the most recent
220 : // call. So a stream will be blocked upon SetBlockedUntilFlush() no matter how many times
221 : // SetUnblocked() was called before, and vice versa.
222 0 : if (read_disable_counter_ > 0) {
223 0 : sequencer()->SetBlockedUntilFlush();
224 0 : } else {
225 0 : sequencer()->SetUnblocked();
226 0 : }
227 0 : }
228 :
229 : void EnvoyQuicServerStream::OnInitialHeadersComplete(bool fin, size_t frame_len,
230 167 : const quic::QuicHeaderList& header_list) {
231 167 : mutableBytesMeter()->addHeaderBytesReceived(frame_len);
232 : // TODO(danzh) Fix in QUICHE. If the stream has been reset in the call stack,
233 : // OnInitialHeadersComplete() shouldn't be called.
234 167 : if (read_side_closed()) {
235 0 : return;
236 0 : }
237 167 : quic::QuicSpdyServerStreamBase::OnInitialHeadersComplete(fin, frame_len, header_list);
238 167 : if (read_side_closed()) {
239 165 : return;
240 165 : }
241 :
242 2 : if (!headers_decompressed() || header_list.empty()) {
243 0 : onStreamError(absl::nullopt);
244 0 : return;
245 0 : }
246 :
247 2 : ENVOY_STREAM_LOG(debug, "Received headers: {}.", *this, header_list.DebugString());
248 2 : if (fin) {
249 0 : end_stream_decoded_ = true;
250 0 : }
251 2 : saw_regular_headers_ = false;
252 2 : quic::QuicRstStreamErrorCode rst = quic::QUIC_STREAM_NO_ERROR;
253 2 : auto server_session = static_cast<EnvoyQuicServerSession*>(session());
254 2 : std::unique_ptr<Http::RequestHeaderMapImpl> headers =
255 2 : quicHeadersToEnvoyHeaders<Http::RequestHeaderMapImpl>(
256 2 : header_list, *this, server_session->max_inbound_header_list_size(),
257 2 : filterManagerConnection()->maxIncomingHeadersCount(), details_, rst);
258 2 : if (headers == nullptr) {
259 2 : onStreamError(close_connection_upon_invalid_header_, rst);
260 2 : return;
261 2 : }
262 :
263 0 : #ifndef ENVOY_ENABLE_UHV
264 : // These checks are now part of UHV
265 0 : if (Http::HeaderUtility::checkRequiredRequestHeaders(*headers) != Http::okStatus() ||
266 0 : Http::HeaderUtility::checkValidRequestHeaders(*headers) != Http::okStatus() ||
267 0 : (headers->Protocol() && !spdy_session()->allow_extended_connect())) {
268 0 : details_ = Http3ResponseCodeDetailValues::invalid_http_header;
269 0 : onStreamError(absl::nullopt);
270 0 : return;
271 0 : }
272 :
273 : // Extended CONNECT to H/1 upgrade transformation has moved to UHV
274 0 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.use_http3_header_normalisation") &&
275 0 : Http::Utility::isH3UpgradeRequest(*headers)) {
276 : // Transform Request from H3 to H1
277 0 : Http::Utility::transformUpgradeRequestFromH3toH1(*headers);
278 0 : }
279 : #else
280 : if (Http::HeaderUtility::checkRequiredRequestHeaders(*headers) != Http::okStatus() ||
281 : (headers->Protocol() && !spdy_session()->allow_extended_connect())) {
282 : details_ = Http3ResponseCodeDetailValues::invalid_http_header;
283 : onStreamError(absl::nullopt);
284 : return;
285 : }
286 : #endif
287 :
288 0 : #ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
289 0 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.enable_connect_udp_support") &&
290 0 : (Http::HeaderUtility::isCapsuleProtocol(*headers) ||
291 0 : Http::HeaderUtility::isConnectUdpRequest(*headers))) {
292 0 : useCapsuleProtocol();
293 : // HTTP/3 Datagrams sent over CONNECT-UDP are already congestion controlled, so make it bypass
294 : // the default Datagram queue.
295 0 : if (Http::HeaderUtility::isConnectUdpRequest(*headers)) {
296 0 : session()->SetForceFlushForDefaultQueue(true);
297 0 : }
298 0 : }
299 0 : #endif
300 :
301 0 : request_decoder_->decodeHeaders(std::move(headers), /*end_stream=*/fin);
302 0 : ConsumeHeaderList();
303 0 : }
304 :
305 212 : void EnvoyQuicServerStream::OnStreamFrame(const quic::QuicStreamFrame& frame) {
306 212 : uint64_t highest_byte_received = frame.data_length + frame.offset;
307 212 : if (highest_byte_received > bytesMeter()->wireBytesReceived()) {
308 208 : mutableBytesMeter()->addWireBytesReceived(highest_byte_received -
309 208 : bytesMeter()->wireBytesReceived());
310 208 : }
311 212 : quic::QuicSpdyServerStreamBase::OnStreamFrame(frame);
312 212 : }
313 :
314 0 : void EnvoyQuicServerStream::OnBodyAvailable() {
315 0 : ASSERT(FinishedReadingHeaders());
316 0 : if (read_side_closed()) {
317 0 : return;
318 0 : }
319 :
320 0 : Buffer::InstancePtr buffer = std::make_unique<Buffer::OwnedImpl>();
321 : // TODO(danzh): check Envoy per stream buffer limit.
322 : // Currently read out all the data.
323 0 : while (HasBytesToRead()) {
324 0 : iovec iov;
325 0 : int num_regions = GetReadableRegions(&iov, 1);
326 0 : ASSERT(num_regions > 0);
327 0 : size_t bytes_read = iov.iov_len;
328 0 : buffer->add(iov.iov_base, bytes_read);
329 0 : MarkConsumed(bytes_read);
330 0 : }
331 :
332 0 : bool fin_read_and_no_trailers = IsDoneReading();
333 0 : ENVOY_STREAM_LOG(debug, "Received {} bytes of data {} FIN.", *this, buffer->length(),
334 0 : fin_read_and_no_trailers ? "with" : "without");
335 : // If this call is triggered by an empty frame with FIN which is not from peer
336 : // but synthesized by stream itself upon receiving HEADERS with FIN or
337 : // TRAILERS, do not deliver end of stream here. Because either decodeHeaders
338 : // already delivered it or decodeTrailers will be called.
339 0 : bool skip_decoding = (buffer->length() == 0 && !fin_read_and_no_trailers) || end_stream_decoded_;
340 0 : if (!skip_decoding) {
341 0 : if (fin_read_and_no_trailers) {
342 0 : end_stream_decoded_ = true;
343 0 : }
344 0 : updateReceivedContentBytes(buffer->length(), fin_read_and_no_trailers);
345 0 : if (stream_error() != quic::QUIC_STREAM_NO_ERROR) {
346 : // A stream error has occurred, stop processing.
347 0 : return;
348 0 : }
349 0 : request_decoder_->decodeData(*buffer, fin_read_and_no_trailers);
350 0 : }
351 :
352 0 : if (!sequencer()->IsClosed() || read_side_closed()) {
353 0 : return;
354 0 : }
355 :
356 : // Trailers may arrived earlier and wait to be consumed after reading all the body. Consume it
357 : // here.
358 0 : maybeDecodeTrailers();
359 :
360 0 : OnFinRead();
361 0 : }
362 :
363 : void EnvoyQuicServerStream::OnTrailingHeadersComplete(bool fin, size_t frame_len,
364 0 : const quic::QuicHeaderList& header_list) {
365 0 : mutableBytesMeter()->addHeaderBytesReceived(frame_len);
366 0 : ENVOY_STREAM_LOG(debug, "Received trailers: {}.", *this, received_trailers().DebugString());
367 0 : quic::QuicSpdyServerStreamBase::OnTrailingHeadersComplete(fin, frame_len, header_list);
368 0 : if (read_side_closed()) {
369 0 : return;
370 0 : }
371 0 : ASSERT(trailers_decompressed());
372 0 : if (session()->connection()->connected() && !rst_sent()) {
373 0 : maybeDecodeTrailers();
374 0 : }
375 0 : }
376 :
377 0 : void EnvoyQuicServerStream::OnHeadersTooLarge() {
378 0 : ENVOY_STREAM_LOG(debug, "Headers too large.", *this);
379 0 : details_ = Http3ResponseCodeDetailValues::headers_too_large;
380 0 : quic::QuicSpdyServerStreamBase::OnHeadersTooLarge();
381 0 : }
382 :
383 0 : void EnvoyQuicServerStream::maybeDecodeTrailers() {
384 0 : if (sequencer()->IsClosed() && !FinishedReadingTrailers()) {
385 : // Only decode trailers after finishing decoding body.
386 0 : end_stream_decoded_ = true;
387 0 : updateReceivedContentBytes(0, true);
388 0 : if (stream_error() != quic::QUIC_STREAM_NO_ERROR) {
389 : // A stream error has occurred, stop processing.
390 0 : return;
391 0 : }
392 0 : quic::QuicRstStreamErrorCode rst = quic::QUIC_STREAM_NO_ERROR;
393 0 : auto server_session = static_cast<EnvoyQuicServerSession*>(session());
394 0 : auto trailers = http2HeaderBlockToEnvoyTrailers<Http::RequestTrailerMapImpl>(
395 0 : received_trailers(), server_session->max_inbound_header_list_size(),
396 0 : filterManagerConnection()->maxIncomingHeadersCount(), *this, details_, rst);
397 0 : if (trailers == nullptr) {
398 0 : onStreamError(close_connection_upon_invalid_header_, rst);
399 0 : return;
400 0 : }
401 0 : request_decoder_->decodeTrailers(std::move(trailers));
402 0 : MarkTrailersConsumed();
403 0 : }
404 0 : }
405 :
406 8 : bool EnvoyQuicServerStream::OnStopSending(quic::QuicResetStreamError error) {
407 : // Only called in IETF Quic to close write side.
408 8 : ENVOY_STREAM_LOG(debug, "received STOP_SENDING with reset code={}", *this, error.internal_code());
409 8 : stats_.rx_reset_.inc();
410 8 : bool end_stream_encoded = local_end_stream_;
411 : // This call will close write.
412 8 : if (!quic::QuicSpdyServerStreamBase::OnStopSending(error)) {
413 1 : return false;
414 1 : }
415 7 : ASSERT(write_side_closed());
416 : // Also stop reading because the peer already didn't care about the response any more.
417 7 : if (!reading_stopped()) {
418 7 : StopReading();
419 7 : }
420 7 : if (!end_stream_encoded) {
421 : // If both directions are closed but end stream hasn't been encoded yet, notify reset callbacks.
422 : // Treat this as a remote reset, since the stream will be closed in both directions.
423 7 : runResetCallbacks(quicRstErrorToEnvoyRemoteResetReason(error.internal_code()));
424 7 : }
425 7 : return true;
426 8 : }
427 :
428 7 : void EnvoyQuicServerStream::OnStreamReset(const quic::QuicRstStreamFrame& frame) {
429 7 : ENVOY_STREAM_LOG(debug, "received RESET_STREAM with reset code={}", *this, frame.error_code);
430 7 : stats_.rx_reset_.inc();
431 7 : bool end_stream_decoded_and_encoded = read_side_closed() && local_end_stream_;
432 : // This closes read side in IETF Quic, but doesn't close write side.
433 7 : quic::QuicSpdyServerStreamBase::OnStreamReset(frame);
434 7 : ASSERT(read_side_closed());
435 7 : if (write_side_closed() && !end_stream_decoded_and_encoded) {
436 : // If both directions are closed but upstream hasn't received or sent end stream, run reset
437 : // stream callback.
438 1 : runResetCallbacks(quicRstErrorToEnvoyRemoteResetReason(frame.error_code));
439 1 : }
440 7 : }
441 :
442 2 : void EnvoyQuicServerStream::ResetWithError(quic::QuicResetStreamError error) {
443 2 : ENVOY_STREAM_LOG(debug, "sending reset code={}", *this, error.internal_code());
444 2 : stats_.tx_reset_.inc();
445 2 : if (!local_end_stream_) {
446 : // Upper layers expect calling resetStream() to immediately raise reset callbacks.
447 2 : runResetCallbacks(quicRstErrorToEnvoyLocalResetReason(error.internal_code()));
448 2 : }
449 2 : quic::QuicSpdyServerStreamBase::ResetWithError(error);
450 2 : }
451 :
452 : void EnvoyQuicServerStream::OnConnectionClosed(quic::QuicErrorCode error,
453 230 : quic::ConnectionCloseSource source) {
454 : // Run reset callback before closing the stream so that the watermark change will not trigger
455 : // callbacks.
456 230 : if (!local_end_stream_) {
457 230 : runResetCallbacks(
458 230 : source == quic::ConnectionCloseSource::FROM_SELF
459 230 : ? quicErrorCodeToEnvoyLocalResetReason(error, session()->OneRttKeysAvailable())
460 230 : : quicErrorCodeToEnvoyRemoteResetReason(error));
461 230 : }
462 230 : quic::QuicSpdyServerStreamBase::OnConnectionClosed(error, source);
463 230 : }
464 :
465 462 : void EnvoyQuicServerStream::CloseWriteSide() {
466 : // Clear the downstream since the stream should not write additional data
467 : // after this is called, e.g. cannot reset the stream.
468 : // Only the downstream stream should clear the downstream of the
469 : // memory account.
470 : //
471 : // There are cases where a corresponding upstream stream dtor might
472 : // be called, but the downstream stream isn't going to terminate soon
473 : // such as StreamDecoderFilterCallbacks::recreateStream().
474 462 : if (buffer_memory_account_) {
475 0 : buffer_memory_account_->clearDownstream();
476 0 : }
477 462 : quic::QuicSpdyServerStreamBase::CloseWriteSide();
478 462 : }
479 :
480 232 : void EnvoyQuicServerStream::OnClose() {
481 232 : destroy();
482 232 : quic::QuicSpdyServerStreamBase::OnClose();
483 232 : if (isDoingWatermarkAccounting()) {
484 0 : return;
485 0 : }
486 232 : clearWatermarkBuffer();
487 232 : if (!stats_gatherer_->loggingDone()) {
488 232 : stats_gatherer_->maybeDoDeferredLog(/* record_ack_timing */ false);
489 232 : }
490 232 : stats_gatherer_ = nullptr;
491 232 : }
492 :
493 232 : void EnvoyQuicServerStream::clearWatermarkBuffer() {
494 232 : if (BufferedDataBytes() > 0) {
495 : // If the stream is closed without sending out all buffered data, regard
496 : // them as sent now and adjust connection buffer book keeping.
497 0 : updateBytesBuffered(BufferedDataBytes(), 0);
498 0 : }
499 232 : }
500 :
501 0 : void EnvoyQuicServerStream::OnCanWrite() {
502 0 : SendBufferMonitor::ScopedWatermarkBufferUpdater updater(this, this);
503 0 : quic::QuicSpdyServerStreamBase::OnCanWrite();
504 0 : }
505 :
506 0 : uint32_t EnvoyQuicServerStream::streamId() { return id(); }
507 :
508 0 : Network::Connection* EnvoyQuicServerStream::connection() { return filterManagerConnection(); }
509 :
510 466 : QuicFilterManagerConnectionImpl* EnvoyQuicServerStream::filterManagerConnection() {
511 466 : return dynamic_cast<QuicFilterManagerConnectionImpl*>(session());
512 466 : }
513 :
514 : Http::HeaderUtility::HeaderValidationResult
515 : EnvoyQuicServerStream::validateHeader(absl::string_view header_name,
516 20 : absl::string_view header_value) {
517 20 : Http::HeaderUtility::HeaderValidationResult result =
518 20 : EnvoyQuicStream::validateHeader(header_name, header_value);
519 20 : if (result != Http::HeaderUtility::HeaderValidationResult::ACCEPT) {
520 2 : return result;
521 2 : }
522 : // Do request specific checks.
523 18 : result = Http::HeaderUtility::checkHeaderNameForUnderscores(
524 18 : header_name, headers_with_underscores_action_, stats_);
525 18 : if (result != Http::HeaderUtility::HeaderValidationResult::ACCEPT) {
526 0 : details_ = Http3ResponseCodeDetailValues::invalid_underscore;
527 0 : return result;
528 0 : }
529 18 : ASSERT(!header_name.empty());
530 18 : if (!Http::HeaderUtility::isPseudoHeader(header_name)) {
531 12 : return result;
532 12 : }
533 6 : static const absl::flat_hash_set<std::string> known_pseudo_headers{":authority", ":protocol",
534 6 : ":path", ":method", ":scheme"};
535 6 : if (header_name == ":path") {
536 2 : if (saw_path_) {
537 : // According to RFC9114, :path header should only have one value.
538 0 : return Http::HeaderUtility::HeaderValidationResult::REJECT;
539 0 : }
540 2 : saw_path_ = true;
541 4 : } else if (!known_pseudo_headers.contains(header_name)) {
542 0 : return Http::HeaderUtility::HeaderValidationResult::REJECT;
543 0 : }
544 6 : return result;
545 6 : }
546 :
547 : void EnvoyQuicServerStream::onStreamError(absl::optional<bool> should_close_connection,
548 167 : quic::QuicRstStreamErrorCode rst) {
549 167 : if (details_.empty()) {
550 167 : details_ = Http3ResponseCodeDetailValues::invalid_http_header;
551 167 : }
552 :
553 167 : bool close_connection_upon_invalid_header;
554 167 : if (should_close_connection != absl::nullopt) {
555 2 : close_connection_upon_invalid_header = should_close_connection.value();
556 165 : } else {
557 165 : close_connection_upon_invalid_header =
558 165 : !http3_options_.override_stream_error_on_invalid_http_message().value();
559 165 : }
560 167 : if (close_connection_upon_invalid_header) {
561 165 : stream_delegate()->OnStreamError(quic::QUIC_HTTP_FRAME_ERROR, std::string(details_));
562 165 : } else {
563 2 : Reset(rst);
564 2 : }
565 167 : }
566 :
567 0 : void EnvoyQuicServerStream::onPendingFlushTimer() {
568 0 : ENVOY_STREAM_LOG(debug, "pending stream flush timeout", *this);
569 0 : Http::MultiplexedStreamImplBase::onPendingFlushTimer();
570 0 : stats_.tx_flush_timeout_.inc();
571 0 : ASSERT(local_end_stream_ && !fin_sent());
572 : // Reset the stream locally. But no reset callbacks will be run because higher layers think the
573 : // stream is already finished.
574 0 : Reset(quic::QUIC_STREAM_CANCELLED);
575 0 : }
576 :
577 0 : bool EnvoyQuicServerStream::hasPendingData() {
578 : // Quic stream sends headers and trailers on the same stream, and buffers them in the same sending
579 : // buffer if needed. So checking this buffer is sufficient.
580 0 : return (!write_side_closed()) && BufferedDataBytes() > 0;
581 0 : }
582 :
583 : #ifdef ENVOY_ENABLE_HTTP_DATAGRAMS
584 0 : void EnvoyQuicServerStream::useCapsuleProtocol() {
585 0 : http_datagram_handler_ = std::make_unique<HttpDatagramHandler>(*this);
586 0 : ASSERT(request_decoder_ != nullptr);
587 0 : http_datagram_handler_->setStreamDecoder(request_decoder_);
588 0 : RegisterHttp3DatagramVisitor(http_datagram_handler_.get());
589 0 : }
590 : #endif
591 :
592 165 : void EnvoyQuicServerStream::OnInvalidHeaders() { onStreamError(absl::nullopt); }
593 :
594 : } // namespace Quic
595 : } // namespace Envoy
|