Line data Source code
1 : #include "source/common/http/conn_manager_impl.h"
2 :
3 : #include <chrono>
4 : #include <cstdint>
5 : #include <functional>
6 : #include <iterator>
7 : #include <list>
8 : #include <memory>
9 : #include <string>
10 : #include <vector>
11 :
12 : #include "envoy/buffer/buffer.h"
13 : #include "envoy/common/time.h"
14 : #include "envoy/event/dispatcher.h"
15 : #include "envoy/event/scaled_range_timer_manager.h"
16 : #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
17 : #include "envoy/http/header_map.h"
18 : #include "envoy/http/header_validator_errors.h"
19 : #include "envoy/network/drain_decision.h"
20 : #include "envoy/router/router.h"
21 : #include "envoy/ssl/connection.h"
22 : #include "envoy/stats/scope.h"
23 : #include "envoy/stream_info/filter_state.h"
24 : #include "envoy/stream_info/stream_info.h"
25 : #include "envoy/tracing/tracer.h"
26 : #include "envoy/type/v3/percent.pb.h"
27 :
28 : #include "source/common/buffer/buffer_impl.h"
29 : #include "source/common/common/assert.h"
30 : #include "source/common/common/empty_string.h"
31 : #include "source/common/common/enum_to_int.h"
32 : #include "source/common/common/fmt.h"
33 : #include "source/common/common/perf_tracing.h"
34 : #include "source/common/common/scope_tracker.h"
35 : #include "source/common/common/utility.h"
36 : #include "source/common/http/codes.h"
37 : #include "source/common/http/conn_manager_utility.h"
38 : #include "source/common/http/exception.h"
39 : #include "source/common/http/header_map_impl.h"
40 : #include "source/common/http/header_utility.h"
41 : #include "source/common/http/headers.h"
42 : #include "source/common/http/http1/codec_impl.h"
43 : #include "source/common/http/http2/codec_impl.h"
44 : #include "source/common/http/path_utility.h"
45 : #include "source/common/http/status.h"
46 : #include "source/common/http/utility.h"
47 : #include "source/common/network/utility.h"
48 : #include "source/common/router/config_impl.h"
49 : #include "source/common/runtime/runtime_features.h"
50 : #include "source/common/stats/timespan_impl.h"
51 : #include "source/common/stream_info/utility.h"
52 :
53 : #include "absl/strings/escaping.h"
54 : #include "absl/strings/match.h"
55 : #include "absl/strings/str_cat.h"
56 :
57 : namespace Envoy {
58 : namespace Http {
59 :
60 : const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey =
61 : "overload.premature_reset_total_stream_count"; // Runtime key read by maybeDrainDueToPrematureResets(), which falls back to 500.
62 : const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey =
63 : "overload.premature_reset_min_stream_lifetime_seconds"; // Runtime key read by isPrematureRstStream(), which falls back to 1 and compares it against the stream lifetime in whole seconds.
64 : // Runtime key for maximum number of requests that can be processed from a single connection per
65 : // I/O cycle. Requests over this limit are deferred until the next I/O cycle.
66 : const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle =
67 : "http.max_requests_per_io_cycle"; // Read once in the constructor with a fallback of UINT32_MAX; at that value no deferred-processing callback is created.
68 : // Don't attempt to intelligently delay close: https://github.com/envoyproxy/envoy/issues/30010
69 : const absl::string_view ConnectionManagerImpl::OptionallyDelayClose =
70 : "http1.optionally_delay_close"; // Runtime boolean read by checkForDeferredClose(), which falls back to true.
71 :
72 226 : bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) { // Returns whether the request should be treated as a CONNECT for `protocol`; a null `headers` yields false.
73 226 : if (!headers) {
74 81 : return false;
75 81 : }
76 145 : if (protocol <= Protocol::Http11) { // Protocols ordered at or before Http11 only consult the CONNECT check.
77 143 : return HeaderUtility::isConnect(*headers);
78 143 : }
79 : // All HTTP/2 style upgrades were originally connect requests, so for later protocols an upgrade request also counts.
80 2 : return HeaderUtility::isConnect(*headers) || Utility::isUpgrade(*headers);
81 145 : }
82 :
83 : ConnectionManagerStats ConnectionManagerImpl::generateStats(const std::string& prefix,
84 274 : Stats::Scope& scope) { // Builds a ConnectionManagerStats whose counters, gauges and histograms are expanded by ALL_HTTP_CONN_MAN_STATS using `scope` and `prefix`.
85 274 : return ConnectionManagerStats(
86 274 : {ALL_HTTP_CONN_MAN_STATS(POOL_COUNTER_PREFIX(scope, prefix), POOL_GAUGE_PREFIX(scope, prefix),
87 274 : POOL_HISTOGRAM_PREFIX(scope, prefix))},
88 274 : prefix, scope); // `prefix` and `scope` are additionally passed to the ConnectionManagerStats constructor itself.
89 274 : }
90 :
91 : ConnectionManagerTracingStats ConnectionManagerImpl::generateTracingStats(const std::string& prefix,
92 274 : Stats::Scope& scope) { // Builds the tracing counters in `scope`; their names use `prefix` with "tracing." appended.
93 274 : return {CONN_MAN_TRACING_STATS(POOL_COUNTER_PREFIX(scope, prefix + "tracing."))};
94 274 : }
95 :
96 : ConnectionManagerListenerStats
97 238 : ConnectionManagerImpl::generateListenerStats(const std::string& prefix, Stats::Scope& scope) { // Builds the listener counters expanded by CONN_MAN_LISTENER_STATS in `scope` under `prefix`.
98 238 : return {CONN_MAN_LISTENER_STATS(POOL_COUNTER_PREFIX(scope, prefix))};
99 238 : }
100 :
101 : ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config, // Captures collaborators by reference and samples overload/runtime settings; connection-specific setup happens later in initializeReadFilterCallbacks().
102 : const Network::DrainDecision& drain_close,
103 : Random::RandomGenerator& random_generator,
104 : Http::Context& http_context, Runtime::Loader& runtime,
105 : const LocalInfo::LocalInfo& local_info,
106 : Upstream::ClusterManager& cluster_manager,
107 : Server::OverloadManager& overload_manager,
108 : TimeSource& time_source)
109 : : config_(config), stats_(config_.stats()),
110 : conn_length_(new Stats::HistogramCompletableTimespanImpl( // Timespan over downstream_cx_length_ms_; complete() is called on it in the destructor.
111 : stats_.named_.downstream_cx_length_ms_, time_source)),
112 : drain_close_(drain_close), user_agent_(http_context.userAgentContext()),
113 : random_generator_(random_generator), runtime_(runtime), local_info_(local_info),
114 : cluster_manager_(cluster_manager), listener_stats_(config_.listenerStats()),
115 : overload_manager_(overload_manager),
116 : overload_state_(overload_manager.getThreadLocalOverloadState()),
117 : accept_new_http_stream_(overload_manager.getLoadShedPoint( // May be nullptr when the load shed point is not found; that case is logged in the body below.
118 : "envoy.load_shed_points.http_connection_manager_decode_headers")),
119 : overload_stop_accepting_requests_ref_(
120 : overload_state_.getState(Server::OverloadActionNames::get().StopAcceptingRequests)),
121 : overload_disable_keepalive_ref_(
122 : overload_state_.getState(Server::OverloadActionNames::get().DisableHttpKeepAlive)),
123 : time_source_(time_source), proxy_name_(StreamInfo::ProxyStatusUtils::makeProxyName(
124 : /*node_id=*/local_info_.node().id(),
125 : /*server_name=*/config_.serverName(),
126 : /*proxy_status_config=*/config_.proxyStatusConfig())),
127 : max_requests_during_dispatch_( // Read from the runtime snapshot once, at construction; UINT32_MAX is the fallback.
128 : runtime_.snapshot().getInteger(ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)),
129 : refresh_rtt_after_request_( // Runtime feature flag, also evaluated once at construction.
130 922 : Runtime::runtimeFeatureEnabled("envoy.reloadable_features.refresh_rtt_after_request")) {
131 922 : ENVOY_LOG_ONCE_IF( // Trace-level, log-once diagnostic for a missing load shed point.
132 922 : trace, accept_new_http_stream_ == nullptr,
133 922 : "LoadShedPoint envoy.load_shed_points.http_connection_manager_decode_headers is not "
134 922 : "found. Is it configured?");
135 922 : }
136 :
137 0 : const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() { // Returns a shared response header map whose only entry is the status header set to Code::Continue.
138 0 : static const auto headers = createHeaderMap<ResponseHeaderMapImpl>( // Function-local static: constructed on first call and reused afterwards.
139 0 : {{Http::Headers::get().Status, std::to_string(enumToInt(Code::Continue))}});
140 0 : return *headers;
141 0 : }
142 :
143 922 : void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) { // Stores the callbacks and dispatcher, then sets up per-connection stats, event callbacks, filter state and timers.
144 922 : read_callbacks_ = &callbacks;
145 922 : dispatcher_ = &callbacks.connection().dispatcher();
146 922 : if (max_requests_during_dispatch_ != UINT32_MAX) { // Only create the deferred-processing callback when the constructor read a non-fallback per-I/O-cycle limit.
147 0 : deferred_request_processing_callback_ =
148 0 : dispatcher_->createSchedulableCallback([this]() -> void { onDeferredRequestProcessing(); });
149 0 : }
150 : // Count the connection in the total/active stats; connections whose ssl() is non-null also count in the ssl_* stats.
151 922 : stats_.named_.downstream_cx_total_.inc();
152 922 : stats_.named_.downstream_cx_active_.inc();
153 922 : if (read_callbacks_->connection().ssl()) {
154 36 : stats_.named_.downstream_cx_ssl_total_.inc();
155 36 : stats_.named_.downstream_cx_ssl_active_.inc();
156 36 : }
157 : // Register this object for connection callbacks (see onEvent()).
158 922 : read_callbacks_->connection().addConnectionCallbacks(*this);
159 : // When enabled in config and not already present, store the connection's remote/local addresses as read-only, connection-lifespan ProxyProtocolFilterState.
160 922 : if (config_.addProxyProtocolConnectionState() &&
161 922 : !read_callbacks_->connection()
162 922 : .streamInfo()
163 922 : .filterState()
164 922 : ->hasData<Network::ProxyProtocolFilterState>(Network::ProxyProtocolFilterState::key())) {
165 919 : read_callbacks_->connection().streamInfo().filterState()->setData(
166 919 : Network::ProxyProtocolFilterState::key(),
167 919 : std::make_unique<Network::ProxyProtocolFilterState>(Network::ProxyProtocolData{
168 919 : read_callbacks_->connection().connectionInfoProvider().remoteAddress(),
169 919 : read_callbacks_->connection().connectionInfoProvider().localAddress()}),
170 919 : StreamInfo::FilterState::StateType::ReadOnly,
171 919 : StreamInfo::FilterState::LifeSpan::Connection);
172 919 : }
173 : // Idle timer (scaled): armed here, disabled by newStream(), and re-armed by doDeferredStreamDestroy() once streams_ is empty.
174 922 : if (config_.idleTimeout()) {
175 788 : connection_idle_timer_ =
176 788 : dispatcher_->createScaledTimer(Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout,
177 788 : [this]() -> void { onIdleTimeout(); });
178 788 : connection_idle_timer_->enableTimer(config_.idleTimeout().value());
179 788 : }
180 : // Optional max-connection-duration timer; note it uses createTimer() rather than createScaledTimer().
181 922 : if (config_.maxConnectionDuration()) {
182 0 : connection_duration_timer_ =
183 0 : dispatcher_->createTimer([this]() -> void { onConnectionDurationTimeout(); });
184 0 : connection_duration_timer_->enableTimer(config_.maxConnectionDuration().value());
185 0 : }
186 :
187 922 : read_callbacks_->connection().setDelayedCloseTimeout(config_.delayedCloseTimeout());
188 : // Hand the connection the rx/tx byte stats and the delayed-close-timeout counter; one slot is deliberately passed as nullptr.
189 922 : read_callbacks_->connection().setConnectionStats(
190 922 : {stats_.named_.downstream_cx_rx_bytes_total_, stats_.named_.downstream_cx_rx_bytes_buffered_,
191 922 : stats_.named_.downstream_cx_tx_bytes_total_, stats_.named_.downstream_cx_tx_bytes_buffered_,
192 922 : nullptr, &stats_.named_.downstream_cx_delayed_close_timeout_});
193 922 : }
194 :
195 922 : ConnectionManagerImpl::~ConnectionManagerImpl() { // Decrements the "active" stats incremented during setup and completes the connection-length timespan.
196 922 : stats_.named_.downstream_cx_destroy_.inc();
197 : // NOTE(review): read_callbacks_ is dereferenced below without a null check; this assumes initializeReadFilterCallbacks() ran before destruction - confirm.
198 922 : stats_.named_.downstream_cx_active_.dec();
199 922 : if (read_callbacks_->connection().ssl()) {
200 36 : stats_.named_.downstream_cx_ssl_active_.dec();
201 36 : }
202 : // Mirror of createCodec(): decrement the per-protocol active stat only if a codec was ever created.
203 922 : if (codec_) {
204 761 : if (codec_->protocol() == Protocol::Http2) {
205 336 : stats_.named_.downstream_cx_http2_active_.dec();
206 732 : } else if (codec_->protocol() == Protocol::Http3) {
207 0 : stats_.named_.downstream_cx_http3_active_.dec();
208 425 : } else {
209 425 : stats_.named_.downstream_cx_http1_active_.dec();
210 425 : }
211 761 : }
212 : // Complete the timespan first, then pass it to the user-agent helper.
213 922 : conn_length_->complete();
214 922 : user_agent_.completeConnectionLength(*conn_length_);
215 922 : }
216 :
217 1282 : void ConnectionManagerImpl::checkForDeferredClose(bool skip_delay_close) { // Closes a draining connection once it has no streams and nothing left to write; `skip_delay_close` requests FlushWrite instead of FlushWriteAndDelay.
218 1282 : Network::ConnectionCloseType close = Network::ConnectionCloseType::FlushWriteAndDelay;
219 1282 : if (runtime_.snapshot().getBoolean(ConnectionManagerImpl::OptionallyDelayClose, true) && // The caller's request is honored only while this runtime flag reads true (its fallback).
220 1282 : skip_delay_close) {
221 6 : close = Network::ConnectionCloseType::FlushWrite;
222 6 : }
223 1282 : if (drain_state_ == DrainState::Closing && streams_.empty() && !codec_->wantsToWrite()) { // NOTE(review): codec_ is dereferenced whenever the first two conditions hold - confirm a codec always exists by then.
224 : // We are closing a draining connection with no active streams and the codec has
225 : // nothing to write.
226 345 : doConnectionClose(close, absl::nullopt,
227 345 : StreamInfo::LocalCloseReasons::get().DeferredCloseOnDrainedConnection);
228 345 : }
229 1282 : }
230 :
231 555 : void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_deferred_close) { // Ends `stream`: resets it through the codec if it is not fully complete, otherwise destroys it, then optionally checks whether the connection can close.
232 : // The order of what happens in this routine is important and a little complicated. We first see
233 : // if the stream needs to be reset. If it needs to be, this will end up invoking reset callbacks
234 : // and then moving the stream to the deferred destruction list. If the stream has not been reset,
235 : // we move it to the deferred deletion list here. Then, we potentially close the connection. This
236 : // must be done after deleting the stream since the stream refers to the connection and must be
237 : // deleted first.
238 555 : bool reset_stream = false;
239 : // If the response encoder is still associated with the stream, reset the stream. The exception
240 : // here is when Envoy "ends" the stream by calling recreateStream at which point recreateStream
241 : // explicitly nulls out response_encoder to avoid the downstream being notified of the
242 : // Envoy-internal stream instance being ended.
243 555 : if (stream.response_encoder_ != nullptr && (!stream.filter_manager_.remoteDecodeComplete() ||
244 555 : !stream.state_.codec_saw_local_complete_)) {
245 : // Indicate local is complete at this point so that if we reset during a continuation, we don't
246 : // raise further data or trailers.
247 226 : ENVOY_STREAM_LOG(debug, "doEndStream() resetting stream", stream);
248 : // TODO(snowp): This call might not be necessary, try to clean up + remove setter function.
249 226 : stream.filter_manager_.setLocalComplete();
250 226 : stream.state_.codec_saw_local_complete_ = true;
251 :
252 : // Per https://tools.ietf.org/html/rfc7540#section-8.3 if there was an error
253 : // with the TCP connection during a CONNECT request, it should be
254 : // communicated via CONNECT_ERROR
255 226 : if (requestWasConnect(stream.request_headers_, codec_->protocol()) &&
256 226 : (stream.filter_manager_.streamInfo().hasResponseFlag(
257 0 : StreamInfo::ResponseFlag::UpstreamConnectionFailure) ||
258 0 : stream.filter_manager_.streamInfo().hasResponseFlag(
259 0 : StreamInfo::ResponseFlag::UpstreamConnectionTermination))) {
260 0 : stream.response_encoder_->getStream().resetStream(StreamResetReason::ConnectError);
261 226 : } else { // Not a failed CONNECT: report ProtocolError for upstream protocol errors, LocalReset for everything else.
262 226 : if (stream.filter_manager_.streamInfo().hasResponseFlag(
263 226 : StreamInfo::ResponseFlag::UpstreamProtocolError)) {
264 2 : stream.response_encoder_->getStream().resetStream(StreamResetReason::ProtocolError);
265 224 : } else {
266 224 : stream.response_encoder_->getStream().resetStream(StreamResetReason::LocalReset);
267 224 : }
268 226 : }
269 226 : reset_stream = true;
270 226 : }
271 : // Without a reset no reset callback will run, so the stream is handed to deferred destruction here.
272 555 : if (!reset_stream) {
273 329 : doDeferredStreamDestroy(stream);
274 329 : }
275 : // A reset on a protocol ordered before Http2 moves the whole connection to the Closing drain state.
276 555 : if (reset_stream && codec_->protocol() < Protocol::Http2) {
277 224 : drain_state_ = DrainState::Closing;
278 224 : }
279 :
280 : // If HTTP/1.0 has no content length, it is framed by close and won't consider
281 : // the request complete until the FIN is read. Don't delay close in this case.
282 555 : bool http_10_sans_cl = (codec_->protocol() == Protocol::Http10) &&
283 555 : (!stream.response_headers_ || !stream.response_headers_->ContentLength());
284 : // We also don't delay-close in the case of HTTP/1.1 where the request is
285 : // fully read, as there's no race condition to avoid.
286 555 : const bool connection_close =
287 555 : stream.filter_manager_.streamInfo().shouldDrainConnectionUponCompletion();
288 555 : bool request_complete = stream.filter_manager_.remoteDecodeComplete();
289 : // NOTE(review): `stream` is still read above after the reset/destroy calls; this relies on it not being freed synchronously - confirm.
290 555 : if (check_for_deferred_close) {
291 : // Don't do delay close for HTTP/1.0 or if the request is complete.
292 555 : checkForDeferredClose(connection_close && (request_complete || http_10_sans_cl));
293 555 : }
294 555 : }
295 :
296 765 : void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) { // Updates premature-reset accounting, disables the stream's timers and, once canDestroyStream() allows it, logs, destroys filters and schedules the stream for deferred deletion.
297 765 : if (!stream.state_.is_internally_destroyed_) { // Internally destroyed streams are excluded from both counters below.
298 765 : ++closed_non_internally_destroyed_requests_;
299 765 : if (isPrematureRstStream(stream)) {
300 108 : ++number_premature_stream_resets_;
301 108 : }
302 765 : }
303 765 : if (stream.max_stream_duration_timer_ != nullptr) { // Each per-stream timer is disabled and then released so it cannot fire later.
304 0 : stream.max_stream_duration_timer_->disableTimer();
305 0 : stream.max_stream_duration_timer_ = nullptr;
306 0 : }
307 765 : if (stream.stream_idle_timer_ != nullptr) {
308 517 : stream.stream_idle_timer_->disableTimer();
309 517 : stream.stream_idle_timer_ = nullptr;
310 517 : }
311 765 : stream.filter_manager_.disarmRequestTimeout();
312 765 : if (stream.request_header_timer_ != nullptr) {
313 0 : stream.request_header_timer_->disableTimer();
314 0 : stream.request_header_timer_ = nullptr;
315 0 : }
316 765 : if (stream.access_log_flush_timer_ != nullptr) {
317 0 : stream.access_log_flush_timer_->disableTimer();
318 0 : stream.access_log_flush_timer_ = nullptr;
319 0 : }
320 :
321 : // Only destroy the active stream if the underlying codec has notified us of
322 : // completion or we've internally redirected the stream.
323 765 : if (!stream.canDestroyStream()) {
324 : // Track that this stream is not expecting any additional calls apart from
325 : // codec notification.
326 94 : stream.state_.is_zombie_stream_ = true;
327 94 : return;
328 94 : }
329 : // Unregister the codec event callbacks from the codec stream before tearing the ActiveStream down.
330 671 : if (stream.response_encoder_ != nullptr) {
331 671 : stream.response_encoder_->getStream().registerCodecEventCallbacks(nullptr);
332 671 : }
333 :
334 671 : stream.completeRequest();
335 :
336 : // If refresh rtt after request is required explicitly, then try to get rtt again set it into
337 : // connection info.
338 671 : if (refresh_rtt_after_request_) {
339 : // Set roundtrip time in connectionInfoSetter before OnStreamComplete
340 0 : absl::optional<std::chrono::milliseconds> t = read_callbacks_->connection().lastRoundTripTime();
341 0 : if (t.has_value()) {
342 0 : read_callbacks_->connection().connectionInfoSetter().setRoundTripTime(t.value());
343 0 : }
344 0 : }
345 :
346 671 : stream.filter_manager_.onStreamComplete();
347 :
348 : // For HTTP/3, skip access logging here and add deferred logging info
349 : // to stream info for QuicStatsGatherer to use later.
350 671 : if (codec_ && codec_->protocol() == Protocol::Http3 &&
351 : // There was a downstream reset, log immediately.
352 671 : !stream.filter_manager_.sawDownstreamReset() &&
353 : // On recreate stream, log immediately.
354 671 : stream.response_encoder_ != nullptr &&
355 671 : Runtime::runtimeFeatureEnabled(
356 0 : "envoy.reloadable_features.quic_defer_logging_to_ack_listener")) {
357 0 : stream.deferHeadersAndTrailers();
358 671 : } else {
359 : // For HTTP/1 and HTTP/2, log here as usual.
360 671 : stream.filter_manager_.log(AccessLog::AccessLogType::DownstreamEnd);
361 671 : }
362 :
363 671 : stream.filter_manager_.destroyFilters();
364 : // Unlink the stream from streams_ and hand ownership to the dispatcher's deferred-delete queue; `stream` is still used below.
365 671 : dispatcher_->deferredDelete(stream.removeFromList(streams_));
366 :
367 : // The response_encoder should never be dangling (unless we're destroying a
368 : // stream we are recreating) as the codec level stream will either outlive the
369 : // ActiveStream, or be alive in deferred deletion queue at this point.
370 671 : if (stream.response_encoder_) {
371 671 : stream.response_encoder_->getStream().removeCallbacks(stream);
372 671 : }
373 : // With no streams left, restart the connection idle timer that newStream() disabled.
374 671 : if (connection_idle_timer_ && streams_.empty()) {
375 408 : connection_idle_timer_->enableTimer(config_.idleTimeout().value());
376 408 : }
377 671 : maybeDrainDueToPrematureResets();
378 671 : }
379 :
380 : RequestDecoderHandlePtr ConnectionManagerImpl::newStreamHandle(ResponseEncoder& response_encoder,
381 0 : bool is_internally_created) { // Creates a stream via newStream() and wraps the resulting decoder in an ActiveStreamHandle.
382 0 : RequestDecoder& decoder = newStream(response_encoder, is_internally_created);
383 0 : return std::make_unique<ActiveStreamHandle>(static_cast<ActiveStream&>(decoder)); // The downcast relies on newStream() returning an element of streams_.
384 0 : }
385 :
386 : RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encoder,
387 671 : bool is_internally_created) { // Creates an ActiveStream bound to `response_encoder`, adds it to streams_ and returns it as the request decoder.
388 671 : TRACE_EVENT("core", "ConnectionManagerImpl::newStream");
389 671 : if (connection_idle_timer_) { // The connection is no longer idle once a stream exists.
390 517 : connection_idle_timer_->disableTimer();
391 517 : }
392 :
393 671 : ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());
394 :
395 671 : Buffer::BufferMemoryAccountSharedPtr downstream_stream_account =
396 671 : response_encoder.getStream().account();
397 :
398 671 : if (downstream_stream_account == nullptr) {
399 : // Create account, wiring the stream to use it for tracking bytes.
400 : // If tracking is disabled, the wiring becomes a NOP.
401 671 : auto& buffer_factory = dispatcher_->getWatermarkFactory();
402 671 : downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream());
403 671 : response_encoder.getStream().setAccount(downstream_stream_account);
404 671 : }
405 :
406 671 : auto new_stream = std::make_unique<ActiveStream>(
407 671 : *this, response_encoder.getStream().bufferLimit(), std::move(downstream_stream_account));
408 : // Enforce the per-connection request cap: protocols before Http2 mark this stream to drain the connection on completion; later protocols start the drain sequence if not already draining.
409 671 : accumulated_requests_++;
410 671 : if (config_.maxRequestsPerConnection() > 0 &&
411 671 : accumulated_requests_ >= config_.maxRequestsPerConnection()) {
412 0 : if (codec_->protocol() < Protocol::Http2) {
413 0 : new_stream->filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
414 : // Prevent erroneous debug log of closing due to incoming connection close header.
415 0 : drain_state_ = DrainState::Closing;
416 0 : } else if (drain_state_ == DrainState::NotDraining) {
417 0 : startDrainSequence();
418 0 : }
419 0 : ENVOY_CONN_LOG(debug, "max requests per connection reached", read_callbacks_->connection());
420 0 : stats_.named_.downstream_cx_max_requests_reached_.inc();
421 0 : }
422 : // Wire the stream and the codec-level stream together: callbacks, flush timeout and bytes meter.
423 671 : new_stream->state_.is_internally_created_ = is_internally_created;
424 671 : new_stream->response_encoder_ = &response_encoder;
425 671 : new_stream->response_encoder_->getStream().addCallbacks(*new_stream);
426 671 : new_stream->response_encoder_->getStream().registerCodecEventCallbacks(new_stream.get());
427 671 : new_stream->response_encoder_->getStream().setFlushTimeout(new_stream->idle_timeout_ms_);
428 671 : new_stream->streamInfo().setDownstreamBytesMeter(response_encoder.getStream().bytesMeter());
429 : // If the network connection is backed up, the stream should be made aware of it on creation.
430 : // Both HTTP/1.x and HTTP/2 codecs handle this in StreamCallbackHelper::addCallbacksHelper.
431 671 : ASSERT(read_callbacks_->connection().aboveHighWatermark() == false ||
432 671 : new_stream->filter_manager_.aboveHighWatermark());
433 671 : LinkedList::moveIntoList(std::move(new_stream), streams_);
434 671 : return **streams_.begin(); // Presumably moveIntoList() inserts at the front, making begin() the stream just added - confirm.
435 671 : }
436 :
437 : void ConnectionManagerImpl::handleCodecErrorImpl(absl::string_view error, absl::string_view details,
438 227 : StreamInfo::ResponseFlag response_flag) { // Logs the dispatch error, sets `response_flag` on the connection's stream info and closes the connection, passing `details` and the flag along.
439 227 : ENVOY_CONN_LOG(debug, "dispatch error: {}", read_callbacks_->connection(), error);
440 227 : read_callbacks_->connection().streamInfo().setResponseFlag(response_flag);
441 :
442 : // HTTP/1.1 codec has already sent a 400 response if possible. HTTP/2 codec has already sent
443 : // GOAWAY.
444 227 : doConnectionClose(Network::ConnectionCloseType::FlushWriteAndDelay, response_flag, details);
445 227 : }
446 :
447 227 : void ConnectionManagerImpl::handleCodecError(absl::string_view error) { // Handles a codec error as a downstream protocol error; the details string is "codec_error:" plus `error` passed through StringUtil::replaceAllEmptySpace().
448 227 : handleCodecErrorImpl(error, absl::StrCat("codec_error:", StringUtil::replaceAllEmptySpace(error)),
449 227 : StreamInfo::ResponseFlag::DownstreamProtocolError);
450 227 : }
451 :
452 0 : void ConnectionManagerImpl::handleCodecOverloadError(absl::string_view error) { // Same flow as handleCodecError(), but with the OverloadManager response flag and an "overload_error:" details prefix.
453 0 : handleCodecErrorImpl(error,
454 0 : absl::StrCat("overload_error:", StringUtil::replaceAllEmptySpace(error)),
455 0 : StreamInfo::ResponseFlag::OverloadManager);
456 0 : }
457 :
458 761 : void ConnectionManagerImpl::createCodec(Buffer::Instance& data) { // Creates the codec through the config (which receives `data`) and bumps the total/active stats for the codec's protocol.
459 761 : ASSERT(!codec_); // Must be called at most once per connection.
460 761 : codec_ = config_.createCodec(read_callbacks_->connection(), data, *this, overload_manager_);
461 : // The *_active_ stats incremented here are decremented again in the destructor.
462 761 : switch (codec_->protocol()) {
463 0 : case Protocol::Http3:
464 0 : stats_.named_.downstream_cx_http3_total_.inc();
465 0 : stats_.named_.downstream_cx_http3_active_.inc();
466 0 : break;
467 336 : case Protocol::Http2:
468 336 : stats_.named_.downstream_cx_http2_total_.inc();
469 336 : stats_.named_.downstream_cx_http2_active_.inc();
470 336 : break;
471 425 : case Protocol::Http11:
472 425 : case Protocol::Http10:
473 425 : stats_.named_.downstream_cx_http1_total_.inc();
474 425 : stats_.named_.downstream_cx_http1_active_.inc();
475 425 : break;
476 761 : }
477 761 : }
478 :
479 952 : Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) { // Feeds `data` to the codec (created on first use) and always returns StopIteration; the unnamed bool argument is unused.
480 952 : requests_during_dispatch_count_ = 0; // Reset on every call to onData().
481 952 : if (!codec_) {
482 : // Http3 codec should have been instantiated by now.
483 761 : createCodec(data);
484 761 : }
485 :
486 952 : bool redispatch;
487 952 : do {
488 952 : redispatch = false;
489 :
490 952 : const Status status = codec_->dispatch(data);
491 : // Each error class below closes the connection via handleCodecError*() and stops; only protocol and overload errors bump a stat here.
492 952 : if (isBufferFloodError(status) || isInboundFramesWithEmptyPayloadError(status)) {
493 0 : handleCodecError(status.message());
494 0 : return Network::FilterStatus::StopIteration;
495 952 : } else if (isCodecProtocolError(status)) {
496 225 : stats_.named_.downstream_cx_protocol_error_.inc();
497 225 : handleCodecError(status.message());
498 225 : return Network::FilterStatus::StopIteration;
499 730 : } else if (isEnvoyOverloadError(status)) {
500 : // The other codecs aren't wired to send this status.
501 0 : ASSERT(codec_->protocol() < Protocol::Http2,
502 0 : "Expected only HTTP1.1 and below to send overload error.");
503 0 : stats_.named_.downstream_rq_overload_close_.inc();
504 0 : handleCodecOverloadError(status.message());
505 0 : return Network::FilterStatus::StopIteration;
506 0 : }
507 727 : ASSERT(status.ok());
508 :
509 : // Processing incoming data may release outbound data so check for closure here as well.
510 727 : checkForDeferredClose(false);
511 :
512 : // The HTTP/1 codec will pause dispatch after a single message is complete. We want to
513 : // redispatch if there are no streams and we have more data. If we have a single
514 : // complete non-WebSocket stream but have not responded yet we will pause socket reads
515 : // to apply back pressure.
516 727 : if (codec_->protocol() < Protocol::Http2) {
517 391 : if (read_callbacks_->connection().state() == Network::Connection::State::Open &&
518 391 : data.length() > 0 && streams_.empty()) {
519 0 : redispatch = true;
520 0 : }
521 391 : }
522 727 : } while (redispatch);
523 : // Record the codec's protocol on the connection's stream info if none has been set yet.
524 727 : if (!read_callbacks_->connection().streamInfo().protocol()) {
525 602 : read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
526 602 : }
527 :
528 727 : return Network::FilterStatus::StopIteration;
529 952 : }
530 :
531 884 : Network::FilterStatus ConnectionManagerImpl::onNewConnection() { // Returns Continue when the connection's stream info has no protocol yet; otherwise creates the codec immediately and returns StopIteration.
532 884 : if (!read_callbacks_->connection().streamInfo().protocol()) {
533 : // For Non-QUIC traffic, continue passing data to filters.
534 884 : return Network::FilterStatus::Continue;
535 884 : }
536 : // Only QUIC connection's stream_info_ specifies protocol.
537 0 : Buffer::OwnedImpl dummy; // Empty buffer: createCodec() requires one, but there is no data to pass on this path.
538 0 : createCodec(dummy);
539 0 : ASSERT(codec_->protocol() == Protocol::Http3);
540 : // Stop iterating through network filters for QUIC. Currently QUIC connections bypass the
541 : // onData() interface because QUICHE already handles de-multiplexing.
542 0 : return Network::FilterStatus::StopIteration;
543 884 : }
544 :
545 : void ConnectionManagerImpl::resetAllStreams(absl::optional<StreamInfo::ResponseFlag> response_flag,
546 130 : absl::string_view details) { // Resets every stream in streams_ with ConnectionTermination, first recording response code details and, if provided, `response_flag` on each stream's info.
547 270 : while (!streams_.empty()) { // NOTE(review): termination relies on onResetStream() removing the front stream from streams_; that removal is not visible here - confirm.
548 : // Mimic a downstream reset in this case. We must also remove callbacks here. Though we are
549 : // about to close the connection and will disable further reads, it is possible that flushing
550 : // data out can cause stream callbacks to fire (e.g., low watermark callbacks).
551 : //
552 : // TODO(mattklein123): I tried to actually reset through the codec here, but ran into issues
553 : // with nghttp2 state and being unhappy about sending reset frames after the connection had
554 : // been terminated via GOAWAY. It might be possible to do something better here inside the h2
555 : // codec but there are no easy answers and this seems simpler.
556 140 : auto& stream = *streams_.front();
557 140 : stream.response_encoder_->getStream().removeCallbacks(stream);
558 140 : if (!stream.response_encoder_->getStream().responseDetails().empty()) { // Details already recorded on the codec stream take precedence over the caller's `details`.
559 22 : stream.filter_manager_.streamInfo().setResponseCodeDetails(
560 22 : stream.response_encoder_->getStream().responseDetails());
561 118 : } else if (!details.empty()) {
562 118 : stream.filter_manager_.streamInfo().setResponseCodeDetails(details);
563 118 : }
564 140 : if (response_flag.has_value()) {
565 50 : stream.filter_manager_.streamInfo().setResponseFlag(response_flag.value());
566 50 : }
567 140 : stream.onResetStream(StreamResetReason::ConnectionTermination, absl::string_view());
568 140 : }
569 130 : }
570 :
571 1706 : void ConnectionManagerImpl::onEvent(Network::ConnectionEvent event) { // Connection event callback: on local or remote close, updates close stats, builds a details string and runs doConnectionClose() without a close type.
572 1706 : if (event == Network::ConnectionEvent::LocalClose) {
573 68 : stats_.named_.downstream_cx_destroy_local_.inc();
574 68 : }
575 :
576 1706 : if (event == Network::ConnectionEvent::RemoteClose ||
577 1706 : event == Network::ConnectionEvent::LocalClose) {
578 : // Remote close uses a fixed details string; local close formats the connection's local close reason into the DownstreamLocalDisconnect format string.
579 922 : std::string details;
580 922 : if (event == Network::ConnectionEvent::RemoteClose) {
581 854 : remote_close_ = true;
582 854 : stats_.named_.downstream_cx_destroy_remote_.inc();
583 854 : details = StreamInfo::ResponseCodeDetails::get().DownstreamRemoteDisconnect;
584 894 : } else {
585 68 : absl::string_view local_close_reason = read_callbacks_->connection().localCloseReason();
586 68 : ENVOY_BUG(!local_close_reason.empty(), "Local Close Reason was not set!");
587 68 : details = fmt::format(
588 68 : fmt::runtime(StreamInfo::ResponseCodeDetails::get().DownstreamLocalDisconnect),
589 68 : StringUtil::replaceAllEmptySpace(local_close_reason));
590 68 : }
591 :
592 : // TODO(mattklein123): It is technically possible that something outside of the filter causes
593 : // a local connection close, so we still guard against that here. A better solution would be to
594 : // have some type of "pre-close" callback that we could hook for cleanup that would get called
595 : // regardless of where local close is invoked from.
596 : // NOTE: that this will cause doConnectionClose() to get called twice in the common local close
597 : // cases, but the method protects against that.
598 : // NOTE: In the case where a local close comes from outside the filter, this will cause any
599 : // stream closures to increment remote close stats. We should do better here in the future,
600 : // via the pre-close callback mentioned above.
601 922 : doConnectionClose(absl::nullopt, absl::nullopt, details);
602 922 : }
603 1706 : }
604 :
605 : void ConnectionManagerImpl::doConnectionClose( // Disables and releases the connection-level timers, resets any remaining streams, and closes the connection only when `close_type` is provided.
606 : absl::optional<Network::ConnectionCloseType> close_type,
607 1494 : absl::optional<StreamInfo::ResponseFlag> response_flag, absl::string_view details) {
608 1494 : if (connection_idle_timer_) {
609 788 : connection_idle_timer_->disableTimer();
610 788 : connection_idle_timer_.reset();
611 788 : }
612 :
613 1494 : if (connection_duration_timer_) {
614 0 : connection_duration_timer_->disableTimer();
615 0 : connection_duration_timer_.reset();
616 0 : }
617 :
618 1494 : if (drain_timer_) {
619 0 : drain_timer_->disableTimer();
620 0 : drain_timer_.reset();
621 0 : }
622 : // Streams still active at close: a provided close_type is accounted as a local close, an absent one as a remote close.
623 1494 : if (!streams_.empty()) {
624 130 : const Network::ConnectionEvent event = close_type.has_value()
625 130 : ? Network::ConnectionEvent::LocalClose
626 130 : : Network::ConnectionEvent::RemoteClose;
627 130 : if (event == Network::ConnectionEvent::LocalClose) {
628 45 : stats_.named_.downstream_cx_destroy_local_active_rq_.inc();
629 45 : }
630 130 : if (event == Network::ConnectionEvent::RemoteClose) {
631 85 : stats_.named_.downstream_cx_destroy_remote_active_rq_.inc();
632 85 : }
633 :
634 130 : stats_.named_.downstream_cx_destroy_active_rq_.inc();
635 130 : user_agent_.onConnectionDestroy(event, true);
636 : // Note that resetAllStreams() does not actually write anything to the wire. It just resets
637 : // all upstream streams and their filter stacks. Thus, there are no issues around recursive
638 : // entry.
639 130 : resetAllStreams(response_flag, details);
640 130 : }
641 : // No close_type means the caller is reacting to a close event (see onEvent()), so close() is not called again.
642 1494 : if (close_type.has_value()) {
643 572 : read_callbacks_->connection().close(close_type.value(), details);
644 572 : }
645 1494 : }
646 :
647 765 : bool ConnectionManagerImpl::isPrematureRstStream(const ActiveStream& stream) const { // Returns true when the stream has no response code and its lifetime (if known) does not exceed the runtime-configured minimum.
648 : // Check if the request was prematurely reset, by comparing its lifetime to the configured
649 : // threshold.
650 765 : ASSERT(!stream.state_.is_internally_destroyed_);
651 765 : absl::optional<std::chrono::nanoseconds> duration =
652 765 : stream.filter_manager_.streamInfo().currentDuration();
653 :
654 : // Check if request lifetime is longer than the premature reset threshold. If no duration is available, this check is skipped.
655 765 : if (duration) {
656 765 : const uint64_t lifetime = std::chrono::duration_cast<std::chrono::seconds>(*duration).count(); // Truncated to whole seconds before the comparison.
657 765 : const uint64_t min_lifetime = runtime_.snapshot().getInteger(
658 765 : ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey, 1);
659 765 : if (lifetime > min_lifetime) {
660 0 : return false;
661 0 : }
662 765 : }
663 :
664 : // If request has completed before configured threshold, also check if the Envoy proxied the
665 : // response from the upstream. Requests without the response status were reset.
666 : // TODO(RyanTheOptimist): Possibly support half_closed_local instead.
667 765 : return !stream.filter_manager_.streamInfo().responseCode();
668 765 : }
669 :
670 : // Sends a GOAWAY if too many streams have been reset prematurely on this
671 : // connection.
672 671 : void ConnectionManagerImpl::maybeDrainDueToPrematureResets() {
673 671 : if (!Runtime::runtimeFeatureEnabled(
674 671 : "envoy.restart_features.send_goaway_for_premature_rst_streams") ||
675 671 : closed_non_internally_destroyed_requests_ == 0) {
676 0 : return;
677 0 : }
678 :
679 671 : const uint64_t limit =
680 671 : runtime_.snapshot().getInteger(ConnectionManagerImpl::PrematureResetTotalStreamCountKey, 500);
681 :
682 671 : if (closed_non_internally_destroyed_requests_ < limit) {
683 : // Even though the total number of streams have not reached `limit`, check if the number of bad
684 : // streams is high enough that even if every subsequent stream is good, the connection
685 : // would be closed once the limit is reached, and if so close the connection now.
686 671 : if (number_premature_stream_resets_ * 2 < limit) {
687 671 : return;
688 671 : }
689 671 : } else {
690 0 : if (number_premature_stream_resets_ * 2 < closed_non_internally_destroyed_requests_) {
691 0 : return;
692 0 : }
693 0 : }
694 :
695 0 : if (read_callbacks_->connection().state() == Network::Connection::State::Open) {
696 0 : stats_.named_.downstream_rq_too_many_premature_resets_.inc();
697 0 : doConnectionClose(Network::ConnectionCloseType::Abort, absl::nullopt,
698 0 : "too_many_premature_resets");
699 0 : }
700 0 : }
701 :
702 6 : void ConnectionManagerImpl::onGoAway(GoAwayErrorCode) {
703 : // Currently we do nothing with remote go away frames. In the future we can decide to no longer
704 : // push resources if applicable.
705 6 : }
706 :
707 0 : void ConnectionManagerImpl::onIdleTimeout() {
708 0 : ENVOY_CONN_LOG(debug, "idle timeout", read_callbacks_->connection());
709 0 : stats_.named_.downstream_cx_idle_timeout_.inc();
710 0 : if (!codec_) {
711 : // No need to delay close after flushing since an idle timeout has already fired. Attempt to
712 : // write out buffered data one last time and issue a local close if successful.
713 0 : doConnectionClose(Network::ConnectionCloseType::FlushWrite, absl::nullopt,
714 0 : StreamInfo::LocalCloseReasons::get().IdleTimeoutOnConnection);
715 0 : } else if (drain_state_ == DrainState::NotDraining) {
716 0 : startDrainSequence();
717 0 : }
718 0 : }
719 :
720 0 : void ConnectionManagerImpl::onConnectionDurationTimeout() {
721 0 : ENVOY_CONN_LOG(debug, "max connection duration reached", read_callbacks_->connection());
722 0 : stats_.named_.downstream_cx_max_duration_reached_.inc();
723 0 : if (!codec_) {
724 : // Attempt to write out buffered data one last time and issue a local close if successful.
725 0 : doConnectionClose(Network::ConnectionCloseType::FlushWrite,
726 0 : StreamInfo::ResponseFlag::DurationTimeout,
727 0 : StreamInfo::ResponseCodeDetails::get().DurationTimeout);
728 0 : } else if (drain_state_ == DrainState::NotDraining) {
729 0 : startDrainSequence();
730 0 : }
731 0 : }
732 :
733 0 : void ConnectionManagerImpl::onDrainTimeout() {
734 0 : ASSERT(drain_state_ != DrainState::NotDraining);
735 0 : codec_->goAway();
736 0 : drain_state_ = DrainState::Closing;
737 0 : checkForDeferredClose(false);
738 0 : }
739 :
740 : void ConnectionManagerImpl::chargeTracingStats(const Tracing::Reason& tracing_reason,
741 0 : ConnectionManagerTracingStats& tracing_stats) {
742 0 : switch (tracing_reason) {
743 0 : case Tracing::Reason::ClientForced:
744 0 : tracing_stats.client_enabled_.inc();
745 0 : break;
746 0 : case Tracing::Reason::Sampling:
747 0 : tracing_stats.random_sampling_.inc();
748 0 : break;
749 0 : case Tracing::Reason::ServiceForced:
750 0 : tracing_stats.service_forced_.inc();
751 0 : break;
752 0 : default:
753 0 : tracing_stats.not_traceable_.inc();
754 0 : break;
755 0 : }
756 0 : }
757 :
758 : // TODO(chaoqin-li1123): Make on demand vhds and on demand srds works at the same time.
759 : void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestRouteConfigUpdate(
760 0 : Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) {
761 0 : absl::optional<Router::ConfigConstSharedPtr> route_config = parent_.routeConfig();
762 0 : Event::Dispatcher& thread_local_dispatcher = *parent_.connection_manager_.dispatcher_;
763 0 : if (route_config.has_value() && route_config.value()->usesVhds()) {
764 0 : ASSERT(!parent_.request_headers_->Host()->value().empty());
765 0 : const auto& host_header = absl::AsciiStrToLower(parent_.request_headers_->getHostValue());
766 0 : requestVhdsUpdate(host_header, thread_local_dispatcher, std::move(route_config_updated_cb));
767 0 : return;
768 0 : } else if (scope_key_builder_.has_value()) {
769 0 : Router::ScopeKeyPtr scope_key = scope_key_builder_->computeScopeKey(*parent_.request_headers_);
770 : // If scope_key is not null, the scope exists but RouteConfiguration is not initialized.
771 0 : if (scope_key != nullptr) {
772 0 : requestSrdsUpdate(std::move(scope_key), thread_local_dispatcher,
773 0 : std::move(route_config_updated_cb));
774 0 : return;
775 0 : }
776 0 : }
777 : // Continue the filter chain if no on demand update is requested.
778 0 : (*route_config_updated_cb)(false);
779 0 : }
780 :
781 : void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestVhdsUpdate(
782 : const std::string& host_header, Event::Dispatcher& thread_local_dispatcher,
783 0 : Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) {
784 0 : route_config_provider_->requestVirtualHostsUpdate(host_header, thread_local_dispatcher,
785 0 : std::move(route_config_updated_cb));
786 0 : }
787 :
788 : void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestSrdsUpdate(
789 : Router::ScopeKeyPtr scope_key, Event::Dispatcher& thread_local_dispatcher,
790 0 : Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) {
791 : // Since inline scope_route_config_provider is not fully implemented and never used,
792 : // dynamic cast in constructor always succeed and the pointer should not be null here.
793 0 : ASSERT(scoped_route_config_provider_ != nullptr);
794 0 : Http::RouteConfigUpdatedCallback scoped_route_config_updated_cb =
795 0 : Http::RouteConfigUpdatedCallback(
796 0 : [this, weak_route_config_updated_cb = std::weak_ptr<Http::RouteConfigUpdatedCallback>(
797 0 : route_config_updated_cb)](bool scope_exist) {
798 : // If the callback can be locked, this ActiveStream is still alive.
799 0 : if (auto cb = weak_route_config_updated_cb.lock()) {
800 : // Refresh the route before continue the filter chain.
801 0 : if (scope_exist) {
802 0 : parent_.refreshCachedRoute();
803 0 : }
804 0 : (*cb)(scope_exist && parent_.hasCachedRoute());
805 0 : }
806 0 : });
807 0 : scoped_route_config_provider_->onDemandRdsUpdate(std::move(scope_key), thread_local_dispatcher,
808 0 : std::move(scoped_route_config_updated_cb));
809 0 : }
810 :
811 : absl::optional<absl::string_view>
812 0 : ConnectionManagerImpl::HttpStreamIdProviderImpl::toStringView() const {
813 0 : if (parent_.request_headers_ == nullptr) {
814 0 : return {};
815 0 : }
816 0 : ASSERT(parent_.connection_manager_.config_.requestIDExtension() != nullptr);
817 0 : return parent_.connection_manager_.config_.requestIDExtension()->get(*parent_.request_headers_);
818 0 : }
819 :
820 0 : absl::optional<uint64_t> ConnectionManagerImpl::HttpStreamIdProviderImpl::toInteger() const {
821 0 : if (parent_.request_headers_ == nullptr) {
822 0 : return {};
823 0 : }
824 0 : ASSERT(parent_.connection_manager_.config_.requestIDExtension() != nullptr);
825 0 : return parent_.connection_manager_.config_.requestIDExtension()->getInteger(
826 0 : *parent_.request_headers_);
827 0 : }
828 :
829 : ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager,
830 : uint32_t buffer_limit,
831 : Buffer::BufferMemoryAccountSharedPtr account)
832 : : connection_manager_(connection_manager),
833 : connection_manager_tracing_config_(connection_manager_.config_.tracingConfig() == nullptr
834 : ? absl::nullopt
835 : : makeOptRef<const TracingConnectionManagerConfig>(
836 : *connection_manager_.config_.tracingConfig())),
837 : stream_id_(connection_manager.random_generator_.random()),
838 : filter_manager_(*this, *connection_manager_.dispatcher_,
839 : connection_manager_.read_callbacks_->connection(), stream_id_,
840 : std::move(account), connection_manager_.config_.proxy100Continue(),
841 : buffer_limit, connection_manager_.config_.filterFactory(),
842 : connection_manager_.config_.localReply(),
843 : connection_manager_.codec_->protocol(), connection_manager_.timeSource(),
844 : connection_manager_.read_callbacks_->connection().streamInfo().filterState(),
845 : StreamInfo::FilterState::LifeSpan::Connection),
846 : request_response_timespan_(new Stats::HistogramCompletableTimespanImpl(
847 : connection_manager_.stats_.named_.downstream_rq_time_, connection_manager_.timeSource())),
848 : header_validator_(
849 671 : connection_manager.config_.makeHeaderValidator(connection_manager.codec_->protocol())) {
850 671 : ASSERT(!connection_manager.config_.isRoutable() ||
851 671 : ((connection_manager.config_.routeConfigProvider() == nullptr &&
852 671 : connection_manager.config_.scopedRouteConfigProvider() != nullptr &&
853 671 : connection_manager.config_.scopeKeyBuilder().has_value()) ||
854 671 : (connection_manager.config_.routeConfigProvider() != nullptr &&
855 671 : connection_manager.config_.scopedRouteConfigProvider() == nullptr &&
856 671 : !connection_manager.config_.scopeKeyBuilder().has_value())),
857 671 : "Either routeConfigProvider or (scopedRouteConfigProvider and scopeKeyBuilder) should be "
858 671 : "set in "
859 671 : "ConnectionManagerImpl.");
860 671 : for (const AccessLog::InstanceSharedPtr& access_log : connection_manager_.config_.accessLogs()) {
861 668 : filter_manager_.addAccessLogHandler(access_log);
862 668 : }
863 :
864 671 : filter_manager_.streamInfo().setStreamIdProvider(
865 671 : std::make_shared<HttpStreamIdProviderImpl>(*this));
866 :
867 671 : if (connection_manager_.config_.isRoutable() &&
868 671 : connection_manager.config_.routeConfigProvider() != nullptr) {
869 573 : route_config_update_requester_ =
870 573 : std::make_unique<ConnectionManagerImpl::RdsRouteConfigUpdateRequester>(
871 573 : connection_manager.config_.routeConfigProvider(), *this);
872 605 : } else if (connection_manager_.config_.isRoutable() &&
873 98 : connection_manager.config_.scopedRouteConfigProvider() != nullptr &&
874 98 : connection_manager.config_.scopeKeyBuilder().has_value()) {
875 0 : route_config_update_requester_ =
876 0 : std::make_unique<ConnectionManagerImpl::RdsRouteConfigUpdateRequester>(
877 0 : connection_manager.config_.scopedRouteConfigProvider(),
878 0 : connection_manager.config_.scopeKeyBuilder(), *this);
879 0 : }
880 671 : ScopeTrackerScopeState scope(this,
881 671 : connection_manager_.read_callbacks_->connection().dispatcher());
882 :
883 671 : connection_manager_.stats_.named_.downstream_rq_total_.inc();
884 671 : connection_manager_.stats_.named_.downstream_rq_active_.inc();
885 671 : if (connection_manager_.codec_->protocol() == Protocol::Http2) {
886 212 : connection_manager_.stats_.named_.downstream_rq_http2_total_.inc();
887 645 : } else if (connection_manager_.codec_->protocol() == Protocol::Http3) {
888 0 : connection_manager_.stats_.named_.downstream_rq_http3_total_.inc();
889 459 : } else {
890 459 : connection_manager_.stats_.named_.downstream_rq_http1_total_.inc();
891 459 : }
892 :
893 671 : if (connection_manager_.config_.streamIdleTimeout().count()) {
894 517 : idle_timeout_ms_ = connection_manager_.config_.streamIdleTimeout();
895 517 : stream_idle_timer_ = connection_manager_.dispatcher_->createScaledTimer(
896 517 : Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout,
897 517 : [this]() -> void { onIdleTimeout(); });
898 517 : resetIdleTimer();
899 517 : }
900 :
901 671 : if (connection_manager_.config_.requestTimeout().count()) {
902 0 : std::chrono::milliseconds request_timeout = connection_manager_.config_.requestTimeout();
903 0 : request_timer_ =
904 0 : connection_manager.dispatcher_->createTimer([this]() -> void { onRequestTimeout(); });
905 0 : request_timer_->enableTimer(request_timeout, this);
906 0 : }
907 :
908 671 : if (connection_manager_.config_.requestHeadersTimeout().count()) {
909 0 : std::chrono::milliseconds request_headers_timeout =
910 0 : connection_manager_.config_.requestHeadersTimeout();
911 0 : request_header_timer_ =
912 0 : connection_manager.dispatcher_->createTimer([this]() -> void { onRequestHeaderTimeout(); });
913 0 : request_header_timer_->enableTimer(request_headers_timeout, this);
914 0 : }
915 :
916 671 : const auto max_stream_duration = connection_manager_.config_.maxStreamDuration();
917 671 : if (max_stream_duration.has_value() && max_stream_duration.value().count()) {
918 0 : max_stream_duration_timer_ = connection_manager.dispatcher_->createTimer(
919 0 : [this]() -> void { onStreamMaxDurationReached(); });
920 0 : max_stream_duration_timer_->enableTimer(connection_manager_.config_.maxStreamDuration().value(),
921 0 : this);
922 0 : }
923 :
924 671 : if (connection_manager_.config_.accessLogFlushInterval().has_value()) {
925 0 : access_log_flush_timer_ = connection_manager.dispatcher_->createTimer([this]() -> void {
926 : // If the request is complete, we've already done the stream-end access-log, and shouldn't
927 : // do the periodic log.
928 0 : if (!streamInfo().requestComplete().has_value()) {
929 0 : filter_manager_.log(AccessLog::AccessLogType::DownstreamPeriodic);
930 0 : refreshAccessLogFlushTimer();
931 0 : }
932 0 : const SystemTime now = connection_manager_.timeSource().systemTime();
933 : // Downstream bytes meter is guaranteed to be non-null because ActiveStream and the timer
934 : // event are created on the same thread that sets the meter in
935 : // ConnectionManagerImpl::newStream.
936 0 : filter_manager_.streamInfo().getDownstreamBytesMeter()->takeDownstreamPeriodicLoggingSnapshot(
937 0 : now);
938 0 : if (auto& upstream_bytes_meter = filter_manager_.streamInfo().getUpstreamBytesMeter();
939 0 : upstream_bytes_meter != nullptr) {
940 0 : upstream_bytes_meter->takeDownstreamPeriodicLoggingSnapshot(now);
941 0 : }
942 0 : });
943 0 : refreshAccessLogFlushTimer();
944 0 : }
945 671 : }
946 :
947 671 : void ConnectionManagerImpl::ActiveStream::completeRequest() {
948 671 : filter_manager_.streamInfo().onRequestComplete();
949 :
950 671 : if (connection_manager_.remote_close_) {
951 64 : filter_manager_.streamInfo().setResponseCodeDetails(
952 64 : StreamInfo::ResponseCodeDetails::get().DownstreamRemoteDisconnect);
953 64 : filter_manager_.streamInfo().setResponseFlag(
954 64 : StreamInfo::ResponseFlag::DownstreamConnectionTermination);
955 64 : }
956 671 : connection_manager_.stats_.named_.downstream_rq_active_.dec();
957 671 : if (filter_manager_.streamInfo().healthCheck()) {
958 0 : connection_manager_.config_.tracingStats().health_check_.inc();
959 0 : }
960 :
961 671 : if (active_span_) {
962 0 : Tracing::HttpTracerUtility::finalizeDownstreamSpan(
963 0 : *active_span_, request_headers_.get(), response_headers_.get(), response_trailers_.get(),
964 0 : filter_manager_.streamInfo(), *this);
965 0 : }
966 671 : if (state_.successful_upgrade_) {
967 2 : connection_manager_.stats_.named_.downstream_cx_upgrades_active_.dec();
968 2 : }
969 671 : }
970 :
971 2403 : void ConnectionManagerImpl::ActiveStream::resetIdleTimer() {
972 2403 : if (stream_idle_timer_ != nullptr) {
973 : // TODO(htuch): If this shows up in performance profiles, optimize by only
974 : // updating a timestamp here and doing periodic checks for idle timeouts
975 : // instead, or reducing the accuracy of timers.
976 1825 : stream_idle_timer_->enableTimer(idle_timeout_ms_);
977 1825 : }
978 2403 : }
979 :
980 0 : void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
981 0 : connection_manager_.stats_.named_.downstream_rq_idle_timeout_.inc();
982 :
983 0 : filter_manager_.streamInfo().setResponseFlag(StreamInfo::ResponseFlag::StreamIdleTimeout);
984 0 : sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
985 0 : "stream timeout", nullptr, absl::nullopt,
986 0 : StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
987 0 : }
988 :
989 0 : void ConnectionManagerImpl::ActiveStream::onRequestTimeout() {
990 0 : connection_manager_.stats_.named_.downstream_rq_timeout_.inc();
991 0 : sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
992 0 : "request timeout", nullptr, absl::nullopt,
993 0 : StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
994 0 : }
995 :
996 0 : void ConnectionManagerImpl::ActiveStream::onRequestHeaderTimeout() {
997 0 : connection_manager_.stats_.named_.downstream_rq_header_timeout_.inc();
998 0 : sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
999 0 : "request header timeout", nullptr, absl::nullopt,
1000 0 : StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
1001 0 : }
1002 :
1003 0 : void ConnectionManagerImpl::ActiveStream::onStreamMaxDurationReached() {
1004 0 : ENVOY_STREAM_LOG(debug, "Stream max duration time reached", *this);
1005 0 : connection_manager_.stats_.named_.downstream_rq_max_duration_reached_.inc();
1006 0 : sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
1007 0 : "downstream duration timeout", nullptr,
1008 0 : Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
1009 0 : StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
1010 0 : }
1011 :
1012 565 : void ConnectionManagerImpl::ActiveStream::chargeStats(const ResponseHeaderMap& headers) {
1013 565 : uint64_t response_code = Utility::getResponseStatus(headers);
1014 565 : filter_manager_.streamInfo().setResponseCode(response_code);
1015 :
1016 565 : if (filter_manager_.streamInfo().health_check_request_) {
1017 0 : return;
1018 0 : }
1019 :
1020 : // No response is sent back downstream for internal redirects, so don't charge downstream stats.
1021 565 : const absl::optional<std::string>& response_code_details =
1022 565 : filter_manager_.streamInfo().responseCodeDetails();
1023 565 : if (response_code_details.has_value() &&
1024 565 : response_code_details == Envoy::StreamInfo::ResponseCodeDetails::get().InternalRedirect) {
1025 0 : return;
1026 0 : }
1027 :
1028 565 : connection_manager_.stats_.named_.downstream_rq_completed_.inc();
1029 565 : connection_manager_.listener_stats_.downstream_rq_completed_.inc();
1030 565 : if (CodeUtility::is1xx(response_code)) {
1031 4 : connection_manager_.stats_.named_.downstream_rq_1xx_.inc();
1032 4 : connection_manager_.listener_stats_.downstream_rq_1xx_.inc();
1033 561 : } else if (CodeUtility::is2xx(response_code)) {
1034 384 : connection_manager_.stats_.named_.downstream_rq_2xx_.inc();
1035 384 : connection_manager_.listener_stats_.downstream_rq_2xx_.inc();
1036 457 : } else if (CodeUtility::is3xx(response_code)) {
1037 0 : connection_manager_.stats_.named_.downstream_rq_3xx_.inc();
1038 0 : connection_manager_.listener_stats_.downstream_rq_3xx_.inc();
1039 177 : } else if (CodeUtility::is4xx(response_code)) {
1040 116 : connection_manager_.stats_.named_.downstream_rq_4xx_.inc();
1041 116 : connection_manager_.listener_stats_.downstream_rq_4xx_.inc();
1042 159 : } else if (CodeUtility::is5xx(response_code)) {
1043 61 : connection_manager_.stats_.named_.downstream_rq_5xx_.inc();
1044 61 : connection_manager_.listener_stats_.downstream_rq_5xx_.inc();
1045 61 : }
1046 565 : }
1047 :
1048 507 : const Network::Connection* ConnectionManagerImpl::ActiveStream::connection() {
1049 507 : return &connection_manager_.read_callbacks_->connection();
1050 507 : }
1051 :
1052 507 : uint32_t ConnectionManagerImpl::ActiveStream::localPort() {
1053 507 : auto ip = connection()->connectionInfoProvider().localAddress()->ip();
1054 507 : if (ip == nullptr) {
1055 0 : return 0;
1056 0 : }
1057 507 : return ip->port();
1058 507 : }
1059 :
1060 : namespace {
1061 0 : bool streamErrorOnlyErrors(absl::string_view error_details) {
1062 : // Pre UHV HCM did not respect stream_error_on_invalid_http_message
1063 : // and only sent 400 for specific errors.
1064 : // TODO(#28555): make these errors respect the stream_error_on_invalid_http_message
1065 0 : return error_details == UhvResponseCodeDetail::get().FragmentInUrlPath ||
1066 0 : error_details == UhvResponseCodeDetail::get().EscapedSlashesInPath ||
1067 0 : error_details == UhvResponseCodeDetail::get().Percent00InPath;
1068 0 : }
1069 : } // namespace
1070 :
1071 547 : bool ConnectionManagerImpl::ActiveStream::validateHeaders() {
1072 547 : if (header_validator_) {
1073 0 : auto validation_result = header_validator_->validateRequestHeaders(*request_headers_);
1074 0 : bool failure = !validation_result.ok();
1075 0 : bool redirect = false;
1076 0 : bool is_grpc = Grpc::Common::hasGrpcContentType(*request_headers_);
1077 0 : std::string failure_details(validation_result.details());
1078 0 : if (!failure) {
1079 0 : auto transformation_result = header_validator_->transformRequestHeaders(*request_headers_);
1080 0 : failure = !transformation_result.ok();
1081 0 : redirect = transformation_result.action() ==
1082 0 : Http::ServerHeaderValidator::RequestHeadersTransformationResult::Action::Redirect;
1083 0 : failure_details = std::string(transformation_result.details());
1084 0 : if (redirect && !is_grpc) {
1085 0 : connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
1086 0 : } else if (failure) {
1087 0 : connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
1088 0 : }
1089 0 : }
1090 0 : if (failure) {
1091 0 : std::function<void(ResponseHeaderMap & headers)> modify_headers;
1092 0 : Code response_code = failure_details == Http1ResponseCodeDetail::get().InvalidTransferEncoding
1093 0 : ? Code::NotImplemented
1094 0 : : Code::BadRequest;
1095 0 : absl::optional<Grpc::Status::GrpcStatus> grpc_status;
1096 0 : if (redirect && !is_grpc) {
1097 0 : response_code = Code::TemporaryRedirect;
1098 0 : modify_headers = [new_path = request_headers_->Path()->value().getStringView()](
1099 0 : Http::ResponseHeaderMap& response_headers) -> void {
1100 0 : response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
1101 0 : };
1102 0 : } else if (is_grpc) {
1103 0 : grpc_status = Grpc::Status::WellKnownGrpcStatus::Internal;
1104 0 : }
1105 :
1106 : // H/2 codec was resetting requests that were rejected due to headers with underscores,
1107 : // instead of sending 400. Preserving this behavior for now.
1108 : // TODO(#24466): Make H/2 behavior consistent with H/1 and H/3.
1109 0 : if (failure_details == UhvResponseCodeDetail::get().InvalidUnderscore &&
1110 0 : connection_manager_.codec_->protocol() == Protocol::Http2) {
1111 0 : filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
1112 0 : resetStream();
1113 0 : } else {
1114 0 : sendLocalReply(response_code, "", modify_headers, grpc_status, failure_details);
1115 0 : if (!response_encoder_->streamErrorOnInvalidHttpMessage() &&
1116 0 : !streamErrorOnlyErrors(failure_details)) {
1117 0 : connection_manager_.handleCodecError(failure_details);
1118 0 : }
1119 0 : }
1120 0 : return false;
1121 0 : }
1122 0 : }
1123 :
1124 547 : return true;
1125 547 : }
1126 :
1127 6 : bool ConnectionManagerImpl::ActiveStream::validateTrailers() {
1128 6 : if (!header_validator_) {
1129 6 : return true;
1130 6 : }
1131 :
1132 0 : auto validation_result = header_validator_->validateRequestTrailers(*request_trailers_);
1133 0 : std::string failure_details(validation_result.details());
1134 0 : if (validation_result.ok()) {
1135 0 : auto transformation_result = header_validator_->transformRequestTrailers(*request_trailers_);
1136 0 : if (transformation_result.ok()) {
1137 0 : return true;
1138 0 : }
1139 0 : failure_details = std::string(transformation_result.details());
1140 0 : }
1141 :
1142 0 : Code response_code = Code::BadRequest;
1143 0 : absl::optional<Grpc::Status::GrpcStatus> grpc_status;
1144 0 : if (Grpc::Common::hasGrpcContentType(*request_headers_)) {
1145 0 : grpc_status = Grpc::Status::WellKnownGrpcStatus::Internal;
1146 0 : }
1147 :
1148 : // H/2 codec was resetting requests that were rejected due to headers with underscores,
1149 : // instead of sending 400. Preserving this behavior for now.
1150 : // TODO(#24466): Make H/2 behavior consistent with H/1 and H/3.
1151 0 : if (failure_details == UhvResponseCodeDetail::get().InvalidUnderscore &&
1152 0 : connection_manager_.codec_->protocol() == Protocol::Http2) {
1153 0 : filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
1154 0 : resetStream();
1155 0 : } else {
1156 : // TODO(#24735): Harmonize H/2 and H/3 behavior with H/1
1157 0 : if (connection_manager_.codec_->protocol() < Protocol::Http2) {
1158 0 : sendLocalReply(response_code, "", nullptr, grpc_status, failure_details);
1159 0 : } else {
1160 0 : filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
1161 0 : resetStream();
1162 0 : }
1163 0 : if (!response_encoder_->streamErrorOnInvalidHttpMessage()) {
1164 0 : connection_manager_.handleCodecError(failure_details);
1165 0 : }
1166 0 : }
1167 0 : return false;
1168 0 : }
1169 :
1170 725 : void ConnectionManagerImpl::ActiveStream::maybeEndDecode(bool end_stream) {
1171 : // If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
1172 725 : if (end_stream && !filter_manager_.remoteDecodeComplete()) {
1173 388 : filter_manager_.streamInfo().downstreamTiming().onLastDownstreamRxByteReceived(
1174 388 : connection_manager_.dispatcher_->timeSource());
1175 388 : ENVOY_STREAM_LOG(debug, "request end stream", *this);
1176 388 : }
1177 725 : }
1178 :
1179 : // Ordering in this function is complicated, but important.
1180 : //
1181 : // We want to do minimal work before selecting route and creating a filter
1182 : // chain to maximize the number of requests which get custom filter behavior,
1183 : // e.g. registering access logging.
1184 : //
1185 : // This must be balanced by doing sanity checking for invalid requests (one
1186 : // can't route select properly without full headers), checking state required to
1187 : // serve error responses (connection close, head requests, etc), and
1188 : // modifications which may themselves affect route selection.
1189 : void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPtr&& headers,
1190 547 : bool end_stream) {
1191 547 : ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream,
1192 547 : *headers);
1193 : // We only want to record this when reading the headers the first time, not when recreating
1194 : // a stream.
1195 547 : if (!filter_manager_.remoteDecodeComplete()) {
1196 547 : filter_manager_.streamInfo().downstreamTiming().onLastDownstreamHeaderRxByteReceived(
1197 547 : connection_manager_.dispatcher_->timeSource());
1198 547 : }
1199 547 : ScopeTrackerScopeState scope(this,
1200 547 : connection_manager_.read_callbacks_->connection().dispatcher());
1201 547 : request_headers_ = std::move(headers);
1202 547 : filter_manager_.requestHeadersInitialized();
1203 547 : if (request_header_timer_ != nullptr) {
1204 0 : request_header_timer_->disableTimer();
1205 0 : request_header_timer_.reset();
1206 0 : }
1207 :
1208 : // Both shouldDrainConnectionUponCompletion() and is_head_request_ affect local replies: set them
1209 : // as early as possible.
1210 547 : const Protocol protocol = connection_manager_.codec_->protocol();
1211 547 : if (Runtime::runtimeFeatureEnabled(
1212 547 : "envoy.reloadable_features.http1_connection_close_header_in_redirect")) {
1213 547 : if (HeaderUtility::shouldCloseConnection(protocol, *request_headers_)) {
1214 : // Only mark the connection to be closed if the request indicates so. The connection might
1215 : // already be marked so before this step, in which case if shouldCloseConnection() returns
1216 : // false, the stream info value shouldn't be overridden.
1217 0 : filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
1218 0 : }
1219 547 : } else {
1220 0 : filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(
1221 0 : HeaderUtility::shouldCloseConnection(protocol, *request_headers_));
1222 0 : }
1223 :
1224 547 : filter_manager_.streamInfo().protocol(protocol);
1225 :
1226 : // We end the decode here to mark that the downstream stream is complete.
1227 547 : maybeEndDecode(end_stream);
1228 :
1229 547 : if (!validateHeaders()) {
1230 0 : ENVOY_STREAM_LOG(debug, "request headers validation failed\n{}", *this, *request_headers_);
1231 0 : return;
1232 0 : }
1233 :
1234 : // We need to snap snapped_route_config_ here as it's used in mutateRequestHeaders later.
1235 547 : if (connection_manager_.config_.isRoutable()) {
1236 449 : if (connection_manager_.config_.routeConfigProvider() != nullptr) {
1237 449 : snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
1238 449 : } else if (connection_manager_.config_.scopedRouteConfigProvider() != nullptr &&
1239 0 : connection_manager_.config_.scopeKeyBuilder().has_value()) {
1240 0 : snapped_scoped_routes_config_ =
1241 0 : connection_manager_.config_.scopedRouteConfigProvider()->config<Router::ScopedConfig>();
1242 0 : snapScopedRouteConfig();
1243 0 : }
1244 486 : } else {
1245 98 : snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
1246 98 : }
1247 :
1248 : // Drop new requests when overloaded as soon as we have decoded the headers.
1249 547 : const bool drop_request_due_to_overload =
1250 547 : (connection_manager_.accept_new_http_stream_ != nullptr &&
1251 547 : connection_manager_.accept_new_http_stream_->shouldShedLoad()) ||
1252 547 : connection_manager_.random_generator_.bernoulli(
1253 547 : connection_manager_.overload_stop_accepting_requests_ref_.value());
1254 :
1255 547 : if (drop_request_due_to_overload) {
1256 : // In this one special case, do not create the filter chain. If there is a risk of memory
1257 : // overload it is more important to avoid unnecessary allocation than to create the filters.
1258 0 : filter_manager_.skipFilterChainCreation();
1259 0 : connection_manager_.stats_.named_.downstream_rq_overload_close_.inc();
1260 0 : sendLocalReply(Http::Code::ServiceUnavailable, "envoy overloaded", nullptr, absl::nullopt,
1261 0 : StreamInfo::ResponseCodeDetails::get().Overload);
1262 0 : return;
1263 0 : }
1264 :
1265 547 : if (!connection_manager_.config_.proxy100Continue() && request_headers_->Expect() &&
1266 : // The Expect field-value is case-insensitive.
1267 : // https://tools.ietf.org/html/rfc7231#section-5.1.1
1268 547 : absl::EqualsIgnoreCase((request_headers_->Expect()->value().getStringView()),
1269 0 : Headers::get().ExpectValues._100Continue)) {
1270 : // Note in the case Envoy is handling 100-Continue complexity, it skips the filter chain
1271 : // and sends the 100-Continue directly to the encoder.
1272 0 : chargeStats(continueHeader());
1273 0 : response_encoder_->encode1xxHeaders(continueHeader());
1274 : // Remove the Expect header so it won't be handled again upstream.
1275 0 : request_headers_->removeExpect();
1276 0 : }
1277 :
1278 547 : connection_manager_.user_agent_.initializeFromHeaders(*request_headers_,
1279 547 : connection_manager_.stats_.prefixStatName(),
1280 547 : connection_manager_.stats_.scope_);
1281 :
1282 547 : if (!request_headers_->Host()) {
1283 : // Require host header. For HTTP/1.1 Host has already been translated to :authority.
1284 6 : sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
1285 6 : StreamInfo::ResponseCodeDetails::get().MissingHost);
1286 6 : return;
1287 6 : }
1288 :
1289 : // Apply header sanity checks.
1290 541 : absl::optional<std::reference_wrapper<const absl::string_view>> error =
1291 541 : HeaderUtility::requestHeadersValid(*request_headers_);
1292 541 : if (error != absl::nullopt) {
1293 2 : sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt, error.value().get());
1294 2 : if (!response_encoder_->streamErrorOnInvalidHttpMessage()) {
1295 2 : connection_manager_.handleCodecError(error.value().get());
1296 2 : }
1297 2 : return;
1298 2 : }
1299 :
1300 : // Check for the existence of the :path header for non-CONNECT requests, or present-but-empty
1301 : // :path header for CONNECT requests. We expect the codec to have broken the path into pieces if
1302 : // applicable. NOTE: Currently the HTTP/1.1 codec only does this when the allow_absolute_url flag
1303 : // is enabled on the HCM.
1304 539 : if ((!HeaderUtility::isConnect(*request_headers_) || request_headers_->Path()) &&
1305 539 : request_headers_->getPathValue().empty()) {
1306 32 : sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
1307 32 : StreamInfo::ResponseCodeDetails::get().MissingPath);
1308 32 : return;
1309 32 : }
1310 :
1311 : // Rewrites the host of CONNECT-UDP requests.
1312 507 : if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.enable_connect_udp_support") &&
1313 507 : HeaderUtility::isConnectUdpRequest(*request_headers_) &&
1314 507 : !HeaderUtility::rewriteAuthorityForConnectUdp(*request_headers_)) {
1315 0 : sendLocalReply(Code::NotFound, "The path is incorrect for CONNECT-UDP", nullptr, absl::nullopt,
1316 0 : StreamInfo::ResponseCodeDetails::get().InvalidPath);
1317 0 : return;
1318 0 : }
1319 :
1320 : // Currently we only support relative paths at the application layer.
1321 507 : if (!request_headers_->getPathValue().empty() && request_headers_->getPathValue()[0] != '/') {
1322 0 : connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc();
1323 0 : sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
1324 0 : StreamInfo::ResponseCodeDetails::get().AbsolutePath);
1325 0 : return;
1326 0 : }
1327 :
1328 507 : #ifndef ENVOY_ENABLE_UHV
1329 : // In UHV mode path normalization is done in the UHV
1330 : // Path sanitization should happen before any path access other than the above sanity check.
1331 507 : const auto action =
1332 507 : ConnectionManagerUtility::maybeNormalizePath(*request_headers_, connection_manager_.config_);
1333 : // gRPC requests are rejected if Envoy is configured to redirect post-normalization. This is
1334 : // because gRPC clients do not support redirect.
1335 507 : if (action == ConnectionManagerUtility::NormalizePathAction::Reject ||
1336 507 : (action == ConnectionManagerUtility::NormalizePathAction::Redirect &&
1337 507 : Grpc::Common::hasGrpcContentType(*request_headers_))) {
1338 0 : connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
1339 0 : sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
1340 0 : StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
1341 0 : return;
1342 507 : } else if (action == ConnectionManagerUtility::NormalizePathAction::Redirect) {
1343 0 : connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
1344 0 : sendLocalReply(
1345 0 : Code::TemporaryRedirect, "",
1346 0 : [new_path = request_headers_->Path()->value().getStringView()](
1347 0 : Http::ResponseHeaderMap& response_headers) -> void {
1348 0 : response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
1349 0 : },
1350 0 : absl::nullopt, StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
1351 0 : return;
1352 0 : }
1353 :
1354 507 : ASSERT(action == ConnectionManagerUtility::NormalizePathAction::Continue);
1355 507 : #endif
1356 507 : auto optional_port = ConnectionManagerUtility::maybeNormalizeHost(
1357 507 : *request_headers_, connection_manager_.config_, localPort());
1358 507 : if (optional_port.has_value() &&
1359 507 : requestWasConnect(request_headers_, connection_manager_.codec_->protocol())) {
1360 0 : filter_manager_.streamInfo().filterState()->setData(
1361 0 : Router::OriginalConnectPort::key(),
1362 0 : std::make_unique<Router::OriginalConnectPort>(optional_port.value()),
1363 0 : StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Request);
1364 0 : }
1365 :
1366 507 : if (!state_.is_internally_created_) { // Only sanitize headers on first pass.
1367 : // Modify the downstream remote address depending on configuration and headers.
1368 507 : const auto mutate_result = ConnectionManagerUtility::mutateRequestHeaders(
1369 507 : *request_headers_, connection_manager_.read_callbacks_->connection(),
1370 507 : connection_manager_.config_, *snapped_route_config_, connection_manager_.local_info_,
1371 507 : filter_manager_.streamInfo());
1372 :
1373 : // IP detection failed, reject the request.
1374 507 : if (mutate_result.reject_request.has_value()) {
1375 0 : const auto& reject_request_params = mutate_result.reject_request.value();
1376 0 : connection_manager_.stats_.named_.downstream_rq_rejected_via_ip_detection_.inc();
1377 0 : sendLocalReply(reject_request_params.response_code, reject_request_params.body, nullptr,
1378 0 : absl::nullopt,
1379 0 : StreamInfo::ResponseCodeDetails::get().OriginalIPDetectionFailed);
1380 0 : return;
1381 0 : }
1382 :
1383 507 : filter_manager_.setDownstreamRemoteAddress(mutate_result.final_remote_address);
1384 507 : }
1385 507 : ASSERT(filter_manager_.streamInfo().downstreamAddressProvider().remoteAddress() != nullptr);
1386 :
1387 507 : ASSERT(!cached_route_);
1388 507 : refreshCachedRoute();
1389 :
1390 507 : if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
1391 507 : filter_manager_.streamInfo().setTraceReason(
1392 507 : ConnectionManagerUtility::mutateTracingRequestHeader(
1393 507 : *request_headers_, connection_manager_.runtime_, connection_manager_.config_,
1394 507 : cached_route_.value().get()));
1395 507 : }
1396 :
1397 507 : filter_manager_.streamInfo().setRequestHeaders(*request_headers_);
1398 :
1399 507 : const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
1400 :
1401 507 : if (connection_manager_.config_.flushAccessLogOnNewRequest()) {
1402 0 : filter_manager_.log(AccessLog::AccessLogType::DownstreamStart);
1403 0 : }
1404 :
1405 : // TODO if there are no filters when starting a filter iteration, the connection manager
1406 : // should return 404. The current returns no response if there is no router filter.
1407 507 : if (hasCachedRoute()) {
1408 : // Do not allow upgrades if the route does not support it.
1409 409 : if (upgrade_rejected) {
1410 : // While downstream servers should not send upgrade payload without the upgrade being
1411 : // accepted, err on the side of caution and refuse to process any further requests on this
1412 : // connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
1413 : // contains a smuggled HTTP request.
1414 0 : filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
1415 0 : connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
1416 0 : sendLocalReply(Code::Forbidden, "", nullptr, absl::nullopt,
1417 0 : StreamInfo::ResponseCodeDetails::get().UpgradeFailed);
1418 0 : return;
1419 0 : }
1420 : // Allow non websocket requests to go through websocket enabled routes.
1421 409 : }
1422 :
1423 : // Check if tracing is enabled.
1424 507 : if (connection_manager_tracing_config_.has_value()) {
1425 0 : traceRequest();
1426 0 : }
1427 :
1428 507 : if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) {
1429 507 : filter_manager_.decodeHeaders(*request_headers_, end_stream);
1430 507 : } else {
1431 0 : state_.deferred_to_next_io_iteration_ = true;
1432 0 : state_.deferred_end_stream_ = end_stream;
1433 0 : }
1434 :
1435 : // Reset it here for both global and overridden cases.
1436 507 : resetIdleTimer();
1437 507 : }
1438 :
1439 0 : void ConnectionManagerImpl::ActiveStream::traceRequest() {
1440 0 : const Tracing::Decision tracing_decision =
1441 0 : Tracing::TracerUtility::shouldTraceRequest(filter_manager_.streamInfo());
1442 0 : ConnectionManagerImpl::chargeTracingStats(tracing_decision.reason,
1443 0 : connection_manager_.config_.tracingStats());
1444 :
1445 0 : Tracing::HttpTraceContext trace_context(*request_headers_);
1446 0 : active_span_ = connection_manager_.tracer().startSpan(
1447 0 : *this, trace_context, filter_manager_.streamInfo(), tracing_decision);
1448 :
1449 0 : if (!active_span_) {
1450 0 : return;
1451 0 : }
1452 :
1453 : // TODO: Need to investigate the following code based on the cached route, as may
1454 : // be broken in the case a filter changes the route.
1455 :
1456 : // If a decorator has been defined, apply it to the active span.
1457 0 : if (hasCachedRoute() && cached_route_.value()->decorator()) {
1458 0 : const Router::Decorator* decorator = cached_route_.value()->decorator();
1459 :
1460 0 : decorator->apply(*active_span_);
1461 :
1462 0 : state_.decorated_propagate_ = decorator->propagate();
1463 :
1464 : // Cache decorated operation.
1465 0 : if (!decorator->getOperation().empty()) {
1466 0 : decorated_operation_ = &decorator->getOperation();
1467 0 : }
1468 0 : }
1469 :
1470 0 : if (connection_manager_tracing_config_->operation_name_ == Tracing::OperationName::Egress) {
1471 : // For egress (outbound) requests, pass the decorator's operation name (if defined and
1472 : // propagation enabled) as a request header to enable the receiving service to use it in its
1473 : // server span.
1474 0 : if (decorated_operation_ && state_.decorated_propagate_) {
1475 0 : request_headers_->setEnvoyDecoratorOperation(*decorated_operation_);
1476 0 : }
1477 0 : } else {
1478 0 : const HeaderEntry* req_operation_override = request_headers_->EnvoyDecoratorOperation();
1479 :
1480 : // For ingress (inbound) requests, if a decorator operation name has been provided, it
1481 : // should be used to override the active span's operation.
1482 0 : if (req_operation_override) {
1483 0 : if (!req_operation_override->value().empty()) {
1484 0 : active_span_->setOperation(req_operation_override->value().getStringView());
1485 :
1486 : // Clear the decorated operation so won't be used in the response header, as
1487 : // it has been overridden by the inbound decorator operation request header.
1488 0 : decorated_operation_ = nullptr;
1489 0 : }
1490 : // Remove header so not propagated to service
1491 0 : request_headers_->removeEnvoyDecoratorOperation();
1492 0 : }
1493 0 : }
1494 0 : }
1495 :
1496 172 : void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, bool end_stream) {
1497 172 : ScopeTrackerScopeState scope(this,
1498 172 : connection_manager_.read_callbacks_->connection().dispatcher());
1499 172 : maybeEndDecode(end_stream);
1500 172 : filter_manager_.streamInfo().addBytesReceived(data.length());
1501 172 : if (!state_.deferred_to_next_io_iteration_) {
1502 172 : filter_manager_.decodeData(data, end_stream);
1503 172 : } else {
1504 0 : if (!deferred_data_) {
1505 0 : deferred_data_ = std::make_unique<Buffer::OwnedImpl>();
1506 0 : }
1507 0 : deferred_data_->move(data);
1508 0 : state_.deferred_end_stream_ = end_stream;
1509 0 : }
1510 172 : }
1511 :
1512 6 : void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) {
1513 6 : ENVOY_STREAM_LOG(debug, "request trailers complete:\n{}", *this, *trailers);
1514 6 : ScopeTrackerScopeState scope(this,
1515 6 : connection_manager_.read_callbacks_->connection().dispatcher());
1516 6 : resetIdleTimer();
1517 :
1518 6 : ASSERT(!request_trailers_);
1519 6 : request_trailers_ = std::move(trailers);
1520 6 : if (!validateTrailers()) {
1521 0 : ENVOY_STREAM_LOG(debug, "request trailers validation failed:\n{}", *this, *request_trailers_);
1522 0 : return;
1523 0 : }
1524 6 : maybeEndDecode(true);
1525 6 : if (!state_.deferred_to_next_io_iteration_) {
1526 6 : filter_manager_.decodeTrailers(*request_trailers_);
1527 6 : }
1528 6 : }
1529 :
1530 0 : void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) {
1531 0 : resetIdleTimer();
1532 0 : if (!state_.deferred_to_next_io_iteration_) {
1533 : // After going through filters, the ownership of metadata_map will be passed to terminal filter.
1534 : // The terminal filter may encode metadata_map to the next hop immediately or store metadata_map
1535 : // and encode later when connection pool is ready.
1536 0 : filter_manager_.decodeMetadata(*metadata_map);
1537 0 : } else {
1538 0 : deferred_metadata_.push(std::move(metadata_map));
1539 0 : }
1540 0 : }
1541 :
1542 1728 : void ConnectionManagerImpl::ActiveStream::disarmRequestTimeout() {
1543 1728 : if (request_timer_) {
1544 0 : request_timer_->disableTimer();
1545 0 : }
1546 1728 : }
1547 :
1548 0 : void ConnectionManagerImpl::startDrainSequence() {
1549 0 : ASSERT(drain_state_ == DrainState::NotDraining);
1550 0 : drain_state_ = DrainState::Draining;
1551 0 : codec_->shutdownNotice();
1552 0 : drain_timer_ = dispatcher_->createTimer([this]() -> void { onDrainTimeout(); });
1553 0 : drain_timer_->enableTimer(config_.drainTimeout());
1554 0 : }
1555 :
1556 0 : void ConnectionManagerImpl::ActiveStream::snapScopedRouteConfig() {
1557 : // NOTE: if a RDS subscription hasn't got a RouteConfiguration back, a Router::NullConfigImpl is
1558 : // returned, in that case we let it pass.
1559 0 : auto scope_key =
1560 0 : connection_manager_.config_.scopeKeyBuilder()->computeScopeKey(*request_headers_);
1561 0 : snapped_route_config_ = snapped_scoped_routes_config_->getRouteConfig(scope_key);
1562 0 : if (snapped_route_config_ == nullptr) {
1563 0 : ENVOY_STREAM_LOG(trace, "can't find SRDS scope.", *this);
1564 : // TODO(stevenzzzz): Consider to pass an error message to router filter, so that it can
1565 : // send back 404 with some more details.
1566 0 : snapped_route_config_ = std::make_shared<Router::NullConfigImpl>();
1567 0 : }
1568 0 : }
1569 :
1570 507 : void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { refreshCachedRoute(nullptr); }
1571 :
1572 628 : void ConnectionManagerImpl::ActiveStream::refreshDurationTimeout() {
1573 628 : if (!filter_manager_.streamInfo().route() ||
1574 628 : !filter_manager_.streamInfo().route()->routeEntry() || !request_headers_) {
1575 387 : return;
1576 387 : }
1577 241 : const auto& route = filter_manager_.streamInfo().route()->routeEntry();
1578 :
1579 241 : auto grpc_timeout = Grpc::Common::getGrpcTimeout(*request_headers_);
1580 241 : std::chrono::milliseconds timeout;
1581 241 : bool disable_timer = false;
1582 :
1583 241 : if (!grpc_timeout || !route->grpcTimeoutHeaderMax()) {
1584 : // Either there is no grpc-timeout header or special timeouts for it are not
1585 : // configured. Use stream duration.
1586 241 : if (route->maxStreamDuration()) {
1587 0 : timeout = route->maxStreamDuration().value();
1588 0 : if (timeout == std::chrono::milliseconds(0)) {
1589 : // Explicitly configured 0 means no timeout.
1590 0 : disable_timer = true;
1591 0 : }
1592 241 : } else {
1593 : // Fall back to HCM config. If no HCM duration limit exists, disable
1594 : // timers set by any prior route configuration.
1595 241 : const auto max_stream_duration = connection_manager_.config_.maxStreamDuration();
1596 241 : if (max_stream_duration.has_value() && max_stream_duration.value().count()) {
1597 0 : timeout = max_stream_duration.value();
1598 241 : } else {
1599 241 : disable_timer = true;
1600 241 : }
1601 241 : }
1602 241 : } else {
1603 : // Start with the timeout equal to the gRPC timeout header.
1604 0 : timeout = grpc_timeout.value();
1605 : // If there's a valid cap, apply it.
1606 0 : if (timeout > route->grpcTimeoutHeaderMax().value() &&
1607 0 : route->grpcTimeoutHeaderMax().value() != std::chrono::milliseconds(0)) {
1608 0 : timeout = route->grpcTimeoutHeaderMax().value();
1609 0 : }
1610 :
1611 : // Apply the configured offset.
1612 0 : if (timeout != std::chrono::milliseconds(0) && route->grpcTimeoutHeaderOffset()) {
1613 0 : const auto offset = route->grpcTimeoutHeaderOffset().value();
1614 0 : if (offset < timeout) {
1615 0 : timeout -= offset;
1616 0 : } else {
1617 0 : timeout = std::chrono::milliseconds(0);
1618 0 : }
1619 0 : }
1620 0 : }
1621 :
1622 : // Disable any existing timer if configured to do so.
1623 241 : if (disable_timer) {
1624 241 : if (max_stream_duration_timer_) {
1625 0 : max_stream_duration_timer_->disableTimer();
1626 0 : if (route->usingNewTimeouts() && Grpc::Common::isGrpcRequestHeaders(*request_headers_)) {
1627 0 : request_headers_->removeGrpcTimeout();
1628 0 : }
1629 0 : }
1630 241 : return;
1631 241 : }
1632 :
1633 : // Set the header timeout before doing used-time adjustments.
1634 : // This may result in the upstream not getting the latest results, but also
1635 : // avoids every request getting a custom timeout based on envoy think time.
1636 0 : if (route->usingNewTimeouts() && Grpc::Common::isGrpcRequestHeaders(*request_headers_)) {
1637 0 : Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(timeout), *request_headers_);
1638 0 : }
1639 :
1640 : // See how long this stream has been alive, and adjust the timeout
1641 : // accordingly.
1642 0 : std::chrono::duration time_used = std::chrono::duration_cast<std::chrono::milliseconds>(
1643 0 : connection_manager_.timeSource().monotonicTime() -
1644 0 : filter_manager_.streamInfo().startTimeMonotonic());
1645 0 : if (timeout > time_used) {
1646 0 : timeout -= time_used;
1647 0 : } else {
1648 0 : timeout = std::chrono::milliseconds(0);
1649 0 : }
1650 :
1651 : // Finally create (if necessary) and enable the timer.
1652 0 : if (!max_stream_duration_timer_) {
1653 0 : max_stream_duration_timer_ = connection_manager_.dispatcher_->createTimer(
1654 0 : [this]() -> void { onStreamMaxDurationReached(); });
1655 0 : }
1656 0 : max_stream_duration_timer_->enableTimer(timeout);
1657 0 : }
1658 :
1659 628 : void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::RouteCallback& cb) {
1660 : // If the cached route is blocked then any attempt to clear it or refresh it
1661 : // will be ignored.
1662 628 : if (routeCacheBlocked()) {
1663 0 : return;
1664 0 : }
1665 :
1666 628 : Router::RouteConstSharedPtr route;
1667 628 : if (request_headers_ != nullptr) {
1668 547 : if (connection_manager_.config_.isRoutable() &&
1669 547 : connection_manager_.config_.scopedRouteConfigProvider() != nullptr &&
1670 547 : connection_manager_.config_.scopeKeyBuilder().has_value()) {
1671 : // NOTE: re-select scope as well in case the scope key header has been changed by a filter.
1672 0 : snapScopedRouteConfig();
1673 0 : }
1674 547 : if (snapped_route_config_ != nullptr) {
1675 547 : route = snapped_route_config_->route(cb, *request_headers_, filter_manager_.streamInfo(),
1676 547 : stream_id_);
1677 547 : }
1678 547 : }
1679 :
1680 628 : setRoute(route);
1681 628 : }
1682 :
1683 628 : void ConnectionManagerImpl::ActiveStream::refreshCachedTracingCustomTags() {
1684 628 : if (!connection_manager_tracing_config_.has_value()) {
1685 628 : return;
1686 628 : }
1687 0 : const Tracing::CustomTagMap& conn_manager_tags = connection_manager_tracing_config_->custom_tags_;
1688 0 : const Tracing::CustomTagMap* route_tags = nullptr;
1689 0 : if (hasCachedRoute() && cached_route_.value()->tracingConfig()) {
1690 0 : route_tags = &cached_route_.value()->tracingConfig()->getCustomTags();
1691 0 : }
1692 0 : const bool configured_in_conn = !conn_manager_tags.empty();
1693 0 : const bool configured_in_route = route_tags && !route_tags->empty();
1694 0 : if (!configured_in_conn && !configured_in_route) {
1695 0 : return;
1696 0 : }
1697 0 : Tracing::CustomTagMap& custom_tag_map = getOrMakeTracingCustomTagMap();
1698 0 : if (configured_in_route) {
1699 0 : custom_tag_map.insert(route_tags->begin(), route_tags->end());
1700 0 : }
1701 0 : if (configured_in_conn) {
1702 0 : custom_tag_map.insert(conn_manager_tags.begin(), conn_manager_tags.end());
1703 0 : }
1704 0 : }
1705 :
1706 : // TODO(chaoqin-li1123): Make on demand vhds and on demand srds works at the same time.
1707 : void ConnectionManagerImpl::ActiveStream::requestRouteConfigUpdate(
1708 0 : Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) {
1709 0 : route_config_update_requester_->requestRouteConfigUpdate(route_config_updated_cb);
1710 0 : }
1711 :
1712 0 : absl::optional<Router::ConfigConstSharedPtr> ConnectionManagerImpl::ActiveStream::routeConfig() {
1713 0 : if (connection_manager_.config_.routeConfigProvider() != nullptr) {
1714 0 : return {connection_manager_.config_.routeConfigProvider()->configCast()};
1715 0 : }
1716 0 : return {};
1717 0 : }
1718 :
1719 383 : void ConnectionManagerImpl::ActiveStream::onLocalReply(Code code) {
1720 : // The BadRequest error code indicates there has been a messaging error.
1721 383 : if (code == Http::Code::BadRequest && connection_manager_.codec_->protocol() < Protocol::Http2 &&
1722 383 : !response_encoder_->streamErrorOnInvalidHttpMessage()) {
1723 67 : filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
1724 67 : }
1725 383 : }
1726 :
1727 3 : void ConnectionManagerImpl::ActiveStream::encode1xxHeaders(ResponseHeaderMap& response_headers) {
1728 : // Strip the T-E headers etc. Defer other header additions as well as drain-close logic to the
1729 : // continuation headers.
1730 3 : ConnectionManagerUtility::mutateResponseHeaders(
1731 3 : response_headers, request_headers_.get(), connection_manager_.config_, EMPTY_STRING,
1732 3 : filter_manager_.streamInfo(), connection_manager_.proxy_name_,
1733 3 : connection_manager_.clear_hop_by_hop_response_headers_);
1734 :
1735 : // Count both the 1xx and follow-up response code in stats.
1736 3 : chargeStats(response_headers);
1737 :
1738 3 : ENVOY_STREAM_LOG(debug, "encoding 100 continue headers via codec:\n{}", *this, response_headers);
1739 :
1740 : // Now actually encode via the codec.
1741 3 : response_encoder_->encode1xxHeaders(response_headers);
1742 3 : }
1743 :
1744 : void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& headers,
1745 562 : bool end_stream) {
1746 : // Base headers.
1747 :
1748 : // We want to preserve the original date header, but we add a date header if it is absent
1749 562 : if (!headers.Date()) {
1750 562 : connection_manager_.config_.dateProvider().setDateHeader(headers);
1751 562 : }
1752 :
1753 : // Following setReference() is safe because serverName() is constant for the life of the
1754 : // listener.
1755 562 : const auto transformation = connection_manager_.config_.serverHeaderTransformation();
1756 562 : if (transformation == ConnectionManagerConfig::HttpConnectionManagerProto::OVERWRITE ||
1757 562 : (transformation == ConnectionManagerConfig::HttpConnectionManagerProto::APPEND_IF_ABSENT &&
1758 562 : headers.Server() == nullptr)) {
1759 562 : headers.setReferenceServer(connection_manager_.config_.serverName());
1760 562 : }
1761 562 : ConnectionManagerUtility::mutateResponseHeaders(
1762 562 : headers, request_headers_.get(), connection_manager_.config_,
1763 562 : connection_manager_.config_.via(), filter_manager_.streamInfo(),
1764 562 : connection_manager_.proxy_name_, connection_manager_.clear_hop_by_hop_response_headers_);
1765 :
1766 562 : bool drain_connection_due_to_overload = false;
1767 562 : if (connection_manager_.drain_state_ == DrainState::NotDraining &&
1768 562 : connection_manager_.random_generator_.bernoulli(
1769 547 : connection_manager_.overload_disable_keepalive_ref_.value())) {
1770 0 : ENVOY_STREAM_LOG(debug, "disabling keepalive due to envoy overload", *this);
1771 0 : drain_connection_due_to_overload = true;
1772 0 : connection_manager_.stats_.named_.downstream_cx_overload_disable_keepalive_.inc();
1773 0 : }
1774 :
1775 : // See if we want to drain/close the connection. Send the go away frame prior to encoding the
1776 : // header block.
1777 562 : if (connection_manager_.drain_state_ == DrainState::NotDraining &&
1778 562 : (connection_manager_.drain_close_.drainClose() || drain_connection_due_to_overload)) {
1779 :
1780 : // This doesn't really do anything for HTTP/1.1 other then give the connection another boost
1781 : // of time to race with incoming requests. For HTTP/2 connections, send a GOAWAY frame to
1782 : // prevent any new streams.
1783 0 : connection_manager_.startDrainSequence();
1784 0 : connection_manager_.stats_.named_.downstream_cx_drain_close_.inc();
1785 0 : ENVOY_STREAM_LOG(debug, "drain closing connection", *this);
1786 0 : }
1787 :
1788 562 : if (connection_manager_.codec_->protocol() == Protocol::Http10) {
1789 : // As HTTP/1.0 and below can not do chunked encoding, if there is no content
1790 : // length the response will be framed by connection close.
1791 19 : if (!headers.ContentLength()) {
1792 0 : filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
1793 0 : }
1794 : // If the request came with a keep-alive and no other factor resulted in a
1795 : // connection close header, send an explicit keep-alive header.
1796 19 : if (!filter_manager_.streamInfo().shouldDrainConnectionUponCompletion()) {
1797 19 : headers.setConnection(Headers::get().ConnectionValues.KeepAlive);
1798 19 : }
1799 19 : }
1800 :
1801 562 : if (connection_manager_.drain_state_ == DrainState::NotDraining &&
1802 562 : filter_manager_.streamInfo().shouldDrainConnectionUponCompletion()) {
1803 67 : ENVOY_STREAM_LOG(debug, "closing connection due to connection close header", *this);
1804 67 : connection_manager_.drain_state_ = DrainState::Closing;
1805 67 : }
1806 :
1807 : // If we are destroying a stream before remote is complete and the connection does not support
1808 : // multiplexing, we should disconnect since we don't want to wait around for the request to
1809 : // finish.
1810 562 : if (!filter_manager_.remoteDecodeComplete()) {
1811 234 : if (connection_manager_.codec_->protocol() < Protocol::Http2) {
1812 232 : connection_manager_.drain_state_ = DrainState::Closing;
1813 232 : }
1814 :
1815 234 : connection_manager_.stats_.named_.downstream_rq_response_before_rq_complete_.inc();
1816 234 : }
1817 :
1818 562 : if (Utility::isUpgrade(headers) ||
1819 562 : HeaderUtility::isConnectResponse(request_headers_.get(), *responseHeaders())) {
1820 1 : state_.is_tunneling_ = true;
1821 1 : }
1822 :
1823 : // Block route cache if the response headers is received and processed. Because after this
1824 : // point, the cached route should never be updated or refreshed.
1825 562 : blockRouteCache();
1826 :
1827 562 : if (connection_manager_.drain_state_ != DrainState::NotDraining &&
1828 562 : connection_manager_.codec_->protocol() < Protocol::Http2) {
1829 : // If the connection manager is draining send "Connection: Close" on HTTP/1.1 connections.
1830 : // Do not do this for H2 (which drains via GOAWAY) or Upgrade or CONNECT (as the
1831 : // payload is no longer HTTP/1.1)
1832 241 : if (!state_.is_tunneling_) {
1833 240 : headers.setReferenceConnection(Headers::get().ConnectionValues.Close);
1834 240 : }
1835 241 : }
1836 :
1837 562 : if (connection_manager_tracing_config_.has_value()) {
1838 0 : if (connection_manager_tracing_config_->operation_name_ == Tracing::OperationName::Ingress) {
1839 : // For ingress (inbound) responses, if the request headers do not include a
1840 : // decorator operation (override), and the decorated operation should be
1841 : // propagated, then pass the decorator's operation name (if defined)
1842 : // as a response header to enable the client service to use it in its client span.
1843 0 : if (decorated_operation_ && state_.decorated_propagate_) {
1844 0 : headers.setEnvoyDecoratorOperation(*decorated_operation_);
1845 0 : }
1846 0 : } else if (connection_manager_tracing_config_->operation_name_ ==
1847 0 : Tracing::OperationName::Egress) {
1848 0 : const HeaderEntry* resp_operation_override = headers.EnvoyDecoratorOperation();
1849 :
1850 : // For Egress (outbound) response, if a decorator operation name has been provided, it
1851 : // should be used to override the active span's operation.
1852 0 : if (resp_operation_override) {
1853 0 : if (!resp_operation_override->value().empty() && active_span_) {
1854 0 : active_span_->setOperation(resp_operation_override->value().getStringView());
1855 0 : }
1856 : // Remove header so not propagated to service.
1857 0 : headers.removeEnvoyDecoratorOperation();
1858 0 : }
1859 0 : }
1860 0 : }
1861 :
1862 562 : chargeStats(headers);
1863 :
1864 562 : if (state_.is_tunneling_ &&
1865 562 : connection_manager_.config_.flushAccessLogOnTunnelSuccessfullyEstablished()) {
1866 0 : filter_manager_.log(AccessLog::AccessLogType::DownstreamTunnelSuccessfullyEstablished);
1867 0 : }
1868 562 : ENVOY_STREAM_LOG(debug, "encoding headers via codec (end_stream={}):\n{}", *this, end_stream,
1869 562 : headers);
1870 :
1871 562 : filter_manager_.streamInfo().downstreamTiming().onFirstDownstreamTxByteSent(
1872 562 : connection_manager_.time_source_);
1873 :
1874 562 : if (header_validator_) {
1875 0 : auto result = header_validator_->transformResponseHeaders(headers);
1876 0 : if (!result.status.ok()) {
1877 : // It is possible that the header map is invalid if an encoder filter makes invalid
1878 : // modifications
1879 : // TODO(yanavlasov): add handling for this case.
1880 0 : } else if (result.new_headers) {
1881 0 : response_encoder_->encodeHeaders(*result.new_headers, end_stream);
1882 0 : return;
1883 0 : }
1884 0 : }
1885 :
1886 : // Now actually encode via the codec.
1887 562 : response_encoder_->encodeHeaders(headers, end_stream);
1888 562 : }
1889 :
1890 582 : void ConnectionManagerImpl::ActiveStream::encodeData(Buffer::Instance& data, bool end_stream) {
1891 582 : ENVOY_STREAM_LOG(trace, "encoding data via codec (size={} end_stream={})", *this, data.length(),
1892 582 : end_stream);
1893 :
1894 582 : filter_manager_.streamInfo().addBytesSent(data.length());
1895 582 : response_encoder_->encodeData(data, end_stream);
1896 582 : }
1897 :
1898 1 : void ConnectionManagerImpl::ActiveStream::encodeTrailers(ResponseTrailerMap& trailers) {
1899 1 : ENVOY_STREAM_LOG(debug, "encoding trailers via codec:\n{}", *this, trailers);
1900 :
1901 1 : response_encoder_->encodeTrailers(trailers);
1902 1 : }
1903 :
1904 2 : void ConnectionManagerImpl::ActiveStream::encodeMetadata(MetadataMapPtr&& metadata) {
1905 2 : MetadataMapVector metadata_map_vector;
1906 2 : metadata_map_vector.emplace_back(std::move(metadata));
1907 2 : ENVOY_STREAM_LOG(debug, "encoding metadata via codec:\n{}", *this, metadata_map_vector);
1908 2 : response_encoder_->encodeMetadata(metadata_map_vector);
1909 2 : }
1910 :
1911 17 : void ConnectionManagerImpl::ActiveStream::onDecoderFilterBelowWriteBufferLowWatermark() {
1912 17 : ENVOY_STREAM_LOG(debug, "Read-enabling downstream stream due to filter callbacks.", *this);
1913 : // If the state is destroyed, the codec's stream is already torn down. On
1914 : // teardown the codec will unwind any remaining read disable calls.
1915 17 : if (!filter_manager_.destroyed()) {
1916 17 : response_encoder_->getStream().readDisable(false);
1917 17 : }
1918 17 : connection_manager_.stats_.named_.downstream_flow_control_resumed_reading_total_.inc();
1919 17 : }
1920 :
1921 19 : void ConnectionManagerImpl::ActiveStream::onDecoderFilterAboveWriteBufferHighWatermark() {
1922 19 : ENVOY_STREAM_LOG(debug, "Read-disabling downstream stream due to filter callbacks.", *this);
1923 19 : response_encoder_->getStream().readDisable(true);
1924 19 : connection_manager_.stats_.named_.downstream_flow_control_paused_reading_total_.inc();
1925 19 : }
1926 :
1927 : void ConnectionManagerImpl::ActiveStream::onResetStream(StreamResetReason reset_reason,
1928 366 : absl::string_view) {
1929 : // NOTE: This function gets called in all of the following cases:
1930 : // 1) We TX an app level reset
1931 : // 2) The codec TX a codec level reset
1932 : // 3) The codec RX a reset
1933 : // 4) The overload manager reset the stream
1934 : // If we need to differentiate we need to do it inside the codec. Can start with this.
1935 366 : const absl::string_view encoder_details = response_encoder_->getStream().responseDetails();
1936 366 : ENVOY_STREAM_LOG(debug, "stream reset: reset reason: {}, response details: {}", *this,
1937 366 : Http::Utility::resetReasonToString(reset_reason),
1938 366 : encoder_details.empty() ? absl::string_view{"-"} : encoder_details);
1939 366 : connection_manager_.stats_.named_.downstream_rq_rx_reset_.inc();
1940 366 : state_.on_reset_stream_called_ = true;
1941 :
1942 : // If the codec sets its responseDetails() for a reason other than peer reset, set a
1943 : // DownstreamProtocolError. Either way, propagate details.
1944 366 : if (!encoder_details.empty() && reset_reason == StreamResetReason::LocalReset) {
1945 81 : filter_manager_.streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DownstreamProtocolError);
1946 81 : }
1947 366 : if (!encoder_details.empty()) {
1948 103 : filter_manager_.streamInfo().setResponseCodeDetails(encoder_details);
1949 103 : }
1950 :
1951 : // Check if we're in the overload manager reset case.
1952 : // encoder_details should be empty in this case as we don't have a codec error.
1953 366 : if (encoder_details.empty() && reset_reason == StreamResetReason::OverloadManager) {
1954 0 : filter_manager_.streamInfo().setResponseFlag(StreamInfo::ResponseFlag::OverloadManager);
1955 0 : filter_manager_.streamInfo().setResponseCodeDetails(
1956 0 : StreamInfo::ResponseCodeDetails::get().Overload);
1957 0 : }
1958 366 : filter_manager_.onDownstreamReset();
1959 366 : connection_manager_.doDeferredStreamDestroy(*this);
1960 366 : }
1961 :
1962 0 : void ConnectionManagerImpl::ActiveStream::onAboveWriteBufferHighWatermark() {
1963 0 : ENVOY_STREAM_LOG(debug, "Disabling upstream stream due to downstream stream watermark.", *this);
1964 0 : filter_manager_.callHighWatermarkCallbacks();
1965 0 : }
1966 :
1967 0 : void ConnectionManagerImpl::ActiveStream::onBelowWriteBufferLowWatermark() {
1968 0 : ENVOY_STREAM_LOG(debug, "Enabling upstream stream due to downstream stream watermark.", *this);
1969 0 : filter_manager_.callLowWatermarkCallbacks();
1970 0 : }
1971 :
1972 505 : void ConnectionManagerImpl::ActiveStream::onCodecEncodeComplete() {
1973 505 : ASSERT(!state_.codec_encode_complete_);
1974 505 : ENVOY_STREAM_LOG(debug, "Codec completed encoding stream.", *this);
1975 505 : state_.codec_encode_complete_ = true;
1976 :
1977 : // Update timing
1978 505 : filter_manager_.streamInfo().downstreamTiming().onLastDownstreamTxByteSent(
1979 505 : connection_manager_.time_source_);
1980 505 : request_response_timespan_->complete();
1981 :
1982 : // Only reap stream once.
1983 505 : if (state_.is_zombie_stream_) {
1984 70 : connection_manager_.doDeferredStreamDestroy(*this);
1985 70 : }
1986 505 : }
1987 :
1988 0 : void ConnectionManagerImpl::ActiveStream::onCodecLowLevelReset() {
1989 0 : ASSERT(!state_.codec_encode_complete_);
1990 0 : state_.on_reset_stream_called_ = true;
1991 0 : ENVOY_STREAM_LOG(debug, "Codec timed out flushing stream", *this);
1992 :
1993 : // TODO(kbaichoo): Update streamInfo to account for the reset.
1994 :
1995 : // Only reap stream once.
1996 0 : if (state_.is_zombie_stream_) {
1997 0 : connection_manager_.doDeferredStreamDestroy(*this);
1998 0 : }
1999 0 : }
2000 :
2001 0 : Tracing::OperationName ConnectionManagerImpl::ActiveStream::operationName() const {
2002 0 : ASSERT(connection_manager_tracing_config_.has_value());
2003 0 : return connection_manager_tracing_config_->operation_name_;
2004 0 : }
2005 :
2006 0 : const Tracing::CustomTagMap* ConnectionManagerImpl::ActiveStream::customTags() const {
2007 0 : return tracing_custom_tags_.get();
2008 0 : }
2009 :
2010 0 : bool ConnectionManagerImpl::ActiveStream::verbose() const {
2011 0 : ASSERT(connection_manager_tracing_config_.has_value());
2012 0 : return connection_manager_tracing_config_->verbose_;
2013 0 : }
2014 :
2015 0 : uint32_t ConnectionManagerImpl::ActiveStream::maxPathTagLength() const {
2016 0 : ASSERT(connection_manager_tracing_config_.has_value());
2017 0 : return connection_manager_tracing_config_->max_path_tag_length_;
2018 0 : }
2019 :
2020 0 : bool ConnectionManagerImpl::ActiveStream::spawnUpstreamSpan() const {
2021 0 : ASSERT(connection_manager_tracing_config_.has_value());
2022 0 : return connection_manager_tracing_config_->spawn_upstream_span_;
2023 0 : }
2024 :
2025 2 : const Router::RouteEntry::UpgradeMap* ConnectionManagerImpl::ActiveStream::upgradeMap() {
2026 : // We must check if the 'cached_route_' optional is populated since this function can be called
2027 : // early via sendLocalReply(), before the cached route is populated.
2028 2 : if (hasCachedRoute() && cached_route_.value()->routeEntry()) {
2029 2 : return &cached_route_.value()->routeEntry()->upgradeMap();
2030 2 : }
2031 :
2032 0 : return nullptr;
2033 2 : }
2034 :
2035 183 : Tracing::Span& ConnectionManagerImpl::ActiveStream::activeSpan() {
2036 183 : if (active_span_) {
2037 0 : return *active_span_;
2038 183 : } else {
2039 183 : return Tracing::NullSpan::instance();
2040 183 : }
2041 183 : }
2042 :
2043 183 : OptRef<const Tracing::Config> ConnectionManagerImpl::ActiveStream::tracingConfig() const {
2044 183 : if (connection_manager_tracing_config_.has_value()) {
2045 0 : return makeOptRef<const Tracing::Config>(*this);
2046 0 : }
2047 183 : return {};
2048 183 : }
2049 :
2050 407 : const ScopeTrackedObject& ConnectionManagerImpl::ActiveStream::scope() { return *this; }
2051 :
2052 0 : Upstream::ClusterInfoConstSharedPtr ConnectionManagerImpl::ActiveStream::clusterInfo() {
2053 : // NOTE: Refreshing route caches clusterInfo as well.
2054 0 : if (!cached_route_.has_value()) {
2055 0 : refreshCachedRoute();
2056 0 : }
2057 :
2058 0 : return cached_cluster_info_.value();
2059 0 : }
2060 :
2061 : Router::RouteConstSharedPtr
2062 1125 : ConnectionManagerImpl::ActiveStream::route(const Router::RouteCallback& cb) {
2063 1125 : if (cached_route_.has_value()) {
2064 1004 : return cached_route_.value();
2065 1004 : }
2066 121 : refreshCachedRoute(cb);
2067 121 : return cached_route_.value();
2068 1125 : }
2069 :
2070 : /**
2071 : * Sets the cached route to the RouteConstSharedPtr argument passed in. Handles setting the
2072 : * cached_route_/cached_cluster_info_ ActiveStream attributes, the FilterManager streamInfo, tracing
2073 : * tags, and timeouts.
2074 : *
2075 : * Declared as a StreamFilterCallbacks member function for filters to call directly, but also
2076 : * functions as a helper to refreshCachedRoute(const Router::RouteCallback& cb).
2077 : */
2078 628 : void ConnectionManagerImpl::ActiveStream::setRoute(Router::RouteConstSharedPtr route) {
2079 : // If the cached route is blocked then any attempt to clear it or refresh it
2080 : // will be ignored.
2081 : // setRoute() may be called directly by the interface of DownstreamStreamFilterCallbacks,
2082 : // so check for routeCacheBlocked() here again.
2083 628 : if (routeCacheBlocked()) {
2084 0 : return;
2085 0 : }
2086 :
2087 : // Update the cached route.
2088 628 : setCachedRoute({route});
2089 : // Update the cached cluster info based on the new route.
2090 628 : if (nullptr == route || nullptr == route->routeEntry()) {
2091 387 : cached_cluster_info_ = nullptr;
2092 575 : } else {
2093 241 : auto* cluster = connection_manager_.cluster_manager_.getThreadLocalCluster(
2094 241 : route->routeEntry()->clusterName());
2095 241 : cached_cluster_info_ = (nullptr == cluster) ? nullptr : cluster->info();
2096 241 : }
2097 :
2098 : // Update route and cluster info in the filter manager's stream info.
2099 628 : filter_manager_.streamInfo().route_ = std::move(route); // Now can move route here safely.
2100 628 : filter_manager_.streamInfo().setUpstreamClusterInfo(cached_cluster_info_.value());
2101 :
2102 628 : refreshCachedTracingCustomTags();
2103 628 : refreshDurationTimeout();
2104 628 : refreshIdleTimeout();
2105 628 : }
2106 :
2107 628 : void ConnectionManagerImpl::ActiveStream::refreshIdleTimeout() {
2108 628 : if (hasCachedRoute()) {
2109 443 : const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry();
2110 443 : if (route_entry != nullptr && route_entry->idleTimeout()) {
2111 0 : idle_timeout_ms_ = route_entry->idleTimeout().value();
2112 0 : response_encoder_->getStream().setFlushTimeout(idle_timeout_ms_);
2113 0 : if (idle_timeout_ms_.count()) {
2114 : // If we have a route-level idle timeout but no global stream idle timeout, create a timer.
2115 0 : if (stream_idle_timer_ == nullptr) {
2116 0 : stream_idle_timer_ = connection_manager_.dispatcher_->createScaledTimer(
2117 0 : Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout,
2118 0 : [this]() -> void { onIdleTimeout(); });
2119 0 : }
2120 0 : } else if (stream_idle_timer_ != nullptr) {
2121 : // If we had a global stream idle timeout but the route-level idle timeout is set to zero
2122 : // (to override), we disable the idle timer.
2123 0 : stream_idle_timer_->disableTimer();
2124 0 : stream_idle_timer_ = nullptr;
2125 0 : }
2126 0 : }
2127 443 : }
2128 628 : }
2129 :
2130 0 : void ConnectionManagerImpl::ActiveStream::refreshAccessLogFlushTimer() {
2131 0 : if (connection_manager_.config_.accessLogFlushInterval().has_value()) {
2132 0 : access_log_flush_timer_->enableTimer(
2133 0 : connection_manager_.config_.accessLogFlushInterval().value(), this);
2134 0 : }
2135 0 : }
2136 :
2137 0 : void ConnectionManagerImpl::ActiveStream::clearRouteCache() {
2138 : // If the cached route is blocked then any attempt to clear it or refresh it
2139 : // will be ignored.
2140 0 : if (routeCacheBlocked()) {
2141 0 : return;
2142 0 : }
2143 :
2144 0 : setCachedRoute({});
2145 :
2146 0 : cached_cluster_info_ = absl::optional<Upstream::ClusterInfoConstSharedPtr>();
2147 0 : if (tracing_custom_tags_) {
2148 0 : tracing_custom_tags_->clear();
2149 0 : }
2150 0 : }
2151 :
2152 : void ConnectionManagerImpl::ActiveStream::setCachedRoute(
2153 628 : absl::optional<Router::RouteConstSharedPtr>&& route) {
2154 628 : if (hasCachedRoute()) {
2155 : // The configuration of the route may be referenced by some filters.
2156 : // Cache the route to avoid it being destroyed before the stream is destroyed.
2157 0 : cleared_cached_routes_.emplace_back(std::move(cached_route_.value()));
2158 0 : }
2159 628 : cached_route_ = std::move(route);
2160 628 : }
2161 :
2162 562 : void ConnectionManagerImpl::ActiveStream::blockRouteCache() {
2163 562 : route_cache_blocked_ = true;
2164 : // Clear the snapped route configuration because it is unnecessary to keep it.
2165 562 : snapped_route_config_.reset();
2166 562 : snapped_scoped_routes_config_.reset();
2167 562 : }
2168 :
2169 0 : void ConnectionManagerImpl::ActiveStream::onRequestDataTooLarge() {
2170 0 : connection_manager_.stats_.named_.downstream_rq_too_large_.inc();
2171 0 : }
2172 :
2173 : void ConnectionManagerImpl::ActiveStream::recreateStream(
2174 0 : StreamInfo::FilterStateSharedPtr filter_state) {
2175 0 : ResponseEncoder* response_encoder = response_encoder_;
2176 0 : response_encoder_ = nullptr;
2177 :
2178 0 : Buffer::InstancePtr request_data = std::make_unique<Buffer::OwnedImpl>();
2179 0 : const auto& buffered_request_data = filter_manager_.bufferedRequestData();
2180 0 : const bool proxy_body = buffered_request_data != nullptr && buffered_request_data->length() > 0;
2181 0 : if (proxy_body) {
2182 0 : request_data->move(*buffered_request_data);
2183 0 : }
2184 :
2185 0 : response_encoder->getStream().removeCallbacks(*this);
2186 :
2187 : // This functionally deletes the stream (via deferred delete) so do not
2188 : // reference anything beyond this point.
2189 : // Make sure to not check for deferred close as we'll be immediately creating a new stream.
2190 0 : state_.is_internally_destroyed_ = true;
2191 0 : connection_manager_.doEndStream(*this, /*check_for_deferred_close*/ false);
2192 :
2193 0 : RequestDecoder& new_stream = connection_manager_.newStream(*response_encoder, true);
2194 :
2195 : // Set the new RequestDecoder on the ResponseEncoder. Even though all of the decoder callbacks
2196 : // have already been called at this point, the encoder still needs the new decoder for deferred
2197 : // logging in some cases.
2198 : // This doesn't currently work for HTTP/1 as the H/1 ResponseEncoder doesn't hold the active
2199 : // stream's pointer to the RequestDecoder.
2200 0 : response_encoder->setRequestDecoder(new_stream);
2201 : // We don't need to copy over the old parent FilterState from the old StreamInfo if it did not
2202 : // store any objects with a LifeSpan at or above DownstreamRequest. This is to avoid unnecessary
2203 : // heap allocation.
2204 : // TODO(snowp): In the case where connection level filter state has been set on the connection
2205 : // FilterState that we inherit, we'll end up copying this every time even though we could get
2206 : // away with just resetting it to the HCM filter_state_.
2207 0 : if (filter_state->hasDataAtOrAboveLifeSpan(StreamInfo::FilterState::LifeSpan::Request)) {
2208 0 : (*connection_manager_.streams_.begin())->filter_manager_.streamInfo().filter_state_ =
2209 0 : std::make_shared<StreamInfo::FilterStateImpl>(
2210 0 : filter_state->parent(), StreamInfo::FilterState::LifeSpan::FilterChain);
2211 0 : }
2212 :
2213 : // Make sure that relevant information makes it from the original stream info
2214 : // to the new one. Generally this should consist of all downstream related
2215 : // data, and not include upstream related data.
2216 0 : (*connection_manager_.streams_.begin())
2217 0 : ->filter_manager_.streamInfo()
2218 0 : .setFromForRecreateStream(filter_manager_.streamInfo());
2219 0 : new_stream.decodeHeaders(std::move(request_headers_), !proxy_body);
2220 0 : if (proxy_body) {
2221 : // This functionality is currently only used for internal redirects, which the router only
2222 : // allows if the full request has been read (end_stream = true) so we don't need to handle the
2223 : // case of upstream sending an early response mid-request.
2224 0 : new_stream.decodeData(*request_data, true);
2225 0 : }
2226 0 : }
2227 :
2228 0 : Http1StreamEncoderOptionsOptRef ConnectionManagerImpl::ActiveStream::http1StreamEncoderOptions() {
2229 0 : return response_encoder_->http1StreamEncoderOptions();
2230 0 : }
2231 :
2232 0 : void ConnectionManagerImpl::ActiveStream::onResponseDataTooLarge() {
2233 0 : connection_manager_.stats_.named_.rs_too_large_.inc();
2234 0 : }
2235 :
2236 0 : void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, absl::string_view) {
2237 0 : connection_manager_.stats_.named_.downstream_rq_tx_reset_.inc();
2238 0 : connection_manager_.doEndStream(*this);
2239 0 : }
2240 :
2241 0 : bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
2242 : // TODO(yanavlasov): Merge this with the filter manager continueIteration() method
2243 0 : if (!state_.deferred_to_next_io_iteration_) {
2244 0 : return false;
2245 0 : }
2246 0 : state_.deferred_to_next_io_iteration_ = false;
2247 0 : bool end_stream = state_.deferred_end_stream_ && deferred_data_ == nullptr &&
2248 0 : request_trailers_ == nullptr && deferred_metadata_.empty();
2249 0 : filter_manager_.decodeHeaders(*request_headers_, end_stream);
2250 0 : if (end_stream) {
2251 0 : return true;
2252 0 : }
2253 : // Send metadata before data, as data may have an associated end_stream.
2254 0 : while (!deferred_metadata_.empty()) {
2255 0 : MetadataMapPtr& metadata = deferred_metadata_.front();
2256 0 : filter_manager_.decodeMetadata(*metadata);
2257 0 : deferred_metadata_.pop();
2258 0 : }
2259 : // Filter manager will return early from decodeData and decodeTrailers if
2260 : // request has completed.
2261 0 : if (deferred_data_ != nullptr) {
2262 0 : end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr;
2263 0 : filter_manager_.decodeData(*deferred_data_, end_stream);
2264 0 : }
2265 0 : if (request_trailers_ != nullptr) {
2266 0 : filter_manager_.decodeTrailers(*request_trailers_);
2267 0 : }
2268 0 : return true;
2269 0 : }
2270 :
2271 507 : bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() {
2272 : // Do not defer this stream if stream deferral is disabled
2273 507 : if (deferred_request_processing_callback_ == nullptr) {
2274 507 : return false;
2275 507 : }
2276 : // Defer this stream if there are already deferred streams, so they are not
2277 : // processed out of order
2278 0 : if (deferred_request_processing_callback_->enabled()) {
2279 0 : return true;
2280 0 : }
2281 0 : ++requests_during_dispatch_count_;
2282 0 : bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_;
2283 0 : if (defer) {
2284 0 : deferred_request_processing_callback_->scheduleCallbackNextIteration();
2285 0 : }
2286 0 : return defer;
2287 0 : }
2288 :
2289 0 : void ConnectionManagerImpl::onDeferredRequestProcessing() {
2290 0 : if (streams_.empty()) {
2291 0 : return;
2292 0 : }
2293 0 : requests_during_dispatch_count_ = 1; // 1 stream is always let through
2294 : // Streams are inserted at the head of the list. As such process deferred
2295 : // streams in the reverse order.
2296 0 : auto reverse_iter = std::prev(streams_.end());
2297 0 : bool at_first_element = false;
2298 0 : do {
2299 0 : at_first_element = reverse_iter == streams_.begin();
2300 : // Move the iterator to the previous item in case the `onDeferredRequestProcessing` call removes
2301 : // the stream from the list.
2302 0 : auto previous_element = std::prev(reverse_iter);
2303 0 : bool was_deferred = (*reverse_iter)->onDeferredRequestProcessing();
2304 0 : if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) {
2305 0 : break;
2306 0 : }
2307 0 : reverse_iter = previous_element;
2308 : // TODO(yanavlasov): see if `rend` can be used.
2309 0 : } while (!at_first_element);
2310 0 : }
2311 :
2312 : } // namespace Http
2313 : } // namespace Envoy
|