LCOV - code coverage report
Current view: top level - source/common/http - conn_manager_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 814 1583 51.4 %
Date: 2024-01-05 06:35:25 Functions: 59 116 50.9 %

          Line data    Source code
       1             : #include "source/common/http/conn_manager_impl.h"
       2             : 
       3             : #include <chrono>
       4             : #include <cstdint>
       5             : #include <functional>
       6             : #include <iterator>
       7             : #include <list>
       8             : #include <memory>
       9             : #include <string>
      10             : #include <vector>
      11             : 
      12             : #include "envoy/buffer/buffer.h"
      13             : #include "envoy/common/time.h"
      14             : #include "envoy/event/dispatcher.h"
      15             : #include "envoy/event/scaled_range_timer_manager.h"
      16             : #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
      17             : #include "envoy/http/header_map.h"
      18             : #include "envoy/http/header_validator_errors.h"
      19             : #include "envoy/network/drain_decision.h"
      20             : #include "envoy/router/router.h"
      21             : #include "envoy/ssl/connection.h"
      22             : #include "envoy/stats/scope.h"
      23             : #include "envoy/stream_info/filter_state.h"
      24             : #include "envoy/stream_info/stream_info.h"
      25             : #include "envoy/tracing/tracer.h"
      26             : #include "envoy/type/v3/percent.pb.h"
      27             : 
      28             : #include "source/common/buffer/buffer_impl.h"
      29             : #include "source/common/common/assert.h"
      30             : #include "source/common/common/empty_string.h"
      31             : #include "source/common/common/enum_to_int.h"
      32             : #include "source/common/common/fmt.h"
      33             : #include "source/common/common/perf_tracing.h"
      34             : #include "source/common/common/scope_tracker.h"
      35             : #include "source/common/common/utility.h"
      36             : #include "source/common/http/codes.h"
      37             : #include "source/common/http/conn_manager_utility.h"
      38             : #include "source/common/http/exception.h"
      39             : #include "source/common/http/header_map_impl.h"
      40             : #include "source/common/http/header_utility.h"
      41             : #include "source/common/http/headers.h"
      42             : #include "source/common/http/http1/codec_impl.h"
      43             : #include "source/common/http/http2/codec_impl.h"
      44             : #include "source/common/http/path_utility.h"
      45             : #include "source/common/http/status.h"
      46             : #include "source/common/http/utility.h"
      47             : #include "source/common/network/utility.h"
      48             : #include "source/common/router/config_impl.h"
      49             : #include "source/common/runtime/runtime_features.h"
      50             : #include "source/common/stats/timespan_impl.h"
      51             : #include "source/common/stream_info/utility.h"
      52             : 
      53             : #include "absl/strings/escaping.h"
      54             : #include "absl/strings/match.h"
      55             : #include "absl/strings/str_cat.h"
      56             : 
      57             : namespace Envoy {
      58             : namespace Http {
      59             : 
      // Key strings for the premature-reset settings.
      // NOTE(review): the code that consumes these two keys is not visible in this part of the file;
      // presumably they are looked up the same way as the runtime keys below - confirm.
      60             : const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey =
      61             :     "overload.premature_reset_total_stream_count";
      62             : const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey =
      63             :     "overload.premature_reset_min_stream_lifetime_seconds";
      64             : // Runtime key for maximum number of requests that can be processed from a single connection per
      65             : // I/O cycle. Requests over this limit are deferred until the next I/O cycle.
      // The constructor reads this key once through the runtime snapshot, defaulting to UINT32_MAX.
      66             : const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle =
      67             :     "http.max_requests_per_io_cycle";
      68             : // Don't attempt to intelligently delay close: https://github.com/envoyproxy/envoy/issues/30010
      // Runtime boolean read in checkForDeferredClose() (default true): only while it is true can a
      // caller's skip_delay_close request switch the close type to FlushWrite.
      69             : const absl::string_view ConnectionManagerImpl::OptionallyDelayClose =
      70             :     "http1.optionally_delay_close";
      71             : 
      // Reports whether the request described by `headers` should be treated as a CONNECT request.
      //
      // Returns false when `headers` is null. For protocols ordered at or below Protocol::Http11 the
      // answer is HeaderUtility::isConnect(); for later protocols a request for which
      // Utility::isUpgrade() holds is also counted as CONNECT.
      72         226 : bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) {
      73         226 :   if (!headers) {
      74          81 :     return false;
      75          81 :   }
      76         145 :   if (protocol <= Protocol::Http11) {
      77         143 :     return HeaderUtility::isConnect(*headers);
      78         143 :   }
      79             :   // All HTTP/2 style upgrades were originally connect requests.
      80           2 :   return HeaderUtility::isConnect(*headers) || Utility::isUpgrade(*headers);
      81         145 : }
      82             : 
      // Builds the ConnectionManagerStats for a connection manager.
      //
      // The stat members are expanded from ALL_HTTP_CONN_MAN_STATS, with counters, gauges and
      // histograms produced by the POOL_*_PREFIX macros from `scope` and `prefix`. `prefix` and `scope`
      // are additionally passed through to the returned ConnectionManagerStats.
      83             : ConnectionManagerStats ConnectionManagerImpl::generateStats(const std::string& prefix,
      84         274 :                                                             Stats::Scope& scope) {
      85         274 :   return ConnectionManagerStats(
      86         274 :       {ALL_HTTP_CONN_MAN_STATS(POOL_COUNTER_PREFIX(scope, prefix), POOL_GAUGE_PREFIX(scope, prefix),
      87         274 :                                POOL_HISTOGRAM_PREFIX(scope, prefix))},
      88         274 :       prefix, scope);
      89         274 : }
      90             : 
      // Builds the tracing stats struct from CONN_MAN_TRACING_STATS. The counters are created through
      // POOL_COUNTER_PREFIX using `scope` and the string `prefix` + "tracing.".
      91             : ConnectionManagerTracingStats ConnectionManagerImpl::generateTracingStats(const std::string& prefix,
      92         274 :                                                                           Stats::Scope& scope) {
      93         274 :   return {CONN_MAN_TRACING_STATS(POOL_COUNTER_PREFIX(scope, prefix + "tracing."))};
      94         274 : }
      95             : 
      // Builds the listener stats struct from CONN_MAN_LISTENER_STATS. The counters are created through
      // POOL_COUNTER_PREFIX using `scope` and `prefix` unchanged.
      96             : ConnectionManagerListenerStats
      97         238 : ConnectionManagerImpl::generateListenerStats(const std::string& prefix, Stats::Scope& scope) {
      98         238 :   return {CONN_MAN_LISTENER_STATS(POOL_COUNTER_PREFIX(scope, prefix))};
      99         238 : }
     100             : 
     // Constructs the connection manager from its configuration and the dependencies passed in.
     //
     // Besides storing the references it is given, the initializer list:
     //  - takes stats_ and listener_stats_ from `config`;
     //  - creates the conn_length_ timespan object for the downstream_cx_length_ms_ histogram;
     //  - fetches the thread-local overload state, the
     //    "envoy.load_shed_points.http_connection_manager_decode_headers" load shed point, and the
     //    states of the StopAcceptingRequests and DisableHttpKeepAlive overload actions;
     //  - builds proxy_name_ from the local node id, the configured server name and the proxy-status
     //    config;
     //  - reads the MaxRequestsPerIoCycle runtime key (default UINT32_MAX) and the
     //    "envoy.reloadable_features.refresh_rtt_after_request" runtime feature. Both are read here
     //    and cached in members.
     //
     // The constructor body only invokes ENVOY_LOG_ONCE_IF at trace level, conditioned on the load shed
     // point lookup having returned nullptr.
     101             : ConnectionManagerImpl::ConnectionManagerImpl(ConnectionManagerConfig& config,
     102             :                                              const Network::DrainDecision& drain_close,
     103             :                                              Random::RandomGenerator& random_generator,
     104             :                                              Http::Context& http_context, Runtime::Loader& runtime,
     105             :                                              const LocalInfo::LocalInfo& local_info,
     106             :                                              Upstream::ClusterManager& cluster_manager,
     107             :                                              Server::OverloadManager& overload_manager,
     108             :                                              TimeSource& time_source)
     109             :     : config_(config), stats_(config_.stats()),
     110             :       conn_length_(new Stats::HistogramCompletableTimespanImpl(
     111             :           stats_.named_.downstream_cx_length_ms_, time_source)),
     112             :       drain_close_(drain_close), user_agent_(http_context.userAgentContext()),
     113             :       random_generator_(random_generator), runtime_(runtime), local_info_(local_info),
     114             :       cluster_manager_(cluster_manager), listener_stats_(config_.listenerStats()),
     115             :       overload_manager_(overload_manager),
     116             :       overload_state_(overload_manager.getThreadLocalOverloadState()),
     117             :       accept_new_http_stream_(overload_manager.getLoadShedPoint(
     118             :           "envoy.load_shed_points.http_connection_manager_decode_headers")),
     119             :       overload_stop_accepting_requests_ref_(
     120             :           overload_state_.getState(Server::OverloadActionNames::get().StopAcceptingRequests)),
     121             :       overload_disable_keepalive_ref_(
     122             :           overload_state_.getState(Server::OverloadActionNames::get().DisableHttpKeepAlive)),
     123             :       time_source_(time_source), proxy_name_(StreamInfo::ProxyStatusUtils::makeProxyName(
     124             :                                      /*node_id=*/local_info_.node().id(),
     125             :                                      /*server_name=*/config_.serverName(),
     126             :                                      /*proxy_status_config=*/config_.proxyStatusConfig())),
     127             :       max_requests_during_dispatch_(
     128             :           runtime_.snapshot().getInteger(ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)),
     129             :       refresh_rtt_after_request_(
     130         922 :           Runtime::runtimeFeatureEnabled("envoy.reloadable_features.refresh_rtt_after_request")) {
     131         922 :   ENVOY_LOG_ONCE_IF(
     132         922 :       trace, accept_new_http_stream_ == nullptr,
     133         922 :       "LoadShedPoint envoy.load_shed_points.http_connection_manager_decode_headers is not "
     134         922 :       "found. Is it configured?");
     135         922 : }
     136             : 
     // Returns a reference to a function-local static response header map whose only entry, as built
     // here, is the Status header set to the decimal string of Code::Continue. The map is created by
     // the static initializer and the same object is returned on every call.
     137           0 : const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() {
     138           0 :   static const auto headers = createHeaderMap<ResponseHeaderMapImpl>(
     139           0 :       {{Http::Headers::get().Status, std::to_string(enumToInt(Code::Continue))}});
     140           0 :   return *headers;
     141           0 : }
     142             : 
     // Receives the network read-filter callbacks and performs per-connection setup.
     //
     // Stores `callbacks` and the connection's dispatcher, then:
     //  - creates the deferred-request schedulable callback, but only when
     //    max_requests_during_dispatch_ differs from UINT32_MAX (the default used when the runtime key
     //    is read in the constructor);
     //  - increments the total/active connection stats, plus the SSL ones when connection().ssl() is
     //    set;
     //  - registers this object for connection callbacks;
     //  - when config_.addProxyProtocolConnectionState() is set and the connection's filter state has
     //    no ProxyProtocolFilterState entry yet, stores one built from the connection's remote and
     //    local addresses (ReadOnly, Connection life span);
     //  - creates and arms the scaled idle timer and the connection-duration timer when the
     //    corresponding config values are present;
     //  - passes the delayed-close timeout and the rx/tx byte and delayed-close-timeout stats to the
     //    connection.
     143         922 : void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
     144         922 :   read_callbacks_ = &callbacks;
     145         922 :   dispatcher_ = &callbacks.connection().dispatcher();
     146         922 :   if (max_requests_during_dispatch_ != UINT32_MAX) {
     147           0 :     deferred_request_processing_callback_ =
     148           0 :         dispatcher_->createSchedulableCallback([this]() -> void { onDeferredRequestProcessing(); });
     149           0 :   }
     150             : 
     151         922 :   stats_.named_.downstream_cx_total_.inc();
     152         922 :   stats_.named_.downstream_cx_active_.inc();
     153         922 :   if (read_callbacks_->connection().ssl()) {
     154          36 :     stats_.named_.downstream_cx_ssl_total_.inc();
     155          36 :     stats_.named_.downstream_cx_ssl_active_.inc();
     156          36 :   }
     157             : 
     158         922 :   read_callbacks_->connection().addConnectionCallbacks(*this);
     159             : 
     160         922 :   if (config_.addProxyProtocolConnectionState() &&
     161         922 :       !read_callbacks_->connection()
     162         922 :            .streamInfo()
     163         922 :            .filterState()
     164         922 :            ->hasData<Network::ProxyProtocolFilterState>(Network::ProxyProtocolFilterState::key())) {
     165         919 :     read_callbacks_->connection().streamInfo().filterState()->setData(
     166         919 :         Network::ProxyProtocolFilterState::key(),
     167         919 :         std::make_unique<Network::ProxyProtocolFilterState>(Network::ProxyProtocolData{
     168         919 :             read_callbacks_->connection().connectionInfoProvider().remoteAddress(),
     169         919 :             read_callbacks_->connection().connectionInfoProvider().localAddress()}),
     170         919 :         StreamInfo::FilterState::StateType::ReadOnly,
     171         919 :         StreamInfo::FilterState::LifeSpan::Connection);
     172         919 :   }
     173             : 
     174         922 :   if (config_.idleTimeout()) {
     175         788 :     connection_idle_timer_ =
     176         788 :         dispatcher_->createScaledTimer(Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout,
     177         788 :                                        [this]() -> void { onIdleTimeout(); });
     178         788 :     connection_idle_timer_->enableTimer(config_.idleTimeout().value());
     179         788 :   }
     180             : 
     181         922 :   if (config_.maxConnectionDuration()) {
     182           0 :     connection_duration_timer_ =
     183           0 :         dispatcher_->createTimer([this]() -> void { onConnectionDurationTimeout(); });
     184           0 :     connection_duration_timer_->enableTimer(config_.maxConnectionDuration().value());
     185           0 :   }
     186             : 
     187         922 :   read_callbacks_->connection().setDelayedCloseTimeout(config_.delayedCloseTimeout());
     188             : 
     189         922 :   read_callbacks_->connection().setConnectionStats(
     190         922 :       {stats_.named_.downstream_cx_rx_bytes_total_, stats_.named_.downstream_cx_rx_bytes_buffered_,
     191         922 :        stats_.named_.downstream_cx_tx_bytes_total_, stats_.named_.downstream_cx_tx_bytes_buffered_,
     192         922 :        nullptr, &stats_.named_.downstream_cx_delayed_close_timeout_});
     193         922 : }
     194             : 
     // Finalizes the connection-level stats: counts the destroy, decrements the active-connection
     // gauge (and the SSL one when connection().ssl() is set), decrements the per-protocol active
     // gauge that createCodec() incremented when a codec exists, then completes the conn_length_
     // timespan and hands it to user_agent_.
     //
     // NOTE(review): read_callbacks_ is dereferenced unconditionally, so this assumes
     // initializeReadFilterCallbacks() has run before destruction - confirm against callers.
     195         922 : ConnectionManagerImpl::~ConnectionManagerImpl() {
     196         922 :   stats_.named_.downstream_cx_destroy_.inc();
     197             : 
     198         922 :   stats_.named_.downstream_cx_active_.dec();
     199         922 :   if (read_callbacks_->connection().ssl()) {
     200          36 :     stats_.named_.downstream_cx_ssl_active_.dec();
     201          36 :   }
     202             : 
     203         922 :   if (codec_) {
     204         761 :     if (codec_->protocol() == Protocol::Http2) {
     205         336 :       stats_.named_.downstream_cx_http2_active_.dec();
     206         732 :     } else if (codec_->protocol() == Protocol::Http3) {
     207           0 :       stats_.named_.downstream_cx_http3_active_.dec();
     208         425 :     } else {
     209         425 :       stats_.named_.downstream_cx_http1_active_.dec();
     210         425 :     }
     211         761 :   }
     212             : 
     213         922 :   conn_length_->complete();
     214         922 :   user_agent_.completeConnectionLength(*conn_length_);
     215         922 : }
     216             : 
     // Closes the connection if it is draining and idle.
     //
     // The close happens only when drain_state_ is Closing, streams_ is empty and the codec reports
     // nothing left to write. The close type is FlushWriteAndDelay, unless `skip_delay_close` is true
     // and the OptionallyDelayClose runtime boolean (default true) is true, in which case FlushWrite
     // is used.
     217        1282 : void ConnectionManagerImpl::checkForDeferredClose(bool skip_delay_close) {
     218        1282 :   Network::ConnectionCloseType close = Network::ConnectionCloseType::FlushWriteAndDelay;
     219        1282 :   if (runtime_.snapshot().getBoolean(ConnectionManagerImpl::OptionallyDelayClose, true) &&
     220        1282 :       skip_delay_close) {
     221           6 :     close = Network::ConnectionCloseType::FlushWrite;
     222           6 :   }
     223        1282 :   if (drain_state_ == DrainState::Closing && streams_.empty() && !codec_->wantsToWrite()) {
     224             :     // We are closing a draining connection with no active streams and the codec has
     225             :     // nothing to write.
     226         345 :     doConnectionClose(close, absl::nullopt,
     227         345 :                       StreamInfo::LocalCloseReasons::get().DeferredCloseOnDrainedConnection);
     228         345 :   }
     229        1282 : }
     230             : 
     // Ends `stream`, resetting it first if needed.
     //
     // The stream is reset when its response encoder is still attached and either the remote decode
     // is not complete or the codec has not seen local completion. The reset reason is ConnectError
     // for CONNECT requests with an upstream connection failure/termination flag, ProtocolError when
     // the UpstreamProtocolError flag is set, and LocalReset otherwise. A stream that is not reset is
     // passed to doDeferredStreamDestroy(). A reset on a protocol below Http2 also moves the
     // connection to DrainState::Closing.
     //
     // When `check_for_deferred_close` is true the function finishes by calling
     // checkForDeferredClose(), asking it to skip the delayed close if the stream is marked to drain
     // the connection and the request is complete or is HTTP/1.0 without a response content length.
     231         555 : void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_deferred_close) {
     232             :   // The order of what happens in this routine is important and a little complicated. We first see
     233             :   // if the stream needs to be reset. If it needs to be, this will end up invoking reset callbacks
     234             :   // and then moving the stream to the deferred destruction list. If the stream has not been reset,
     235             :   // we move it to the deferred deletion list here. Then, we potentially close the connection. This
     236             :   // must be done after deleting the stream since the stream refers to the connection and must be
     237             :   // deleted first.
     238         555 :   bool reset_stream = false;
     239             :   // If the response encoder is still associated with the stream, reset the stream. The exception
     240             :   // here is when Envoy "ends" the stream by calling recreateStream at which point recreateStream
     241             :   // explicitly nulls out response_encoder to avoid the downstream being notified of the
     242             :   // Envoy-internal stream instance being ended.
     243         555 :   if (stream.response_encoder_ != nullptr && (!stream.filter_manager_.remoteDecodeComplete() ||
     244         555 :                                               !stream.state_.codec_saw_local_complete_)) {
     245             :     // Indicate local is complete at this point so that if we reset during a continuation, we don't
     246             :     // raise further data or trailers.
     247         226 :     ENVOY_STREAM_LOG(debug, "doEndStream() resetting stream", stream);
     248             :     // TODO(snowp): This call might not be necessary, try to clean up + remove setter function.
     249         226 :     stream.filter_manager_.setLocalComplete();
     250         226 :     stream.state_.codec_saw_local_complete_ = true;
     251             : 
     252             :     // Per https://tools.ietf.org/html/rfc7540#section-8.3 if there was an error
     253             :     // with the TCP connection during a CONNECT request, it should be
     254             :     // communicated via CONNECT_ERROR
     255         226 :     if (requestWasConnect(stream.request_headers_, codec_->protocol()) &&
     256         226 :         (stream.filter_manager_.streamInfo().hasResponseFlag(
     257           0 :              StreamInfo::ResponseFlag::UpstreamConnectionFailure) ||
     258           0 :          stream.filter_manager_.streamInfo().hasResponseFlag(
     259           0 :              StreamInfo::ResponseFlag::UpstreamConnectionTermination))) {
     260           0 :       stream.response_encoder_->getStream().resetStream(StreamResetReason::ConnectError);
     261         226 :     } else {
     262         226 :       if (stream.filter_manager_.streamInfo().hasResponseFlag(
     263         226 :               StreamInfo::ResponseFlag::UpstreamProtocolError)) {
     264           2 :         stream.response_encoder_->getStream().resetStream(StreamResetReason::ProtocolError);
     265         224 :       } else {
     266         224 :         stream.response_encoder_->getStream().resetStream(StreamResetReason::LocalReset);
     267         224 :       }
     268         226 :     }
     269         226 :     reset_stream = true;
     270         226 :   }
     271             : 
     272         555 :   if (!reset_stream) {
     273         329 :     doDeferredStreamDestroy(stream);
     274         329 :   }
     275             : 
     276         555 :   if (reset_stream && codec_->protocol() < Protocol::Http2) {
     277         224 :     drain_state_ = DrainState::Closing;
     278         224 :   }
     279             : 
     280             :   // If HTTP/1.0 has no content length, it is framed by close and won't consider
     281             :   // the request complete until the FIN is read. Don't delay close in this case.
     282         555 :   bool http_10_sans_cl = (codec_->protocol() == Protocol::Http10) &&
     283         555 :                          (!stream.response_headers_ || !stream.response_headers_->ContentLength());
     284             :   // We also don't delay-close in the case of HTTP/1.1 where the request is
     285             :   // fully read, as there's no race condition to avoid.
     286         555 :   const bool connection_close =
     287         555 :       stream.filter_manager_.streamInfo().shouldDrainConnectionUponCompletion();
     288         555 :   bool request_complete = stream.filter_manager_.remoteDecodeComplete();
     289             : 
     290         555 :   if (check_for_deferred_close) {
     291             :     // Don't do delay close for HTTP/1.0 or if the request is complete.
     292         555 :     checkForDeferredClose(connection_close && (request_complete || http_10_sans_cl));
     293         555 :   }
     294         555 : }
     295             : 
     // Tears down `stream` after it has ended.
     //
     // Steps, in order:
     //  1. Unless the stream is internally destroyed, count it as closed and, if
     //     isPrematureRstStream() says so, as a premature reset.
     //  2. Disable and drop the stream's duration, idle, request-header and access-log-flush timers
     //     and disarm the filter manager's request timeout.
     //  3. If stream.canDestroyStream() is false, mark the stream as a zombie and return.
     //  4. Otherwise unregister the codec event callbacks, complete the request, optionally refresh
     //     the connection round-trip time, notify the filter manager, write the access log (or defer
     //     it for HTTP/3), destroy the filters and move the stream from streams_ to the dispatcher's
     //     deferred-delete list.
     //  5. Remove the stream's callbacks from the response encoder, re-arm the connection idle timer
     //     when no streams remain, and call maybeDrainDueToPrematureResets().
     296         765 : void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
     297         765 :   if (!stream.state_.is_internally_destroyed_) {
     298         765 :     ++closed_non_internally_destroyed_requests_;
     299         765 :     if (isPrematureRstStream(stream)) {
     300         108 :       ++number_premature_stream_resets_;
     301         108 :     }
     302         765 :   }
     303         765 :   if (stream.max_stream_duration_timer_ != nullptr) {
     304           0 :     stream.max_stream_duration_timer_->disableTimer();
     305           0 :     stream.max_stream_duration_timer_ = nullptr;
     306           0 :   }
     307         765 :   if (stream.stream_idle_timer_ != nullptr) {
     308         517 :     stream.stream_idle_timer_->disableTimer();
     309         517 :     stream.stream_idle_timer_ = nullptr;
     310         517 :   }
     311         765 :   stream.filter_manager_.disarmRequestTimeout();
     312         765 :   if (stream.request_header_timer_ != nullptr) {
     313           0 :     stream.request_header_timer_->disableTimer();
     314           0 :     stream.request_header_timer_ = nullptr;
     315           0 :   }
     316         765 :   if (stream.access_log_flush_timer_ != nullptr) {
     317           0 :     stream.access_log_flush_timer_->disableTimer();
     318           0 :     stream.access_log_flush_timer_ = nullptr;
     319           0 :   }
     320             : 
     321             :   // Only destroy the active stream if the underlying codec has notified us of
     322             :   // completion or we've internally redirected the stream.
     323         765 :   if (!stream.canDestroyStream()) {
     324             :     // Track that this stream is not expecting any additional calls apart from
     325             :     // codec notification.
     326          94 :     stream.state_.is_zombie_stream_ = true;
     327          94 :     return;
     328          94 :   }
     329             : 
     330         671 :   if (stream.response_encoder_ != nullptr) {
     331         671 :     stream.response_encoder_->getStream().registerCodecEventCallbacks(nullptr);
     332         671 :   }
     333             : 
     334         671 :   stream.completeRequest();
     335             : 
     336             :   // If refreshing the RTT after the request is explicitly enabled, fetch the RTT again and store
     337             :   // it in the connection info.
     338         671 :   if (refresh_rtt_after_request_) {
     339             :     // Set roundtrip time in connectionInfoSetter before OnStreamComplete
     340           0 :     absl::optional<std::chrono::milliseconds> t = read_callbacks_->connection().lastRoundTripTime();
     341           0 :     if (t.has_value()) {
     342           0 :       read_callbacks_->connection().connectionInfoSetter().setRoundTripTime(t.value());
     343           0 :     }
     344           0 :   }
     345             : 
     346         671 :   stream.filter_manager_.onStreamComplete();
     347             : 
     348             :   // For HTTP/3, skip access logging here and add deferred logging info
     349             :   // to stream info for QuicStatsGatherer to use later.
     350         671 :   if (codec_ && codec_->protocol() == Protocol::Http3 &&
     351             :       // There was a downstream reset, log immediately.
     352         671 :       !stream.filter_manager_.sawDownstreamReset() &&
     353             :       // On recreate stream, log immediately.
     354         671 :       stream.response_encoder_ != nullptr &&
     355         671 :       Runtime::runtimeFeatureEnabled(
     356           0 :           "envoy.reloadable_features.quic_defer_logging_to_ack_listener")) {
     357           0 :     stream.deferHeadersAndTrailers();
     358         671 :   } else {
     359             :     // For HTTP/1 and HTTP/2, log here as usual.
     360         671 :     stream.filter_manager_.log(AccessLog::AccessLogType::DownstreamEnd);
     361         671 :   }
     362             : 
     363         671 :   stream.filter_manager_.destroyFilters();
     364             : 
     365         671 :   dispatcher_->deferredDelete(stream.removeFromList(streams_));
     366             : 
     367             :   // The response_encoder should never be dangling (unless we're destroying a
     368             :   // stream we are recreating) as the codec level stream will either outlive the
     369             :   // ActiveStream, or be alive in deferred deletion queue at this point.
     370         671 :   if (stream.response_encoder_) {
     371         671 :     stream.response_encoder_->getStream().removeCallbacks(stream);
     372         671 :   }
     373             : 
     374         671 :   if (connection_idle_timer_ && streams_.empty()) {
     375         408 :     connection_idle_timer_->enableTimer(config_.idleTimeout().value());
     376         408 :   }
     377         671 :   maybeDrainDueToPrematureResets();
     378         671 : }
     379             : 
     // Creates a stream through newStream() and returns an ActiveStreamHandle wrapping it.
     // The static_cast relies on the decoder returned by newStream() being an ActiveStream; newStream()
     // returns the element it has just moved into streams_.
     380             : RequestDecoderHandlePtr ConnectionManagerImpl::newStreamHandle(ResponseEncoder& response_encoder,
     381           0 :                                                                bool is_internally_created) {
     382           0 :   RequestDecoder& decoder = newStream(response_encoder, is_internally_created);
     383           0 :   return std::make_unique<ActiveStreamHandle>(static_cast<ActiveStream&>(decoder));
     384           0 : }
     385             : 
     // Creates the ActiveStream for a new request, adds it to streams_ and returns it as the decoder.
     //
     // In order, the function:
     //  - disables the connection idle timer, if there is one;
     //  - makes sure the codec stream has a buffer memory account, creating one from the dispatcher's
     //    watermark factory when response_encoder's stream has none;
     //  - constructs the ActiveStream with the codec stream's buffer limit and that account;
     //  - increments accumulated_requests_ and, once a non-zero maxRequestsPerConnection() is reached,
     //    either marks the stream to drain the connection and sets DrainState::Closing (protocols
     //    below Http2) or starts the drain sequence if not already draining (other protocols);
     //  - records `is_internally_created`, attaches `response_encoder`, registers the stream for
     //    stream and codec-event callbacks, and sets the flush timeout and downstream bytes meter;
     //  - moves the stream into streams_ and returns the element at streams_.begin().
     386             : RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encoder,
     387         671 :                                                  bool is_internally_created) {
     388         671 :   TRACE_EVENT("core", "ConnectionManagerImpl::newStream");
     389         671 :   if (connection_idle_timer_) {
     390         517 :     connection_idle_timer_->disableTimer();
     391         517 :   }
     392             : 
     393         671 :   ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());
     394             : 
     395         671 :   Buffer::BufferMemoryAccountSharedPtr downstream_stream_account =
     396         671 :       response_encoder.getStream().account();
     397             : 
     398         671 :   if (downstream_stream_account == nullptr) {
     399             :     // Create account, wiring the stream to use it for tracking bytes.
     400             :     // If tracking is disabled, the wiring becomes a NOP.
     401         671 :     auto& buffer_factory = dispatcher_->getWatermarkFactory();
     402         671 :     downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream());
     403         671 :     response_encoder.getStream().setAccount(downstream_stream_account);
     404         671 :   }
     405             : 
     406         671 :   auto new_stream = std::make_unique<ActiveStream>(
     407         671 :       *this, response_encoder.getStream().bufferLimit(), std::move(downstream_stream_account));
     408             : 
     409         671 :   accumulated_requests_++;
     410         671 :   if (config_.maxRequestsPerConnection() > 0 &&
     411         671 :       accumulated_requests_ >= config_.maxRequestsPerConnection()) {
     412           0 :     if (codec_->protocol() < Protocol::Http2) {
     413           0 :       new_stream->filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
     414             :       // Prevent erroneous debug log of closing due to incoming connection close header.
     415           0 :       drain_state_ = DrainState::Closing;
     416           0 :     } else if (drain_state_ == DrainState::NotDraining) {
     417           0 :       startDrainSequence();
     418           0 :     }
     419           0 :     ENVOY_CONN_LOG(debug, "max requests per connection reached", read_callbacks_->connection());
     420           0 :     stats_.named_.downstream_cx_max_requests_reached_.inc();
     421           0 :   }
     422             : 
     423         671 :   new_stream->state_.is_internally_created_ = is_internally_created;
     424         671 :   new_stream->response_encoder_ = &response_encoder;
     425         671 :   new_stream->response_encoder_->getStream().addCallbacks(*new_stream);
     426         671 :   new_stream->response_encoder_->getStream().registerCodecEventCallbacks(new_stream.get());
     427         671 :   new_stream->response_encoder_->getStream().setFlushTimeout(new_stream->idle_timeout_ms_);
     428         671 :   new_stream->streamInfo().setDownstreamBytesMeter(response_encoder.getStream().bytesMeter());
     429             :   // If the network connection is backed up, the stream should be made aware of it on creation.
     430             :   // Both HTTP/1.x and HTTP/2 codecs handle this in StreamCallbackHelper::addCallbacksHelper.
     431         671 :   ASSERT(read_callbacks_->connection().aboveHighWatermark() == false ||
     432         671 :          new_stream->filter_manager_.aboveHighWatermark());
     433         671 :   LinkedList::moveIntoList(std::move(new_stream), streams_);
     434         671 :   return **streams_.begin();
     435         671 : }
     436             : 
     // Shared handling for codec dispatch errors: logs `error` at debug level, sets `response_flag`
     // on the connection's stream info, and closes the connection with FlushWriteAndDelay, passing
     // `response_flag` and `details` to doConnectionClose().
     437             : void ConnectionManagerImpl::handleCodecErrorImpl(absl::string_view error, absl::string_view details,
     438         227 :                                                  StreamInfo::ResponseFlag response_flag) {
     439         227 :   ENVOY_CONN_LOG(debug, "dispatch error: {}", read_callbacks_->connection(), error);
     440         227 :   read_callbacks_->connection().streamInfo().setResponseFlag(response_flag);
     441             : 
     442             :   // HTTP/1.1 codec has already sent a 400 response if possible. HTTP/2 codec has already sent
     443             :   // GOAWAY.
     444         227 :   doConnectionClose(Network::ConnectionCloseType::FlushWriteAndDelay, response_flag, details);
     445         227 : }
     446             : 
     // Handles a codec error as a downstream protocol error: forwards to handleCodecErrorImpl() with
     // the DownstreamProtocolError response flag and details of the form "codec_error:" followed by
     // `error` passed through StringUtil::replaceAllEmptySpace().
     447         227 : void ConnectionManagerImpl::handleCodecError(absl::string_view error) {
     448         227 :   handleCodecErrorImpl(error, absl::StrCat("codec_error:", StringUtil::replaceAllEmptySpace(error)),
     449         227 :                        StreamInfo::ResponseFlag::DownstreamProtocolError);
     450         227 : }
     451             : 
     // Handles a codec error attributed to overload: forwards to handleCodecErrorImpl() with the
     // OverloadManager response flag and details of the form "overload_error:" followed by `error`
     // passed through StringUtil::replaceAllEmptySpace().
     452           0 : void ConnectionManagerImpl::handleCodecOverloadError(absl::string_view error) {
     453           0 :   handleCodecErrorImpl(error,
     454           0 :                        absl::StrCat("overload_error:", StringUtil::replaceAllEmptySpace(error)),
     455           0 :                        StreamInfo::ResponseFlag::OverloadManager);
     456           0 : }
     457             : // Creates the codec from the config (asserting none exists) and charges per-protocol cx stats.
     458         761 : void ConnectionManagerImpl::createCodec(Buffer::Instance& data) {
     459         761 :   ASSERT(!codec_);
     460         761 :   codec_ = config_.createCodec(read_callbacks_->connection(), data, *this, overload_manager_);
     461             : 
     462         761 :   switch (codec_->protocol()) {
     463           0 :   case Protocol::Http3:
     464           0 :     stats_.named_.downstream_cx_http3_total_.inc();
     465           0 :     stats_.named_.downstream_cx_http3_active_.inc();
     466           0 :     break;
     467         336 :   case Protocol::Http2:
     468         336 :     stats_.named_.downstream_cx_http2_total_.inc();
     469         336 :     stats_.named_.downstream_cx_http2_active_.inc();
     470         336 :     break;
     471         425 :   case Protocol::Http11:
     472         425 :   case Protocol::Http10: // HTTP/1.0 and HTTP/1.1 share the http1 counters.
     473         425 :     stats_.named_.downstream_cx_http1_total_.inc();
     474         425 :     stats_.named_.downstream_cx_http1_active_.inc();
     475         425 :     break;
     476         761 :   }
     477         761 : }
     478             : // Dispatches `data` to the codec, closing the connection on error. Always returns StopIteration.
     479         952 : Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
     480         952 :   requests_during_dispatch_count_ = 0;
     481         952 :   if (!codec_) {
     482             :     // Lazily create the codec on first data. An HTTP/3 codec should already exist by now.
     483         761 :     createCodec(data);
     484         761 :   }
     485             : 
     486         952 :   bool redispatch;
     487         952 :   do {
     488         952 :     redispatch = false;
     489             : 
     490         952 :     const Status status = codec_->dispatch(data);
     491             : 
     492         952 :     if (isBufferFloodError(status) || isInboundFramesWithEmptyPayloadError(status)) {
     493           0 :       handleCodecError(status.message());
     494           0 :       return Network::FilterStatus::StopIteration;
     495         952 :     } else if (isCodecProtocolError(status)) {
     496         225 :       stats_.named_.downstream_cx_protocol_error_.inc();
     497         225 :       handleCodecError(status.message());
     498         225 :       return Network::FilterStatus::StopIteration;
     499         730 :     } else if (isEnvoyOverloadError(status)) {
     500             :       // The other codecs aren't wired to send this status.
     501           0 :       ASSERT(codec_->protocol() < Protocol::Http2,
     502           0 :              "Expected only HTTP1.1 and below to send overload error.");
     503           0 :       stats_.named_.downstream_rq_overload_close_.inc();
     504           0 :       handleCodecOverloadError(status.message());
     505           0 :       return Network::FilterStatus::StopIteration;
     506           0 :     }
     507         727 :     ASSERT(status.ok());
     508             : 
     509             :     // Processing incoming data may release outbound data so check for closure here as well.
     510         727 :     checkForDeferredClose(false);
     511             : 
     512             :     // The HTTP/1 codec will pause dispatch after a single message is complete. We want to
     513             :     // redispatch if there are no streams and we have more data. If we have a single
     514             :     // complete non-WebSocket stream but have not responded yet we will pause socket reads
     515             :     // to apply back pressure.
     516         727 :     if (codec_->protocol() < Protocol::Http2) {
     517         391 :       if (read_callbacks_->connection().state() == Network::Connection::State::Open &&
     518         391 :           data.length() > 0 && streams_.empty()) {
     519           0 :         redispatch = true;
     520           0 :       }
     521         391 :     }
     522         727 :   } while (redispatch);
     523             : 
     524         727 :   if (!read_callbacks_->connection().streamInfo().protocol()) { // Record the protocol only once.
     525         602 :     read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
     526         602 :   }
     527             : 
     528         727 :   return Network::FilterStatus::StopIteration;
     529         952 : }
     530             : // Continues filter iteration unless the stream info already has a protocol (the QUIC case).
     531         884 : Network::FilterStatus ConnectionManagerImpl::onNewConnection() {
     532         884 :   if (!read_callbacks_->connection().streamInfo().protocol()) {
     533             :     // For non-QUIC traffic, continue passing data to filters.
     534         884 :     return Network::FilterStatus::Continue;
     535         884 :   }
     536             :   // Only a QUIC connection's stream info specifies the protocol at this point.
     537           0 :   Buffer::OwnedImpl dummy; // Empty buffer: createCodec() requires a Buffer::Instance&.
     538           0 :   createCodec(dummy);
     539           0 :   ASSERT(codec_->protocol() == Protocol::Http3);
     540             :   // Stop iterating through network filters for QUIC. Currently QUIC connections bypass the
     541             :   // onData() interface because QUICHE already handles de-multiplexing.
     542           0 :   return Network::FilterStatus::StopIteration;
     543         884 : }
     544             : // Resets every active stream as ConnectionTermination, recording details and response flag.
     545             : void ConnectionManagerImpl::resetAllStreams(absl::optional<StreamInfo::ResponseFlag> response_flag,
     546         130 :                                             absl::string_view details) {
     547         270 :   while (!streams_.empty()) { // NOTE(review): presumably onResetStream() unlinks the stream - confirm.
     548             :     // Mimic a downstream reset in this case. We must also remove callbacks here. Though we are
     549             :     // about to close the connection and will disable further reads, it is possible that flushing
     550             :     // data out can cause stream callbacks to fire (e.g., low watermark callbacks).
     551             :     //
     552             :     // TODO(mattklein123): I tried to actually reset through the codec here, but ran into issues
     553             :     // with nghttp2 state and being unhappy about sending reset frames after the connection had
     554             :     // been terminated via GOAWAY. It might be possible to do something better here inside the h2
     555             :     // codec but there are no easy answers and this seems simpler.
     556         140 :     auto& stream = *streams_.front();
     557         140 :     stream.response_encoder_->getStream().removeCallbacks(stream);
     558         140 :     if (!stream.response_encoder_->getStream().responseDetails().empty()) {
     559          22 :       stream.filter_manager_.streamInfo().setResponseCodeDetails(
     560          22 :           stream.response_encoder_->getStream().responseDetails());
     561         118 :     } else if (!details.empty()) { // Fall back to the caller-supplied details.
     562         118 :       stream.filter_manager_.streamInfo().setResponseCodeDetails(details);
     563         118 :     }
     564         140 :     if (response_flag.has_value()) {
     565          50 :       stream.filter_manager_.streamInfo().setResponseFlag(response_flag.value());
     566          50 :     }
     567         140 :     stream.onResetStream(StreamResetReason::ConnectionTermination, absl::string_view());
     568         140 :   }
     569         130 : }
     570             : // Connection event callback: on local/remote close, charges destroy stats and cleans up streams.
     571        1706 : void ConnectionManagerImpl::onEvent(Network::ConnectionEvent event) {
     572        1706 :   if (event == Network::ConnectionEvent::LocalClose) {
     573          68 :     stats_.named_.downstream_cx_destroy_local_.inc();
     574          68 :   }
     575             : 
     576        1706 :   if (event == Network::ConnectionEvent::RemoteClose ||
     577        1706 :       event == Network::ConnectionEvent::LocalClose) {
     578             : 
     579         922 :     std::string details;
     580         922 :     if (event == Network::ConnectionEvent::RemoteClose) {
     581         854 :       remote_close_ = true;
     582         854 :       stats_.named_.downstream_cx_destroy_remote_.inc();
     583         854 :       details = StreamInfo::ResponseCodeDetails::get().DownstreamRemoteDisconnect;
     584         894 :     } else {
     585          68 :       absl::string_view local_close_reason = read_callbacks_->connection().localCloseReason();
     586          68 :       ENVOY_BUG(!local_close_reason.empty(), "Local Close Reason was not set!");
     587          68 :       details = fmt::format(
     588          68 :           fmt::runtime(StreamInfo::ResponseCodeDetails::get().DownstreamLocalDisconnect),
     589          68 :           StringUtil::replaceAllEmptySpace(local_close_reason));
     590          68 :     }
     591             : 
     592             :     // TODO(mattklein123): It is technically possible that something outside of the filter causes
     593             :     // a local connection close, so we still guard against that here. A better solution would be to
     594             :     // have some type of "pre-close" callback that we could hook for cleanup that would get called
     595             :     // regardless of where local close is invoked from.
     596             :     // NOTE: This will cause doConnectionClose() to get called twice in the common local close
     597             :     // cases, but the method protects against that.
     598             :     // NOTE: In the case where a local close comes from outside the filter, this will cause any
     599             :     // stream closures to increment remote close stats. We should do better here in the future,
     600             :     // via the pre-close callback mentioned above.
     601         922 :     doConnectionClose(absl::nullopt, absl::nullopt, details); // No close type: clean up only.
     602         922 :   }
     603        1706 : }
     604             : // Tears down timers and active streams; closes the connection only if `close_type` is set.
     605             : void ConnectionManagerImpl::doConnectionClose(
     606             :     absl::optional<Network::ConnectionCloseType> close_type,
     607        1494 :     absl::optional<StreamInfo::ResponseFlag> response_flag, absl::string_view details) {
     608        1494 :   if (connection_idle_timer_) {
     609         788 :     connection_idle_timer_->disableTimer();
     610         788 :     connection_idle_timer_.reset();
     611         788 :   }
     612             : 
     613        1494 :   if (connection_duration_timer_) {
     614           0 :     connection_duration_timer_->disableTimer();
     615           0 :     connection_duration_timer_.reset();
     616           0 :   }
     617             : 
     618        1494 :   if (drain_timer_) {
     619           0 :     drain_timer_->disableTimer();
     620           0 :     drain_timer_.reset();
     621           0 :   }
     622             : 
     623        1494 :   if (!streams_.empty()) { // Streams are still active: charge stats, then reset them all.
     624         130 :     const Network::ConnectionEvent event = close_type.has_value()
     625         130 :                                                ? Network::ConnectionEvent::LocalClose
     626         130 :                                                : Network::ConnectionEvent::RemoteClose;
     627         130 :     if (event == Network::ConnectionEvent::LocalClose) {
     628          45 :       stats_.named_.downstream_cx_destroy_local_active_rq_.inc();
     629          45 :     }
     630         130 :     if (event == Network::ConnectionEvent::RemoteClose) {
     631          85 :       stats_.named_.downstream_cx_destroy_remote_active_rq_.inc();
     632          85 :     }
     633             : 
     634         130 :     stats_.named_.downstream_cx_destroy_active_rq_.inc();
     635         130 :     user_agent_.onConnectionDestroy(event, true);
     636             :     // Note that resetAllStreams() does not actually write anything to the wire. It just resets
     637             :     // all upstream streams and their filter stacks. Thus, there are no issues around recursive
     638             :     // entry.
     639         130 :     resetAllStreams(response_flag, details);
     640         130 :   }
     641             : 
     642        1494 :   if (close_type.has_value()) { // Without a close type the connection itself is left untouched.
     643         572 :     read_callbacks_->connection().close(close_type.value(), details);
     644         572 :   }
     645        1494 : }
     646             : // Returns true if `stream` has no response code and did not outlive the minimum lifetime.
     647         765 : bool ConnectionManagerImpl::isPrematureRstStream(const ActiveStream& stream) const {
     648             :   // Check if the request was prematurely reset, by comparing its lifetime to the configured
     649             :   // threshold.
     650         765 :   ASSERT(!stream.state_.is_internally_destroyed_);
     651         765 :   absl::optional<std::chrono::nanoseconds> duration =
     652         765 :       stream.filter_manager_.streamInfo().currentDuration();
     653             : 
     654             :   // A request that lived longer (in whole seconds) than the threshold is not premature.
     655         765 :   if (duration) {
     656         765 :     const uint64_t lifetime = std::chrono::duration_cast<std::chrono::seconds>(*duration).count();
     657         765 :     const uint64_t min_lifetime = runtime_.snapshot().getInteger(
     658         765 :         ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey, 1);
     659         765 :     if (lifetime > min_lifetime) {
     660           0 :       return false;
     661           0 :     }
     662         765 :   }
     663             : 
     664             :   // If the request completed before the configured threshold, also check whether Envoy proxied
     665             :   // a response from the upstream. Requests without a response status were reset.
     666             :   // TODO(RyanTheOptimist): Possibly support half_closed_local instead.
     667         765 :   return !stream.filter_manager_.streamInfo().responseCode();
     668         765 : }
     669             : 
     670             : // Closes the connection (with Abort) if too many streams have been reset prematurely on this
     671             : // connection. Does nothing if the runtime feature is off or no requests have closed yet.
     672         671 : void ConnectionManagerImpl::maybeDrainDueToPrematureResets() {
     673         671 :   if (!Runtime::runtimeFeatureEnabled(
     674         671 :           "envoy.restart_features.send_goaway_for_premature_rst_streams") ||
     675         671 :       closed_non_internally_destroyed_requests_ == 0) {
     676           0 :     return;
     677           0 :   }
     678             : 
     679         671 :   const uint64_t limit =
     680         671 :       runtime_.snapshot().getInteger(ConnectionManagerImpl::PrematureResetTotalStreamCountKey, 500);
     681             : 
     682         671 :   if (closed_non_internally_destroyed_requests_ < limit) {
     683             :     // Even though the total number of streams has not reached `limit`, check if the number of bad
     684             :     // streams is high enough that even if every subsequent stream is good, the connection
     685             :     // would be closed once the limit is reached, and if so close the connection now.
     686         671 :     if (number_premature_stream_resets_ * 2 < limit) {
     687         671 :       return;
     688         671 :     }
     689         671 :   } else { // At or past the limit: keep the connection only if under half were premature.
     690           0 :     if (number_premature_stream_resets_ * 2 < closed_non_internally_destroyed_requests_) {
     691           0 :       return;
     692           0 :     }
     693           0 :   }
     694             : 
     695           0 :   if (read_callbacks_->connection().state() == Network::Connection::State::Open) {
     696           0 :     stats_.named_.downstream_rq_too_many_premature_resets_.inc();
     697           0 :     doConnectionClose(Network::ConnectionCloseType::Abort, absl::nullopt,
     698           0 :                       "too_many_premature_resets");
     699           0 :   }
     700           0 : }
     701             : // GOAWAY callback; intentionally a no-op and the error code is ignored.
     702           6 : void ConnectionManagerImpl::onGoAway(GoAwayErrorCode) {
     703             :   // Currently we do nothing with remote GOAWAY frames. In the future we can decide to no longer
     704             :   // push resources if applicable.
     705           6 : }
     706             : // Idle timeout: closes at once if no codec exists, else starts draining if not yet draining.
     707           0 : void ConnectionManagerImpl::onIdleTimeout() {
     708           0 :   ENVOY_CONN_LOG(debug, "idle timeout", read_callbacks_->connection());
     709           0 :   stats_.named_.downstream_cx_idle_timeout_.inc();
     710           0 :   if (!codec_) {
     711             :     // No need to delay close after flushing since an idle timeout has already fired. Attempt to
     712             :     // write out buffered data one last time and issue a local close if successful.
     713           0 :     doConnectionClose(Network::ConnectionCloseType::FlushWrite, absl::nullopt,
     714           0 :                       StreamInfo::LocalCloseReasons::get().IdleTimeoutOnConnection);
     715           0 :   } else if (drain_state_ == DrainState::NotDraining) {
     716           0 :     startDrainSequence();
     717           0 :   }
     718           0 : }
     719             : // Max duration reached: closes at once if no codec exists, else starts draining if not draining.
     720           0 : void ConnectionManagerImpl::onConnectionDurationTimeout() {
     721           0 :   ENVOY_CONN_LOG(debug, "max connection duration reached", read_callbacks_->connection());
     722           0 :   stats_.named_.downstream_cx_max_duration_reached_.inc();
     723           0 :   if (!codec_) {
     724             :     // Attempt to write out buffered data one last time and issue a local close if successful.
     725           0 :     doConnectionClose(Network::ConnectionCloseType::FlushWrite,
     726           0 :                       StreamInfo::ResponseFlag::DurationTimeout,
     727           0 :                       StreamInfo::ResponseCodeDetails::get().DurationTimeout);
     728           0 :   } else if (drain_state_ == DrainState::NotDraining) {
     729           0 :     startDrainSequence();
     730           0 :   }
     731           0 : }
     732             : // Drain timeout: calls goAway() on the codec, enters Closing, and checks for deferred close.
     733           0 : void ConnectionManagerImpl::onDrainTimeout() {
     734           0 :   ASSERT(drain_state_ != DrainState::NotDraining);
     735           0 :   codec_->goAway();
     736           0 :   drain_state_ = DrainState::Closing;
     737           0 :   checkForDeferredClose(false);
     738           0 : }
     739             : // Increments the counter in `tracing_stats` that corresponds to `tracing_reason`.
     740             : void ConnectionManagerImpl::chargeTracingStats(const Tracing::Reason& tracing_reason,
     741           0 :                                                ConnectionManagerTracingStats& tracing_stats) {
     742           0 :   switch (tracing_reason) {
     743           0 :   case Tracing::Reason::ClientForced:
     744           0 :     tracing_stats.client_enabled_.inc();
     745           0 :     break;
     746           0 :   case Tracing::Reason::Sampling:
     747           0 :     tracing_stats.random_sampling_.inc();
     748           0 :     break;
     749           0 :   case Tracing::Reason::ServiceForced:
     750           0 :     tracing_stats.service_forced_.inc();
     751           0 :     break;
     752           0 :   default: // Every other reason is counted as not traceable.
     753           0 :     tracing_stats.not_traceable_.inc();
     754           0 :     break;
     755           0 :   }
     756           0 : }
     757             : // Requests an on-demand VHDS or SRDS update if applicable; otherwise runs the callback with false.
     758             : // TODO(chaoqin-li1123): Make on-demand VHDS and on-demand SRDS work at the same time.
     759             : void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestRouteConfigUpdate(
     760           0 :     Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) {
     761           0 :   absl::optional<Router::ConfigConstSharedPtr> route_config = parent_.routeConfig();
     762           0 :   Event::Dispatcher& thread_local_dispatcher = *parent_.connection_manager_.dispatcher_;
     763           0 :   if (route_config.has_value() && route_config.value()->usesVhds()) {
     764           0 :     ASSERT(!parent_.request_headers_->Host()->value().empty());
     765           0 :     const auto& host_header = absl::AsciiStrToLower(parent_.request_headers_->getHostValue());
     766           0 :     requestVhdsUpdate(host_header, thread_local_dispatcher, std::move(route_config_updated_cb));
     767           0 :     return;
     768           0 :   } else if (scope_key_builder_.has_value()) {
     769           0 :     Router::ScopeKeyPtr scope_key = scope_key_builder_->computeScopeKey(*parent_.request_headers_);
     770             :     // If scope_key is not null, the scope exists but RouteConfiguration is not initialized.
     771           0 :     if (scope_key != nullptr) {
     772           0 :       requestSrdsUpdate(std::move(scope_key), thread_local_dispatcher,
     773           0 :                         std::move(route_config_updated_cb));
     774           0 :       return;
     775           0 :     }
     776           0 :   }
     777             :   // Continue the filter chain if no on-demand update is requested.
     778           0 :   (*route_config_updated_cb)(false);
     779           0 : }
     780             : // Forwards the on-demand virtual host update for `host_header` to the route config provider.
     781             : void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestVhdsUpdate(
     782             :     const std::string& host_header, Event::Dispatcher& thread_local_dispatcher,
     783           0 :     Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) {
     784           0 :   route_config_provider_->requestVirtualHostsUpdate(host_header, thread_local_dispatcher,
     785           0 :                                                     std::move(route_config_updated_cb));
     786           0 : }
     787             : // Requests an on-demand RDS update for `scope_key`, wrapping the callback to refresh the route.
     788             : void ConnectionManagerImpl::RdsRouteConfigUpdateRequester::requestSrdsUpdate(
     789             :     Router::ScopeKeyPtr scope_key, Event::Dispatcher& thread_local_dispatcher,
     790           0 :     Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) {
     791             :   // Since the inline scope_route_config_provider is not fully implemented and never used, the
     792             :   // dynamic cast in the constructor always succeeds and the pointer should not be null here.
     793           0 :   ASSERT(scoped_route_config_provider_ != nullptr);
     794           0 :   Http::RouteConfigUpdatedCallback scoped_route_config_updated_cb =
     795           0 :       Http::RouteConfigUpdatedCallback(
     796           0 :           [this, weak_route_config_updated_cb = std::weak_ptr<Http::RouteConfigUpdatedCallback>(
     797           0 :                      route_config_updated_cb)](bool scope_exist) {
     798             :             // If the callback can be locked, this ActiveStream is still alive.
     799           0 :             if (auto cb = weak_route_config_updated_cb.lock()) {
     800             :               // Refresh the route before continuing the filter chain.
     801           0 :               if (scope_exist) {
     802           0 :                 parent_.refreshCachedRoute();
     803           0 :               }
     804           0 :               (*cb)(scope_exist && parent_.hasCachedRoute());
     805           0 :             }
     806           0 :           });
     807           0 :   scoped_route_config_provider_->onDemandRdsUpdate(std::move(scope_key), thread_local_dispatcher,
     808           0 :                                                    std::move(scoped_route_config_updated_cb));
     809           0 : }
     810             : // Returns the request ID extension's get() result, or an empty optional without request headers.
     811             : absl::optional<absl::string_view>
     812           0 : ConnectionManagerImpl::HttpStreamIdProviderImpl::toStringView() const {
     813           0 :   if (parent_.request_headers_ == nullptr) {
     814           0 :     return {};
     815           0 :   }
     816           0 :   ASSERT(parent_.connection_manager_.config_.requestIDExtension() != nullptr);
     817           0 :   return parent_.connection_manager_.config_.requestIDExtension()->get(*parent_.request_headers_);
     818           0 : }
     819             : // Returns the extension's getInteger() result, or an empty optional without request headers.
     820           0 : absl::optional<uint64_t> ConnectionManagerImpl::HttpStreamIdProviderImpl::toInteger() const {
     821           0 :   if (parent_.request_headers_ == nullptr) {
     822           0 :     return {};
     823           0 :   }
     824           0 :   ASSERT(parent_.connection_manager_.config_.requestIDExtension() != nullptr);
     825           0 :   return parent_.connection_manager_.config_.requestIDExtension()->getInteger(
     826           0 :       *parent_.request_headers_);
     827           0 : }
     828             : 
     829             : ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager,
     830             :                                                   uint32_t buffer_limit,
     831             :                                                   Buffer::BufferMemoryAccountSharedPtr account)
     832             :     : connection_manager_(connection_manager),
     833             :       connection_manager_tracing_config_(connection_manager_.config_.tracingConfig() == nullptr
     834             :                                              ? absl::nullopt
     835             :                                              : makeOptRef<const TracingConnectionManagerConfig>(
     836             :                                                    *connection_manager_.config_.tracingConfig())),
     837             :       stream_id_(connection_manager.random_generator_.random()),
     838             :       filter_manager_(*this, *connection_manager_.dispatcher_,
     839             :                       connection_manager_.read_callbacks_->connection(), stream_id_,
     840             :                       std::move(account), connection_manager_.config_.proxy100Continue(),
     841             :                       buffer_limit, connection_manager_.config_.filterFactory(),
     842             :                       connection_manager_.config_.localReply(),
     843             :                       connection_manager_.codec_->protocol(), connection_manager_.timeSource(),
     844             :                       connection_manager_.read_callbacks_->connection().streamInfo().filterState(),
     845             :                       StreamInfo::FilterState::LifeSpan::Connection),
     846             :       request_response_timespan_(new Stats::HistogramCompletableTimespanImpl(
     847             :           connection_manager_.stats_.named_.downstream_rq_time_, connection_manager_.timeSource())),
     848             :       header_validator_(
     849         671 :           connection_manager.config_.makeHeaderValidator(connection_manager.codec_->protocol())) {
     850         671 :   ASSERT(!connection_manager.config_.isRoutable() ||
     851         671 :              ((connection_manager.config_.routeConfigProvider() == nullptr &&
     852         671 :                connection_manager.config_.scopedRouteConfigProvider() != nullptr &&
     853         671 :                connection_manager.config_.scopeKeyBuilder().has_value()) ||
     854         671 :               (connection_manager.config_.routeConfigProvider() != nullptr &&
     855         671 :                connection_manager.config_.scopedRouteConfigProvider() == nullptr &&
     856         671 :                !connection_manager.config_.scopeKeyBuilder().has_value())),
     857         671 :          "Either routeConfigProvider or (scopedRouteConfigProvider and scopeKeyBuilder) should be "
     858         671 :          "set in "
     859         671 :          "ConnectionManagerImpl.");
     860         671 :   for (const AccessLog::InstanceSharedPtr& access_log : connection_manager_.config_.accessLogs()) {
     861         668 :     filter_manager_.addAccessLogHandler(access_log);
     862         668 :   }
     863             : 
     864         671 :   filter_manager_.streamInfo().setStreamIdProvider(
     865         671 :       std::make_shared<HttpStreamIdProviderImpl>(*this));
     866             : 
     867         671 :   if (connection_manager_.config_.isRoutable() &&
     868         671 :       connection_manager.config_.routeConfigProvider() != nullptr) {
     869         573 :     route_config_update_requester_ =
     870         573 :         std::make_unique<ConnectionManagerImpl::RdsRouteConfigUpdateRequester>(
     871         573 :             connection_manager.config_.routeConfigProvider(), *this);
     872         605 :   } else if (connection_manager_.config_.isRoutable() &&
     873          98 :              connection_manager.config_.scopedRouteConfigProvider() != nullptr &&
     874          98 :              connection_manager.config_.scopeKeyBuilder().has_value()) {
     875           0 :     route_config_update_requester_ =
     876           0 :         std::make_unique<ConnectionManagerImpl::RdsRouteConfigUpdateRequester>(
     877           0 :             connection_manager.config_.scopedRouteConfigProvider(),
     878           0 :             connection_manager.config_.scopeKeyBuilder(), *this);
     879           0 :   }
     880         671 :   ScopeTrackerScopeState scope(this,
     881         671 :                                connection_manager_.read_callbacks_->connection().dispatcher());
     882             : 
     883         671 :   connection_manager_.stats_.named_.downstream_rq_total_.inc();
     884         671 :   connection_manager_.stats_.named_.downstream_rq_active_.inc();
     885         671 :   if (connection_manager_.codec_->protocol() == Protocol::Http2) {
     886         212 :     connection_manager_.stats_.named_.downstream_rq_http2_total_.inc();
     887         645 :   } else if (connection_manager_.codec_->protocol() == Protocol::Http3) {
     888           0 :     connection_manager_.stats_.named_.downstream_rq_http3_total_.inc();
     889         459 :   } else {
     890         459 :     connection_manager_.stats_.named_.downstream_rq_http1_total_.inc();
     891         459 :   }
     892             : 
     893         671 :   if (connection_manager_.config_.streamIdleTimeout().count()) {
     894         517 :     idle_timeout_ms_ = connection_manager_.config_.streamIdleTimeout();
     895         517 :     stream_idle_timer_ = connection_manager_.dispatcher_->createScaledTimer(
     896         517 :         Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout,
     897         517 :         [this]() -> void { onIdleTimeout(); });
     898         517 :     resetIdleTimer();
     899         517 :   }
     900             : 
     901         671 :   if (connection_manager_.config_.requestTimeout().count()) {
     902           0 :     std::chrono::milliseconds request_timeout = connection_manager_.config_.requestTimeout();
     903           0 :     request_timer_ =
     904           0 :         connection_manager.dispatcher_->createTimer([this]() -> void { onRequestTimeout(); });
     905           0 :     request_timer_->enableTimer(request_timeout, this);
     906           0 :   }
     907             : 
     908         671 :   if (connection_manager_.config_.requestHeadersTimeout().count()) {
     909           0 :     std::chrono::milliseconds request_headers_timeout =
     910           0 :         connection_manager_.config_.requestHeadersTimeout();
     911           0 :     request_header_timer_ =
     912           0 :         connection_manager.dispatcher_->createTimer([this]() -> void { onRequestHeaderTimeout(); });
     913           0 :     request_header_timer_->enableTimer(request_headers_timeout, this);
     914           0 :   }
     915             : 
     916         671 :   const auto max_stream_duration = connection_manager_.config_.maxStreamDuration();
     917         671 :   if (max_stream_duration.has_value() && max_stream_duration.value().count()) {
     918           0 :     max_stream_duration_timer_ = connection_manager.dispatcher_->createTimer(
     919           0 :         [this]() -> void { onStreamMaxDurationReached(); });
     920           0 :     max_stream_duration_timer_->enableTimer(connection_manager_.config_.maxStreamDuration().value(),
     921           0 :                                             this);
     922           0 :   }
     923             : 
     924         671 :   if (connection_manager_.config_.accessLogFlushInterval().has_value()) {
     925           0 :     access_log_flush_timer_ = connection_manager.dispatcher_->createTimer([this]() -> void {
     926             :       // If the request is complete, we've already done the stream-end access-log, and shouldn't
     927             :       // do the periodic log.
     928           0 :       if (!streamInfo().requestComplete().has_value()) {
     929           0 :         filter_manager_.log(AccessLog::AccessLogType::DownstreamPeriodic);
     930           0 :         refreshAccessLogFlushTimer();
     931           0 :       }
     932           0 :       const SystemTime now = connection_manager_.timeSource().systemTime();
     933             :       // Downstream bytes meter is guaranteed to be non-null because ActiveStream and the timer
     934             :       // event are created on the same thread that sets the meter in
     935             :       // ConnectionManagerImpl::newStream.
     936           0 :       filter_manager_.streamInfo().getDownstreamBytesMeter()->takeDownstreamPeriodicLoggingSnapshot(
     937           0 :           now);
     938           0 :       if (auto& upstream_bytes_meter = filter_manager_.streamInfo().getUpstreamBytesMeter();
     939           0 :           upstream_bytes_meter != nullptr) {
     940           0 :         upstream_bytes_meter->takeDownstreamPeriodicLoggingSnapshot(now);
     941           0 :       }
     942           0 :     });
     943           0 :     refreshAccessLogFlushTimer();
     944           0 :   }
     945         671 : }
     946             : 
     947         671 : void ConnectionManagerImpl::ActiveStream::completeRequest() {
     948         671 :   filter_manager_.streamInfo().onRequestComplete();
     949             : 
     950         671 :   if (connection_manager_.remote_close_) {
     951          64 :     filter_manager_.streamInfo().setResponseCodeDetails(
     952          64 :         StreamInfo::ResponseCodeDetails::get().DownstreamRemoteDisconnect);
     953          64 :     filter_manager_.streamInfo().setResponseFlag(
     954          64 :         StreamInfo::ResponseFlag::DownstreamConnectionTermination);
     955          64 :   }
     956         671 :   connection_manager_.stats_.named_.downstream_rq_active_.dec();
     957         671 :   if (filter_manager_.streamInfo().healthCheck()) {
     958           0 :     connection_manager_.config_.tracingStats().health_check_.inc();
     959           0 :   }
     960             : 
     961         671 :   if (active_span_) {
     962           0 :     Tracing::HttpTracerUtility::finalizeDownstreamSpan(
     963           0 :         *active_span_, request_headers_.get(), response_headers_.get(), response_trailers_.get(),
     964           0 :         filter_manager_.streamInfo(), *this);
     965           0 :   }
     966         671 :   if (state_.successful_upgrade_) {
     967           2 :     connection_manager_.stats_.named_.downstream_cx_upgrades_active_.dec();
     968           2 :   }
     969         671 : }
     970             : 
     971        2403 : void ConnectionManagerImpl::ActiveStream::resetIdleTimer() {
     972        2403 :   if (stream_idle_timer_ != nullptr) {
     973             :     // TODO(htuch): If this shows up in performance profiles, optimize by only
     974             :     // updating a timestamp here and doing periodic checks for idle timeouts
     975             :     // instead, or reducing the accuracy of timers.
     976        1825 :     stream_idle_timer_->enableTimer(idle_timeout_ms_);
     977        1825 :   }
     978        2403 : }
     979             : 
     980           0 : void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
     981           0 :   connection_manager_.stats_.named_.downstream_rq_idle_timeout_.inc();
     982             : 
     983           0 :   filter_manager_.streamInfo().setResponseFlag(StreamInfo::ResponseFlag::StreamIdleTimeout);
     984           0 :   sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
     985           0 :                  "stream timeout", nullptr, absl::nullopt,
     986           0 :                  StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
     987           0 : }
     988             : 
     989           0 : void ConnectionManagerImpl::ActiveStream::onRequestTimeout() {
     990           0 :   connection_manager_.stats_.named_.downstream_rq_timeout_.inc();
     991           0 :   sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
     992           0 :                  "request timeout", nullptr, absl::nullopt,
     993           0 :                  StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
     994           0 : }
     995             : 
     996           0 : void ConnectionManagerImpl::ActiveStream::onRequestHeaderTimeout() {
     997           0 :   connection_manager_.stats_.named_.downstream_rq_header_timeout_.inc();
     998           0 :   sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
     999           0 :                  "request header timeout", nullptr, absl::nullopt,
    1000           0 :                  StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
    1001           0 : }
    1002             : 
    1003           0 : void ConnectionManagerImpl::ActiveStream::onStreamMaxDurationReached() {
    1004           0 :   ENVOY_STREAM_LOG(debug, "Stream max duration time reached", *this);
    1005           0 :   connection_manager_.stats_.named_.downstream_rq_max_duration_reached_.inc();
    1006           0 :   sendLocalReply(Http::Utility::maybeRequestTimeoutCode(filter_manager_.remoteDecodeComplete()),
    1007           0 :                  "downstream duration timeout", nullptr,
    1008           0 :                  Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
    1009           0 :                  StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
    1010           0 : }
    1011             : 
    1012         565 : void ConnectionManagerImpl::ActiveStream::chargeStats(const ResponseHeaderMap& headers) {
    1013         565 :   uint64_t response_code = Utility::getResponseStatus(headers);
    1014         565 :   filter_manager_.streamInfo().setResponseCode(response_code);
    1015             : 
    1016         565 :   if (filter_manager_.streamInfo().health_check_request_) {
    1017           0 :     return;
    1018           0 :   }
    1019             : 
    1020             :   // No response is sent back downstream for internal redirects, so don't charge downstream stats.
    1021         565 :   const absl::optional<std::string>& response_code_details =
    1022         565 :       filter_manager_.streamInfo().responseCodeDetails();
    1023         565 :   if (response_code_details.has_value() &&
    1024         565 :       response_code_details == Envoy::StreamInfo::ResponseCodeDetails::get().InternalRedirect) {
    1025           0 :     return;
    1026           0 :   }
    1027             : 
    1028         565 :   connection_manager_.stats_.named_.downstream_rq_completed_.inc();
    1029         565 :   connection_manager_.listener_stats_.downstream_rq_completed_.inc();
    1030         565 :   if (CodeUtility::is1xx(response_code)) {
    1031           4 :     connection_manager_.stats_.named_.downstream_rq_1xx_.inc();
    1032           4 :     connection_manager_.listener_stats_.downstream_rq_1xx_.inc();
    1033         561 :   } else if (CodeUtility::is2xx(response_code)) {
    1034         384 :     connection_manager_.stats_.named_.downstream_rq_2xx_.inc();
    1035         384 :     connection_manager_.listener_stats_.downstream_rq_2xx_.inc();
    1036         457 :   } else if (CodeUtility::is3xx(response_code)) {
    1037           0 :     connection_manager_.stats_.named_.downstream_rq_3xx_.inc();
    1038           0 :     connection_manager_.listener_stats_.downstream_rq_3xx_.inc();
    1039         177 :   } else if (CodeUtility::is4xx(response_code)) {
    1040         116 :     connection_manager_.stats_.named_.downstream_rq_4xx_.inc();
    1041         116 :     connection_manager_.listener_stats_.downstream_rq_4xx_.inc();
    1042         159 :   } else if (CodeUtility::is5xx(response_code)) {
    1043          61 :     connection_manager_.stats_.named_.downstream_rq_5xx_.inc();
    1044          61 :     connection_manager_.listener_stats_.downstream_rq_5xx_.inc();
    1045          61 :   }
    1046         565 : }
    1047             : 
    1048         507 : const Network::Connection* ConnectionManagerImpl::ActiveStream::connection() {
    1049         507 :   return &connection_manager_.read_callbacks_->connection();
    1050         507 : }
    1051             : 
    1052         507 : uint32_t ConnectionManagerImpl::ActiveStream::localPort() {
    1053         507 :   auto ip = connection()->connectionInfoProvider().localAddress()->ip();
    1054         507 :   if (ip == nullptr) {
    1055           0 :     return 0;
    1056           0 :   }
    1057         507 :   return ip->port();
    1058         507 : }
    1059             : 
    1060             : namespace {
    1061           0 : bool streamErrorOnlyErrors(absl::string_view error_details) {
    1062             :   // Pre UHV HCM did not respect stream_error_on_invalid_http_message
    1063             :   // and only sent 400 for specific errors.
    1064             :   // TODO(#28555): make these errors respect the stream_error_on_invalid_http_message
    1065           0 :   return error_details == UhvResponseCodeDetail::get().FragmentInUrlPath ||
    1066           0 :          error_details == UhvResponseCodeDetail::get().EscapedSlashesInPath ||
    1067           0 :          error_details == UhvResponseCodeDetail::get().Percent00InPath;
    1068           0 : }
    1069             : } // namespace
    1070             : 
    1071         547 : bool ConnectionManagerImpl::ActiveStream::validateHeaders() {
    1072         547 :   if (header_validator_) {
    1073           0 :     auto validation_result = header_validator_->validateRequestHeaders(*request_headers_);
    1074           0 :     bool failure = !validation_result.ok();
    1075           0 :     bool redirect = false;
    1076           0 :     bool is_grpc = Grpc::Common::hasGrpcContentType(*request_headers_);
    1077           0 :     std::string failure_details(validation_result.details());
    1078           0 :     if (!failure) {
    1079           0 :       auto transformation_result = header_validator_->transformRequestHeaders(*request_headers_);
    1080           0 :       failure = !transformation_result.ok();
    1081           0 :       redirect = transformation_result.action() ==
    1082           0 :                  Http::ServerHeaderValidator::RequestHeadersTransformationResult::Action::Redirect;
    1083           0 :       failure_details = std::string(transformation_result.details());
    1084           0 :       if (redirect && !is_grpc) {
    1085           0 :         connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
    1086           0 :       } else if (failure) {
    1087           0 :         connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
    1088           0 :       }
    1089           0 :     }
    1090           0 :     if (failure) {
    1091           0 :       std::function<void(ResponseHeaderMap & headers)> modify_headers;
    1092           0 :       Code response_code = failure_details == Http1ResponseCodeDetail::get().InvalidTransferEncoding
    1093           0 :                                ? Code::NotImplemented
    1094           0 :                                : Code::BadRequest;
    1095           0 :       absl::optional<Grpc::Status::GrpcStatus> grpc_status;
    1096           0 :       if (redirect && !is_grpc) {
    1097           0 :         response_code = Code::TemporaryRedirect;
    1098           0 :         modify_headers = [new_path = request_headers_->Path()->value().getStringView()](
    1099           0 :                              Http::ResponseHeaderMap& response_headers) -> void {
    1100           0 :           response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
    1101           0 :         };
    1102           0 :       } else if (is_grpc) {
    1103           0 :         grpc_status = Grpc::Status::WellKnownGrpcStatus::Internal;
    1104           0 :       }
    1105             : 
    1106             :       // H/2 codec was resetting requests that were rejected due to headers with underscores,
    1107             :       // instead of sending 400. Preserving this behavior for now.
    1108             :       // TODO(#24466): Make H/2 behavior consistent with H/1 and H/3.
    1109           0 :       if (failure_details == UhvResponseCodeDetail::get().InvalidUnderscore &&
    1110           0 :           connection_manager_.codec_->protocol() == Protocol::Http2) {
    1111           0 :         filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
    1112           0 :         resetStream();
    1113           0 :       } else {
    1114           0 :         sendLocalReply(response_code, "", modify_headers, grpc_status, failure_details);
    1115           0 :         if (!response_encoder_->streamErrorOnInvalidHttpMessage() &&
    1116           0 :             !streamErrorOnlyErrors(failure_details)) {
    1117           0 :           connection_manager_.handleCodecError(failure_details);
    1118           0 :         }
    1119           0 :       }
    1120           0 :       return false;
    1121           0 :     }
    1122           0 :   }
    1123             : 
    1124         547 :   return true;
    1125         547 : }
    1126             : 
    1127           6 : bool ConnectionManagerImpl::ActiveStream::validateTrailers() {
    1128           6 :   if (!header_validator_) {
    1129           6 :     return true;
    1130           6 :   }
    1131             : 
    1132           0 :   auto validation_result = header_validator_->validateRequestTrailers(*request_trailers_);
    1133           0 :   std::string failure_details(validation_result.details());
    1134           0 :   if (validation_result.ok()) {
    1135           0 :     auto transformation_result = header_validator_->transformRequestTrailers(*request_trailers_);
    1136           0 :     if (transformation_result.ok()) {
    1137           0 :       return true;
    1138           0 :     }
    1139           0 :     failure_details = std::string(transformation_result.details());
    1140           0 :   }
    1141             : 
    1142           0 :   Code response_code = Code::BadRequest;
    1143           0 :   absl::optional<Grpc::Status::GrpcStatus> grpc_status;
    1144           0 :   if (Grpc::Common::hasGrpcContentType(*request_headers_)) {
    1145           0 :     grpc_status = Grpc::Status::WellKnownGrpcStatus::Internal;
    1146           0 :   }
    1147             : 
    1148             :   // H/2 codec was resetting requests that were rejected due to headers with underscores,
    1149             :   // instead of sending 400. Preserving this behavior for now.
    1150             :   // TODO(#24466): Make H/2 behavior consistent with H/1 and H/3.
    1151           0 :   if (failure_details == UhvResponseCodeDetail::get().InvalidUnderscore &&
    1152           0 :       connection_manager_.codec_->protocol() == Protocol::Http2) {
    1153           0 :     filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
    1154           0 :     resetStream();
    1155           0 :   } else {
    1156             :     // TODO(#24735): Harmonize H/2 and H/3 behavior with H/1
    1157           0 :     if (connection_manager_.codec_->protocol() < Protocol::Http2) {
    1158           0 :       sendLocalReply(response_code, "", nullptr, grpc_status, failure_details);
    1159           0 :     } else {
    1160           0 :       filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
    1161           0 :       resetStream();
    1162           0 :     }
    1163           0 :     if (!response_encoder_->streamErrorOnInvalidHttpMessage()) {
    1164           0 :       connection_manager_.handleCodecError(failure_details);
    1165           0 :     }
    1166           0 :   }
    1167           0 :   return false;
    1168           0 : }
    1169             : 
    1170         725 : void ConnectionManagerImpl::ActiveStream::maybeEndDecode(bool end_stream) {
    1171             :   // If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
    1172         725 :   if (end_stream && !filter_manager_.remoteDecodeComplete()) {
    1173         388 :     filter_manager_.streamInfo().downstreamTiming().onLastDownstreamRxByteReceived(
    1174         388 :         connection_manager_.dispatcher_->timeSource());
    1175         388 :     ENVOY_STREAM_LOG(debug, "request end stream", *this);
    1176         388 :   }
    1177         725 : }
    1178             : 
    1179             : // Ordering in this function is complicated, but important.
    1180             : //
    1181             : // We want to do minimal work before selecting route and creating a filter
    1182             : // chain to maximize the number of requests which get custom filter behavior,
    1183             : // e.g. registering access logging.
    1184             : //
    1185             : // This must be balanced by doing sanity checking for invalid requests (one
    1186             : // can't route select properly without full headers), checking state required to
    1187             : // serve error responses (connection close, head requests, etc), and
    1188             : // modifications which may themselves affect route selection.
    1189             : void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPtr&& headers,
    1190         547 :                                                         bool end_stream) {
    1191         547 :   ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream,
    1192         547 :                    *headers);
    1193             :   // We only want to record this when reading the headers the first time, not when recreating
    1194             :   // a stream.
    1195         547 :   if (!filter_manager_.remoteDecodeComplete()) {
    1196         547 :     filter_manager_.streamInfo().downstreamTiming().onLastDownstreamHeaderRxByteReceived(
    1197         547 :         connection_manager_.dispatcher_->timeSource());
    1198         547 :   }
    1199         547 :   ScopeTrackerScopeState scope(this,
    1200         547 :                                connection_manager_.read_callbacks_->connection().dispatcher());
    1201         547 :   request_headers_ = std::move(headers);
    1202         547 :   filter_manager_.requestHeadersInitialized();
    1203         547 :   if (request_header_timer_ != nullptr) {
    1204           0 :     request_header_timer_->disableTimer();
    1205           0 :     request_header_timer_.reset();
    1206           0 :   }
    1207             : 
    1208             :   // Both shouldDrainConnectionUponCompletion() and is_head_request_ affect local replies: set them
    1209             :   // as early as possible.
    1210         547 :   const Protocol protocol = connection_manager_.codec_->protocol();
    1211         547 :   if (Runtime::runtimeFeatureEnabled(
    1212         547 :           "envoy.reloadable_features.http1_connection_close_header_in_redirect")) {
    1213         547 :     if (HeaderUtility::shouldCloseConnection(protocol, *request_headers_)) {
    1214             :       // Only mark the connection to be closed if the request indicates so. The connection might
    1215             :       // already be marked so before this step, in which case if shouldCloseConnection() returns
    1216             :       // false, the stream info value shouldn't be overridden.
    1217           0 :       filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
    1218           0 :     }
    1219         547 :   } else {
    1220           0 :     filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(
    1221           0 :         HeaderUtility::shouldCloseConnection(protocol, *request_headers_));
    1222           0 :   }
    1223             : 
    1224         547 :   filter_manager_.streamInfo().protocol(protocol);
    1225             : 
    1226             :   // We end the decode here to mark that the downstream stream is complete.
    1227         547 :   maybeEndDecode(end_stream);
    1228             : 
    1229         547 :   if (!validateHeaders()) {
    1230           0 :     ENVOY_STREAM_LOG(debug, "request headers validation failed\n{}", *this, *request_headers_);
    1231           0 :     return;
    1232           0 :   }
    1233             : 
    1234             :   // We need to snap snapped_route_config_ here as it's used in mutateRequestHeaders later.
    1235         547 :   if (connection_manager_.config_.isRoutable()) {
    1236         449 :     if (connection_manager_.config_.routeConfigProvider() != nullptr) {
    1237         449 :       snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
    1238         449 :     } else if (connection_manager_.config_.scopedRouteConfigProvider() != nullptr &&
    1239           0 :                connection_manager_.config_.scopeKeyBuilder().has_value()) {
    1240           0 :       snapped_scoped_routes_config_ =
    1241           0 :           connection_manager_.config_.scopedRouteConfigProvider()->config<Router::ScopedConfig>();
    1242           0 :       snapScopedRouteConfig();
    1243           0 :     }
    1244         486 :   } else {
    1245          98 :     snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
    1246          98 :   }
    1247             : 
    1248             :   // Drop new requests when overloaded as soon as we have decoded the headers.
    1249         547 :   const bool drop_request_due_to_overload =
    1250         547 :       (connection_manager_.accept_new_http_stream_ != nullptr &&
    1251         547 :        connection_manager_.accept_new_http_stream_->shouldShedLoad()) ||
    1252         547 :       connection_manager_.random_generator_.bernoulli(
    1253         547 :           connection_manager_.overload_stop_accepting_requests_ref_.value());
    1254             : 
    1255         547 :   if (drop_request_due_to_overload) {
    1256             :     // In this one special case, do not create the filter chain. If there is a risk of memory
    1257             :     // overload it is more important to avoid unnecessary allocation than to create the filters.
    1258           0 :     filter_manager_.skipFilterChainCreation();
    1259           0 :     connection_manager_.stats_.named_.downstream_rq_overload_close_.inc();
    1260           0 :     sendLocalReply(Http::Code::ServiceUnavailable, "envoy overloaded", nullptr, absl::nullopt,
    1261           0 :                    StreamInfo::ResponseCodeDetails::get().Overload);
    1262           0 :     return;
    1263           0 :   }
    1264             : 
    1265         547 :   if (!connection_manager_.config_.proxy100Continue() && request_headers_->Expect() &&
    1266             :       // The Expect field-value is case-insensitive.
    1267             :       // https://tools.ietf.org/html/rfc7231#section-5.1.1
    1268         547 :       absl::EqualsIgnoreCase((request_headers_->Expect()->value().getStringView()),
    1269           0 :                              Headers::get().ExpectValues._100Continue)) {
    1270             :     // Note in the case Envoy is handling 100-Continue complexity, it skips the filter chain
    1271             :     // and sends the 100-Continue directly to the encoder.
    1272           0 :     chargeStats(continueHeader());
    1273           0 :     response_encoder_->encode1xxHeaders(continueHeader());
    1274             :     // Remove the Expect header so it won't be handled again upstream.
    1275           0 :     request_headers_->removeExpect();
    1276           0 :   }
    1277             : 
    1278         547 :   connection_manager_.user_agent_.initializeFromHeaders(*request_headers_,
    1279         547 :                                                         connection_manager_.stats_.prefixStatName(),
    1280         547 :                                                         connection_manager_.stats_.scope_);
    1281             : 
    1282         547 :   if (!request_headers_->Host()) {
    1283             :     // Require host header. For HTTP/1.1 Host has already been translated to :authority.
    1284           6 :     sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
    1285           6 :                    StreamInfo::ResponseCodeDetails::get().MissingHost);
    1286           6 :     return;
    1287           6 :   }
    1288             : 
    1289             :   // Apply header sanity checks.
    1290         541 :   absl::optional<std::reference_wrapper<const absl::string_view>> error =
    1291         541 :       HeaderUtility::requestHeadersValid(*request_headers_);
    1292         541 :   if (error != absl::nullopt) {
    1293           2 :     sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt, error.value().get());
    1294           2 :     if (!response_encoder_->streamErrorOnInvalidHttpMessage()) {
    1295           2 :       connection_manager_.handleCodecError(error.value().get());
    1296           2 :     }
    1297           2 :     return;
    1298           2 :   }
    1299             : 
    1300             :   // Check for the existence of the :path header for non-CONNECT requests, or present-but-empty
    1301             :   // :path header for CONNECT requests. We expect the codec to have broken the path into pieces if
    1302             :   // applicable. NOTE: Currently the HTTP/1.1 codec only does this when the allow_absolute_url flag
    1303             :   // is enabled on the HCM.
    1304         539 :   if ((!HeaderUtility::isConnect(*request_headers_) || request_headers_->Path()) &&
    1305         539 :       request_headers_->getPathValue().empty()) {
    1306          32 :     sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
    1307          32 :                    StreamInfo::ResponseCodeDetails::get().MissingPath);
    1308          32 :     return;
    1309          32 :   }
    1310             : 
    1311             :   // Rewrites the host of CONNECT-UDP requests.
    1312         507 :   if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.enable_connect_udp_support") &&
    1313         507 :       HeaderUtility::isConnectUdpRequest(*request_headers_) &&
    1314         507 :       !HeaderUtility::rewriteAuthorityForConnectUdp(*request_headers_)) {
    1315           0 :     sendLocalReply(Code::NotFound, "The path is incorrect for CONNECT-UDP", nullptr, absl::nullopt,
    1316           0 :                    StreamInfo::ResponseCodeDetails::get().InvalidPath);
    1317           0 :     return;
    1318           0 :   }
    1319             : 
    1320             :   // Currently we only support relative paths at the application layer.
    1321         507 :   if (!request_headers_->getPathValue().empty() && request_headers_->getPathValue()[0] != '/') {
    1322           0 :     connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc();
    1323           0 :     sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
    1324           0 :                    StreamInfo::ResponseCodeDetails::get().AbsolutePath);
    1325           0 :     return;
    1326           0 :   }
    1327             : 
    1328         507 : #ifndef ENVOY_ENABLE_UHV
    1329             :   // In UHV mode path normalization is done in the UHV
    1330             :   // Path sanitization should happen before any path access other than the above sanity check.
    1331         507 :   const auto action =
    1332         507 :       ConnectionManagerUtility::maybeNormalizePath(*request_headers_, connection_manager_.config_);
    1333             :   // gRPC requests are rejected if Envoy is configured to redirect post-normalization. This is
    1334             :   // because gRPC clients do not support redirect.
    1335         507 :   if (action == ConnectionManagerUtility::NormalizePathAction::Reject ||
    1336         507 :       (action == ConnectionManagerUtility::NormalizePathAction::Redirect &&
    1337         507 :        Grpc::Common::hasGrpcContentType(*request_headers_))) {
    1338           0 :     connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
    1339           0 :     sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
    1340           0 :                    StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
    1341           0 :     return;
    1342         507 :   } else if (action == ConnectionManagerUtility::NormalizePathAction::Redirect) {
    1343           0 :     connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
    1344           0 :     sendLocalReply(
    1345           0 :         Code::TemporaryRedirect, "",
    1346           0 :         [new_path = request_headers_->Path()->value().getStringView()](
    1347           0 :             Http::ResponseHeaderMap& response_headers) -> void {
    1348           0 :           response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
    1349           0 :         },
    1350           0 :         absl::nullopt, StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
    1351           0 :     return;
    1352           0 :   }
    1353             : 
    1354         507 :   ASSERT(action == ConnectionManagerUtility::NormalizePathAction::Continue);
    1355         507 : #endif
    1356         507 :   auto optional_port = ConnectionManagerUtility::maybeNormalizeHost(
    1357         507 :       *request_headers_, connection_manager_.config_, localPort());
    1358         507 :   if (optional_port.has_value() &&
    1359         507 :       requestWasConnect(request_headers_, connection_manager_.codec_->protocol())) {
    1360           0 :     filter_manager_.streamInfo().filterState()->setData(
    1361           0 :         Router::OriginalConnectPort::key(),
    1362           0 :         std::make_unique<Router::OriginalConnectPort>(optional_port.value()),
    1363           0 :         StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Request);
    1364           0 :   }
    1365             : 
    1366         507 :   if (!state_.is_internally_created_) { // Only sanitize headers on first pass.
    1367             :     // Modify the downstream remote address depending on configuration and headers.
    1368         507 :     const auto mutate_result = ConnectionManagerUtility::mutateRequestHeaders(
    1369         507 :         *request_headers_, connection_manager_.read_callbacks_->connection(),
    1370         507 :         connection_manager_.config_, *snapped_route_config_, connection_manager_.local_info_,
    1371         507 :         filter_manager_.streamInfo());
    1372             : 
    1373             :     // IP detection failed, reject the request.
    1374         507 :     if (mutate_result.reject_request.has_value()) {
    1375           0 :       const auto& reject_request_params = mutate_result.reject_request.value();
    1376           0 :       connection_manager_.stats_.named_.downstream_rq_rejected_via_ip_detection_.inc();
    1377           0 :       sendLocalReply(reject_request_params.response_code, reject_request_params.body, nullptr,
    1378           0 :                      absl::nullopt,
    1379           0 :                      StreamInfo::ResponseCodeDetails::get().OriginalIPDetectionFailed);
    1380           0 :       return;
    1381           0 :     }
    1382             : 
    1383         507 :     filter_manager_.setDownstreamRemoteAddress(mutate_result.final_remote_address);
    1384         507 :   }
    1385         507 :   ASSERT(filter_manager_.streamInfo().downstreamAddressProvider().remoteAddress() != nullptr);
    1386             : 
    1387         507 :   ASSERT(!cached_route_);
    1388         507 :   refreshCachedRoute();
    1389             : 
    1390         507 :   if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
    1391         507 :     filter_manager_.streamInfo().setTraceReason(
    1392         507 :         ConnectionManagerUtility::mutateTracingRequestHeader(
    1393         507 :             *request_headers_, connection_manager_.runtime_, connection_manager_.config_,
    1394         507 :             cached_route_.value().get()));
    1395         507 :   }
    1396             : 
    1397         507 :   filter_manager_.streamInfo().setRequestHeaders(*request_headers_);
    1398             : 
    1399         507 :   const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
    1400             : 
    1401         507 :   if (connection_manager_.config_.flushAccessLogOnNewRequest()) {
    1402           0 :     filter_manager_.log(AccessLog::AccessLogType::DownstreamStart);
    1403           0 :   }
    1404             : 
    1405             :   // TODO if there are no filters when starting a filter iteration, the connection manager
    1406             :   // should return 404. The current returns no response if there is no router filter.
    1407         507 :   if (hasCachedRoute()) {
    1408             :     // Do not allow upgrades if the route does not support it.
    1409         409 :     if (upgrade_rejected) {
    1410             :       // While downstream servers should not send upgrade payload without the upgrade being
    1411             :       // accepted, err on the side of caution and refuse to process any further requests on this
    1412             :       // connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
    1413             :       // contains a smuggled HTTP request.
    1414           0 :       filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
    1415           0 :       connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
    1416           0 :       sendLocalReply(Code::Forbidden, "", nullptr, absl::nullopt,
    1417           0 :                      StreamInfo::ResponseCodeDetails::get().UpgradeFailed);
    1418           0 :       return;
    1419           0 :     }
    1420             :     // Allow non websocket requests to go through websocket enabled routes.
    1421         409 :   }
    1422             : 
    1423             :   // Check if tracing is enabled.
    1424         507 :   if (connection_manager_tracing_config_.has_value()) {
    1425           0 :     traceRequest();
    1426           0 :   }
    1427             : 
    1428         507 :   if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) {
    1429         507 :     filter_manager_.decodeHeaders(*request_headers_, end_stream);
    1430         507 :   } else {
    1431           0 :     state_.deferred_to_next_io_iteration_ = true;
    1432           0 :     state_.deferred_end_stream_ = end_stream;
    1433           0 :   }
    1434             : 
    1435             :   // Reset it here for both global and overridden cases.
    1436         507 :   resetIdleTimer();
    1437         507 : }
    1438             : 
    1439           0 : void ConnectionManagerImpl::ActiveStream::traceRequest() {
    1440           0 :   const Tracing::Decision tracing_decision =
    1441           0 :       Tracing::TracerUtility::shouldTraceRequest(filter_manager_.streamInfo());
    1442           0 :   ConnectionManagerImpl::chargeTracingStats(tracing_decision.reason,
    1443           0 :                                             connection_manager_.config_.tracingStats());
    1444             : 
    1445           0 :   Tracing::HttpTraceContext trace_context(*request_headers_);
    1446           0 :   active_span_ = connection_manager_.tracer().startSpan(
    1447           0 :       *this, trace_context, filter_manager_.streamInfo(), tracing_decision);
    1448             : 
    1449           0 :   if (!active_span_) {
    1450           0 :     return;
    1451           0 :   }
    1452             : 
    1453             :   // TODO: Need to investigate the following code based on the cached route, as may
    1454             :   // be broken in the case a filter changes the route.
    1455             : 
    1456             :   // If a decorator has been defined, apply it to the active span.
    1457           0 :   if (hasCachedRoute() && cached_route_.value()->decorator()) {
    1458           0 :     const Router::Decorator* decorator = cached_route_.value()->decorator();
    1459             : 
    1460           0 :     decorator->apply(*active_span_);
    1461             : 
    1462           0 :     state_.decorated_propagate_ = decorator->propagate();
    1463             : 
    1464             :     // Cache decorated operation.
    1465           0 :     if (!decorator->getOperation().empty()) {
    1466           0 :       decorated_operation_ = &decorator->getOperation();
    1467           0 :     }
    1468           0 :   }
    1469             : 
    1470           0 :   if (connection_manager_tracing_config_->operation_name_ == Tracing::OperationName::Egress) {
    1471             :     // For egress (outbound) requests, pass the decorator's operation name (if defined and
    1472             :     // propagation enabled) as a request header to enable the receiving service to use it in its
    1473             :     // server span.
    1474           0 :     if (decorated_operation_ && state_.decorated_propagate_) {
    1475           0 :       request_headers_->setEnvoyDecoratorOperation(*decorated_operation_);
    1476           0 :     }
    1477           0 :   } else {
    1478           0 :     const HeaderEntry* req_operation_override = request_headers_->EnvoyDecoratorOperation();
    1479             : 
    1480             :     // For ingress (inbound) requests, if a decorator operation name has been provided, it
    1481             :     // should be used to override the active span's operation.
    1482           0 :     if (req_operation_override) {
    1483           0 :       if (!req_operation_override->value().empty()) {
    1484           0 :         active_span_->setOperation(req_operation_override->value().getStringView());
    1485             : 
    1486             :         // Clear the decorated operation so won't be used in the response header, as
    1487             :         // it has been overridden by the inbound decorator operation request header.
    1488           0 :         decorated_operation_ = nullptr;
    1489           0 :       }
    1490             :       // Remove header so not propagated to service
    1491           0 :       request_headers_->removeEnvoyDecoratorOperation();
    1492           0 :     }
    1493           0 :   }
    1494           0 : }
    1495             : 
    1496         172 : void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, bool end_stream) {
    1497         172 :   ScopeTrackerScopeState scope(this,
    1498         172 :                                connection_manager_.read_callbacks_->connection().dispatcher());
    1499         172 :   maybeEndDecode(end_stream);
    1500         172 :   filter_manager_.streamInfo().addBytesReceived(data.length());
    1501         172 :   if (!state_.deferred_to_next_io_iteration_) {
    1502         172 :     filter_manager_.decodeData(data, end_stream);
    1503         172 :   } else {
    1504           0 :     if (!deferred_data_) {
    1505           0 :       deferred_data_ = std::make_unique<Buffer::OwnedImpl>();
    1506           0 :     }
    1507           0 :     deferred_data_->move(data);
    1508           0 :     state_.deferred_end_stream_ = end_stream;
    1509           0 :   }
    1510         172 : }
    1511             : 
    1512           6 : void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) {
    1513           6 :   ENVOY_STREAM_LOG(debug, "request trailers complete:\n{}", *this, *trailers);
    1514           6 :   ScopeTrackerScopeState scope(this,
    1515           6 :                                connection_manager_.read_callbacks_->connection().dispatcher());
    1516           6 :   resetIdleTimer();
    1517             : 
    1518           6 :   ASSERT(!request_trailers_);
    1519           6 :   request_trailers_ = std::move(trailers);
    1520           6 :   if (!validateTrailers()) {
    1521           0 :     ENVOY_STREAM_LOG(debug, "request trailers validation failed:\n{}", *this, *request_trailers_);
    1522           0 :     return;
    1523           0 :   }
    1524           6 :   maybeEndDecode(true);
    1525           6 :   if (!state_.deferred_to_next_io_iteration_) {
    1526           6 :     filter_manager_.decodeTrailers(*request_trailers_);
    1527           6 :   }
    1528           6 : }
    1529             : 
    1530           0 : void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) {
    1531           0 :   resetIdleTimer();
    1532           0 :   if (!state_.deferred_to_next_io_iteration_) {
    1533             :     // After going through filters, the ownership of metadata_map will be passed to terminal filter.
    1534             :     // The terminal filter may encode metadata_map to the next hop immediately or store metadata_map
    1535             :     // and encode later when connection pool is ready.
    1536           0 :     filter_manager_.decodeMetadata(*metadata_map);
    1537           0 :   } else {
    1538           0 :     deferred_metadata_.push(std::move(metadata_map));
    1539           0 :   }
    1540           0 : }
    1541             : 
    1542        1728 : void ConnectionManagerImpl::ActiveStream::disarmRequestTimeout() {
    1543        1728 :   if (request_timer_) {
    1544           0 :     request_timer_->disableTimer();
    1545           0 :   }
    1546        1728 : }
    1547             : 
    1548           0 : void ConnectionManagerImpl::startDrainSequence() {
    1549           0 :   ASSERT(drain_state_ == DrainState::NotDraining);
    1550           0 :   drain_state_ = DrainState::Draining;
    1551           0 :   codec_->shutdownNotice();
    1552           0 :   drain_timer_ = dispatcher_->createTimer([this]() -> void { onDrainTimeout(); });
    1553           0 :   drain_timer_->enableTimer(config_.drainTimeout());
    1554           0 : }
    1555             : 
    1556           0 : void ConnectionManagerImpl::ActiveStream::snapScopedRouteConfig() {
    1557             :   // NOTE: if a RDS subscription hasn't got a RouteConfiguration back, a Router::NullConfigImpl is
    1558             :   // returned, in that case we let it pass.
    1559           0 :   auto scope_key =
    1560           0 :       connection_manager_.config_.scopeKeyBuilder()->computeScopeKey(*request_headers_);
    1561           0 :   snapped_route_config_ = snapped_scoped_routes_config_->getRouteConfig(scope_key);
    1562           0 :   if (snapped_route_config_ == nullptr) {
    1563           0 :     ENVOY_STREAM_LOG(trace, "can't find SRDS scope.", *this);
    1564             :     // TODO(stevenzzzz): Consider to pass an error message to router filter, so that it can
    1565             :     // send back 404 with some more details.
    1566           0 :     snapped_route_config_ = std::make_shared<Router::NullConfigImpl>();
    1567           0 :   }
    1568           0 : }
    1569             : 
    1570         507 : void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { refreshCachedRoute(nullptr); }
    1571             : 
    1572         628 : void ConnectionManagerImpl::ActiveStream::refreshDurationTimeout() {
    1573         628 :   if (!filter_manager_.streamInfo().route() ||
    1574         628 :       !filter_manager_.streamInfo().route()->routeEntry() || !request_headers_) {
    1575         387 :     return;
    1576         387 :   }
    1577         241 :   const auto& route = filter_manager_.streamInfo().route()->routeEntry();
    1578             : 
    1579         241 :   auto grpc_timeout = Grpc::Common::getGrpcTimeout(*request_headers_);
    1580         241 :   std::chrono::milliseconds timeout;
    1581         241 :   bool disable_timer = false;
    1582             : 
    1583         241 :   if (!grpc_timeout || !route->grpcTimeoutHeaderMax()) {
    1584             :     // Either there is no grpc-timeout header or special timeouts for it are not
    1585             :     // configured. Use stream duration.
    1586         241 :     if (route->maxStreamDuration()) {
    1587           0 :       timeout = route->maxStreamDuration().value();
    1588           0 :       if (timeout == std::chrono::milliseconds(0)) {
    1589             :         // Explicitly configured 0 means no timeout.
    1590           0 :         disable_timer = true;
    1591           0 :       }
    1592         241 :     } else {
    1593             :       // Fall back to HCM config. If no HCM duration limit exists, disable
    1594             :       // timers set by any prior route configuration.
    1595         241 :       const auto max_stream_duration = connection_manager_.config_.maxStreamDuration();
    1596         241 :       if (max_stream_duration.has_value() && max_stream_duration.value().count()) {
    1597           0 :         timeout = max_stream_duration.value();
    1598         241 :       } else {
    1599         241 :         disable_timer = true;
    1600         241 :       }
    1601         241 :     }
    1602         241 :   } else {
    1603             :     // Start with the timeout equal to the gRPC timeout header.
    1604           0 :     timeout = grpc_timeout.value();
    1605             :     // If there's a valid cap, apply it.
    1606           0 :     if (timeout > route->grpcTimeoutHeaderMax().value() &&
    1607           0 :         route->grpcTimeoutHeaderMax().value() != std::chrono::milliseconds(0)) {
    1608           0 :       timeout = route->grpcTimeoutHeaderMax().value();
    1609           0 :     }
    1610             : 
    1611             :     // Apply the configured offset.
    1612           0 :     if (timeout != std::chrono::milliseconds(0) && route->grpcTimeoutHeaderOffset()) {
    1613           0 :       const auto offset = route->grpcTimeoutHeaderOffset().value();
    1614           0 :       if (offset < timeout) {
    1615           0 :         timeout -= offset;
    1616           0 :       } else {
    1617           0 :         timeout = std::chrono::milliseconds(0);
    1618           0 :       }
    1619           0 :     }
    1620           0 :   }
    1621             : 
    1622             :   // Disable any existing timer if configured to do so.
    1623         241 :   if (disable_timer) {
    1624         241 :     if (max_stream_duration_timer_) {
    1625           0 :       max_stream_duration_timer_->disableTimer();
    1626           0 :       if (route->usingNewTimeouts() && Grpc::Common::isGrpcRequestHeaders(*request_headers_)) {
    1627           0 :         request_headers_->removeGrpcTimeout();
    1628           0 :       }
    1629           0 :     }
    1630         241 :     return;
    1631         241 :   }
    1632             : 
    1633             :   // Set the header timeout before doing used-time adjustments.
    1634             :   // This may result in the upstream not getting the latest results, but also
    1635             :   // avoids every request getting a custom timeout based on envoy think time.
    1636           0 :   if (route->usingNewTimeouts() && Grpc::Common::isGrpcRequestHeaders(*request_headers_)) {
    1637           0 :     Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(timeout), *request_headers_);
    1638           0 :   }
    1639             : 
    1640             :   // See how long this stream has been alive, and adjust the timeout
    1641             :   // accordingly.
    1642           0 :   std::chrono::duration time_used = std::chrono::duration_cast<std::chrono::milliseconds>(
    1643           0 :       connection_manager_.timeSource().monotonicTime() -
    1644           0 :       filter_manager_.streamInfo().startTimeMonotonic());
    1645           0 :   if (timeout > time_used) {
    1646           0 :     timeout -= time_used;
    1647           0 :   } else {
    1648           0 :     timeout = std::chrono::milliseconds(0);
    1649           0 :   }
    1650             : 
    1651             :   // Finally create (if necessary) and enable the timer.
    1652           0 :   if (!max_stream_duration_timer_) {
    1653           0 :     max_stream_duration_timer_ = connection_manager_.dispatcher_->createTimer(
    1654           0 :         [this]() -> void { onStreamMaxDurationReached(); });
    1655           0 :   }
    1656           0 :   max_stream_duration_timer_->enableTimer(timeout);
    1657           0 : }
    1658             : 
    1659         628 : void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::RouteCallback& cb) {
    1660             :   // If the cached route is blocked then any attempt to clear it or refresh it
    1661             :   // will be ignored.
    1662         628 :   if (routeCacheBlocked()) {
    1663           0 :     return;
    1664           0 :   }
    1665             : 
    1666         628 :   Router::RouteConstSharedPtr route;
    1667         628 :   if (request_headers_ != nullptr) {
    1668         547 :     if (connection_manager_.config_.isRoutable() &&
    1669         547 :         connection_manager_.config_.scopedRouteConfigProvider() != nullptr &&
    1670         547 :         connection_manager_.config_.scopeKeyBuilder().has_value()) {
    1671             :       // NOTE: re-select scope as well in case the scope key header has been changed by a filter.
    1672           0 :       snapScopedRouteConfig();
    1673           0 :     }
    1674         547 :     if (snapped_route_config_ != nullptr) {
    1675         547 :       route = snapped_route_config_->route(cb, *request_headers_, filter_manager_.streamInfo(),
    1676         547 :                                            stream_id_);
    1677         547 :     }
    1678         547 :   }
    1679             : 
    1680         628 :   setRoute(route);
    1681         628 : }
    1682             : 
    1683         628 : void ConnectionManagerImpl::ActiveStream::refreshCachedTracingCustomTags() {
    1684         628 :   if (!connection_manager_tracing_config_.has_value()) {
    1685         628 :     return;
    1686         628 :   }
    1687           0 :   const Tracing::CustomTagMap& conn_manager_tags = connection_manager_tracing_config_->custom_tags_;
    1688           0 :   const Tracing::CustomTagMap* route_tags = nullptr;
    1689           0 :   if (hasCachedRoute() && cached_route_.value()->tracingConfig()) {
    1690           0 :     route_tags = &cached_route_.value()->tracingConfig()->getCustomTags();
    1691           0 :   }
    1692           0 :   const bool configured_in_conn = !conn_manager_tags.empty();
    1693           0 :   const bool configured_in_route = route_tags && !route_tags->empty();
    1694           0 :   if (!configured_in_conn && !configured_in_route) {
    1695           0 :     return;
    1696           0 :   }
    1697           0 :   Tracing::CustomTagMap& custom_tag_map = getOrMakeTracingCustomTagMap();
    1698           0 :   if (configured_in_route) {
    1699           0 :     custom_tag_map.insert(route_tags->begin(), route_tags->end());
    1700           0 :   }
    1701           0 :   if (configured_in_conn) {
    1702           0 :     custom_tag_map.insert(conn_manager_tags.begin(), conn_manager_tags.end());
    1703           0 :   }
    1704           0 : }
    1705             : 
    1706             : // TODO(chaoqin-li1123): Make on demand vhds and on demand srds works at the same time.
    1707             : void ConnectionManagerImpl::ActiveStream::requestRouteConfigUpdate(
    1708           0 :     Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) {
    1709           0 :   route_config_update_requester_->requestRouteConfigUpdate(route_config_updated_cb);
    1710           0 : }
    1711             : 
    1712           0 : absl::optional<Router::ConfigConstSharedPtr> ConnectionManagerImpl::ActiveStream::routeConfig() {
    1713           0 :   if (connection_manager_.config_.routeConfigProvider() != nullptr) {
    1714           0 :     return {connection_manager_.config_.routeConfigProvider()->configCast()};
    1715           0 :   }
    1716           0 :   return {};
    1717           0 : }
    1718             : 
    1719         383 : void ConnectionManagerImpl::ActiveStream::onLocalReply(Code code) {
    1720             :   // The BadRequest error code indicates there has been a messaging error.
    1721         383 :   if (code == Http::Code::BadRequest && connection_manager_.codec_->protocol() < Protocol::Http2 &&
    1722         383 :       !response_encoder_->streamErrorOnInvalidHttpMessage()) {
    1723          67 :     filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
    1724          67 :   }
    1725         383 : }
    1726             : 
    1727           3 : void ConnectionManagerImpl::ActiveStream::encode1xxHeaders(ResponseHeaderMap& response_headers) {
    1728             :   // Strip the T-E headers etc. Defer other header additions as well as drain-close logic to the
    1729             :   // continuation headers.
    1730           3 :   ConnectionManagerUtility::mutateResponseHeaders(
    1731           3 :       response_headers, request_headers_.get(), connection_manager_.config_, EMPTY_STRING,
    1732           3 :       filter_manager_.streamInfo(), connection_manager_.proxy_name_,
    1733           3 :       connection_manager_.clear_hop_by_hop_response_headers_);
    1734             : 
    1735             :   // Count both the 1xx and follow-up response code in stats.
    1736           3 :   chargeStats(response_headers);
    1737             : 
    1738           3 :   ENVOY_STREAM_LOG(debug, "encoding 100 continue headers via codec:\n{}", *this, response_headers);
    1739             : 
    1740             :   // Now actually encode via the codec.
    1741           3 :   response_encoder_->encode1xxHeaders(response_headers);
    1742           3 : }
    1743             : 
    1744             : void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& headers,
    1745         562 :                                                         bool end_stream) {
    1746             :   // Base headers.
    1747             : 
    1748             :   // We want to preserve the original date header, but we add a date header if it is absent
    1749         562 :   if (!headers.Date()) {
    1750         562 :     connection_manager_.config_.dateProvider().setDateHeader(headers);
    1751         562 :   }
    1752             : 
    1753             :   // Following setReference() is safe because serverName() is constant for the life of the
    1754             :   // listener.
    1755         562 :   const auto transformation = connection_manager_.config_.serverHeaderTransformation();
    1756         562 :   if (transformation == ConnectionManagerConfig::HttpConnectionManagerProto::OVERWRITE ||
    1757         562 :       (transformation == ConnectionManagerConfig::HttpConnectionManagerProto::APPEND_IF_ABSENT &&
    1758         562 :        headers.Server() == nullptr)) {
    1759         562 :     headers.setReferenceServer(connection_manager_.config_.serverName());
    1760         562 :   }
    1761         562 :   ConnectionManagerUtility::mutateResponseHeaders(
    1762         562 :       headers, request_headers_.get(), connection_manager_.config_,
    1763         562 :       connection_manager_.config_.via(), filter_manager_.streamInfo(),
    1764         562 :       connection_manager_.proxy_name_, connection_manager_.clear_hop_by_hop_response_headers_);
    1765             : 
    1766         562 :   bool drain_connection_due_to_overload = false;
    1767         562 :   if (connection_manager_.drain_state_ == DrainState::NotDraining &&
    1768         562 :       connection_manager_.random_generator_.bernoulli(
    1769         547 :           connection_manager_.overload_disable_keepalive_ref_.value())) {
    1770           0 :     ENVOY_STREAM_LOG(debug, "disabling keepalive due to envoy overload", *this);
    1771           0 :     drain_connection_due_to_overload = true;
    1772           0 :     connection_manager_.stats_.named_.downstream_cx_overload_disable_keepalive_.inc();
    1773           0 :   }
    1774             : 
    1775             :   // See if we want to drain/close the connection. Send the go away frame prior to encoding the
    1776             :   // header block.
    1777         562 :   if (connection_manager_.drain_state_ == DrainState::NotDraining &&
    1778         562 :       (connection_manager_.drain_close_.drainClose() || drain_connection_due_to_overload)) {
    1779             : 
    1780             :     // This doesn't really do anything for HTTP/1.1 other than give the connection another boost
    1781             :     // of time to race with incoming requests. For HTTP/2 connections, send a GOAWAY frame to
    1782             :     // prevent any new streams.
    1783           0 :     connection_manager_.startDrainSequence();
    1784           0 :     connection_manager_.stats_.named_.downstream_cx_drain_close_.inc();
    1785           0 :     ENVOY_STREAM_LOG(debug, "drain closing connection", *this);
    1786           0 :   }
    1787             : 
    1788         562 :   if (connection_manager_.codec_->protocol() == Protocol::Http10) {
    1789             :     // As HTTP/1.0 and below can not do chunked encoding, if there is no content
    1790             :     // length the response will be framed by connection close.
    1791          19 :     if (!headers.ContentLength()) {
    1792           0 :       filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
    1793           0 :     }
    1794             :     // If the request came with a keep-alive and no other factor resulted in a
    1795             :     // connection close header, send an explicit keep-alive header.
    1796          19 :     if (!filter_manager_.streamInfo().shouldDrainConnectionUponCompletion()) {
    1797          19 :       headers.setConnection(Headers::get().ConnectionValues.KeepAlive);
    1798          19 :     }
    1799          19 :   }
    1800             : 
    1801         562 :   if (connection_manager_.drain_state_ == DrainState::NotDraining &&
    1802         562 :       filter_manager_.streamInfo().shouldDrainConnectionUponCompletion()) {
    1803          67 :     ENVOY_STREAM_LOG(debug, "closing connection due to connection close header", *this);
    1804          67 :     connection_manager_.drain_state_ = DrainState::Closing;
    1805          67 :   }
    1806             : 
    1807             :   // If we are destroying a stream before remote is complete and the connection does not support
    1808             :   // multiplexing, we should disconnect since we don't want to wait around for the request to
    1809             :   // finish.
    1810         562 :   if (!filter_manager_.remoteDecodeComplete()) {
    1811         234 :     if (connection_manager_.codec_->protocol() < Protocol::Http2) {
    1812         232 :       connection_manager_.drain_state_ = DrainState::Closing;
    1813         232 :     }
    1814             : 
    1815         234 :     connection_manager_.stats_.named_.downstream_rq_response_before_rq_complete_.inc();
    1816         234 :   }
    1817             : 
    1818         562 :   if (Utility::isUpgrade(headers) ||
    1819         562 :       HeaderUtility::isConnectResponse(request_headers_.get(), *responseHeaders())) {
    1820           1 :     state_.is_tunneling_ = true;
    1821           1 :   }
    1822             : 
    1823             :   // Block the route cache once the response headers are received and processed, because after
    1824             :   // this point the cached route should never be updated or refreshed.
    1825         562 :   blockRouteCache();
    1826             : 
    1827         562 :   if (connection_manager_.drain_state_ != DrainState::NotDraining &&
    1828         562 :       connection_manager_.codec_->protocol() < Protocol::Http2) {
    1829             :     // If the connection manager is draining send "Connection: Close" on HTTP/1.1 connections.
    1830             :     // Do not do this for H2 (which drains via GOAWAY) or Upgrade or CONNECT (as the
    1831             :     // payload is no longer HTTP/1.1)
    1832         241 :     if (!state_.is_tunneling_) {
    1833         240 :       headers.setReferenceConnection(Headers::get().ConnectionValues.Close);
    1834         240 :     }
    1835         241 :   }
    1836             : 
    1837         562 :   if (connection_manager_tracing_config_.has_value()) {
    1838           0 :     if (connection_manager_tracing_config_->operation_name_ == Tracing::OperationName::Ingress) {
    1839             :       // For ingress (inbound) responses, if the request headers do not include a
    1840             :       // decorator operation (override), and the decorated operation should be
    1841             :       // propagated, then pass the decorator's operation name (if defined)
    1842             :       // as a response header to enable the client service to use it in its client span.
    1843           0 :       if (decorated_operation_ && state_.decorated_propagate_) {
    1844           0 :         headers.setEnvoyDecoratorOperation(*decorated_operation_);
    1845           0 :       }
    1846           0 :     } else if (connection_manager_tracing_config_->operation_name_ ==
    1847           0 :                Tracing::OperationName::Egress) {
    1848           0 :       const HeaderEntry* resp_operation_override = headers.EnvoyDecoratorOperation();
    1849             : 
    1850             :       // For Egress (outbound) response, if a decorator operation name has been provided, it
    1851             :       // should be used to override the active span's operation.
    1852           0 :       if (resp_operation_override) {
    1853           0 :         if (!resp_operation_override->value().empty() && active_span_) {
    1854           0 :           active_span_->setOperation(resp_operation_override->value().getStringView());
    1855           0 :         }
    1856             :         // Remove header so not propagated to service.
    1857           0 :         headers.removeEnvoyDecoratorOperation();
    1858           0 :       }
    1859           0 :     }
    1860           0 :   }
    1861             : 
    1862         562 :   chargeStats(headers);
    1863             : 
    1864         562 :   if (state_.is_tunneling_ &&
    1865         562 :       connection_manager_.config_.flushAccessLogOnTunnelSuccessfullyEstablished()) {
    1866           0 :     filter_manager_.log(AccessLog::AccessLogType::DownstreamTunnelSuccessfullyEstablished);
    1867           0 :   }
    1868         562 :   ENVOY_STREAM_LOG(debug, "encoding headers via codec (end_stream={}):\n{}", *this, end_stream,
    1869         562 :                    headers);
    1870             : 
    1871         562 :   filter_manager_.streamInfo().downstreamTiming().onFirstDownstreamTxByteSent(
    1872         562 :       connection_manager_.time_source_);
    1873             : 
    1874         562 :   if (header_validator_) {
    1875           0 :     auto result = header_validator_->transformResponseHeaders(headers);
    1876           0 :     if (!result.status.ok()) {
    1877             :       // It is possible that the header map is invalid if an encoder filter makes invalid
    1878             :       // modifications
    1879             :       // TODO(yanavlasov): add handling for this case.
    1880           0 :     } else if (result.new_headers) {
    1881           0 :       response_encoder_->encodeHeaders(*result.new_headers, end_stream);
    1882           0 :       return;
    1883           0 :     }
    1884           0 :   }
    1885             : 
    1886             :   // Now actually encode via the codec.
    1887         562 :   response_encoder_->encodeHeaders(headers, end_stream);
    1888         562 : }
    1889             : 
    1890         582 : void ConnectionManagerImpl::ActiveStream::encodeData(Buffer::Instance& data, bool end_stream) {
                         // Hands response body data to the codec's response encoder. `end_stream` is forwarded
                         // to the encoder unchanged.
    1891         582 :   ENVOY_STREAM_LOG(trace, "encoding data via codec (size={} end_stream={})", *this, data.length(),
    1892         582 :                    end_stream);
    1893             : 
                         // The length is recorded as bytes sent before the buffer is passed to the encoder.
    1894         582 :   filter_manager_.streamInfo().addBytesSent(data.length());
    1895         582 :   response_encoder_->encodeData(data, end_stream);
    1896         582 : }
    1897             : 
    1898           1 : void ConnectionManagerImpl::ActiveStream::encodeTrailers(ResponseTrailerMap& trailers) {
                         // Logs the response trailers at debug level and passes them to the codec's response
                         // encoder.
    1899           1 :   ENVOY_STREAM_LOG(debug, "encoding trailers via codec:\n{}", *this, trailers);
    1900             : 
    1901           1 :   response_encoder_->encodeTrailers(trailers);
    1902           1 : }
    1903             : 
    1904           2 : void ConnectionManagerImpl::ActiveStream::encodeMetadata(MetadataMapPtr&& metadata) {
                         // Takes ownership of `metadata`, wraps it in a single-element MetadataMapVector (the
                         // form the response encoder is called with), logs it and passes it to the encoder.
    1905           2 :   MetadataMapVector metadata_map_vector;
    1906           2 :   metadata_map_vector.emplace_back(std::move(metadata));
    1907           2 :   ENVOY_STREAM_LOG(debug, "encoding metadata via codec:\n{}", *this, metadata_map_vector);
    1908           2 :   response_encoder_->encodeMetadata(metadata_map_vector);
    1909           2 : }
    1910             : 
    1911          17 : void ConnectionManagerImpl::ActiveStream::onDecoderFilterBelowWriteBufferLowWatermark() {
                         // Calls readDisable(false) on the downstream codec stream, unless the filter manager is
                         // already destroyed. The resumed-reading counter is incremented in either case.
    1912          17 :   ENVOY_STREAM_LOG(debug, "Read-enabling downstream stream due to filter callbacks.", *this);
    1913             :   // If the state is destroyed, the codec's stream is already torn down. On
    1914             :   // teardown the codec will unwind any remaining read disable calls.
    1915          17 :   if (!filter_manager_.destroyed()) {
    1916          17 :     response_encoder_->getStream().readDisable(false);
    1917          17 :   }
    1918          17 :   connection_manager_.stats_.named_.downstream_flow_control_resumed_reading_total_.inc();
    1919          17 : }
    1920             : 
    1921          19 : void ConnectionManagerImpl::ActiveStream::onDecoderFilterAboveWriteBufferHighWatermark() {
                         // Calls readDisable(true) on the downstream codec stream and increments the
                         // paused-reading counter.
                         // NOTE(review): unlike the low-watermark counterpart there is no
                         // filter_manager_.destroyed() check here - presumably intentional; confirm.
    1922          19 :   ENVOY_STREAM_LOG(debug, "Read-disabling downstream stream due to filter callbacks.", *this);
    1923          19 :   response_encoder_->getStream().readDisable(true);
    1924          19 :   connection_manager_.stats_.named_.downstream_flow_control_paused_reading_total_.inc();
    1925          19 : }
    1926             : 
    1927             : void ConnectionManagerImpl::ActiveStream::onResetStream(StreamResetReason reset_reason,
    1928         366 :                                                         absl::string_view) {
                         // Handles a reset of the downstream stream: updates stats and stream info, notifies the
                         // filter manager, then destroys the stream via doDeferredStreamDestroy(). The second
                         // (string_view) parameter is unnamed and unused here.
    1929             :   // NOTE: This function gets called in all of the following cases:
    1930             :   //       1) We TX an app level reset
    1931             :   //       2) The codec TX a codec level reset
    1932             :   //       3) The codec RX a reset
    1933             :   //       4) The overload manager reset the stream
    1934             :   //       If we need to differentiate we need to do it inside the codec. Can start with this.
    1935         366 :   const absl::string_view encoder_details = response_encoder_->getStream().responseDetails();
    1936         366 :   ENVOY_STREAM_LOG(debug, "stream reset: reset reason: {}, response details: {}", *this,
    1937         366 :                    Http::Utility::resetReasonToString(reset_reason),
    1938         366 :                    encoder_details.empty() ? absl::string_view{"-"} : encoder_details);
                         // The rx_reset counter is incremented unconditionally, i.e. for every case listed in
                         // the NOTE above, not only for resets received from the peer.
    1939         366 :   connection_manager_.stats_.named_.downstream_rq_rx_reset_.inc();
    1940         366 :   state_.on_reset_stream_called_ = true;
    1941             : 
    1942             :   // If the codec sets its responseDetails() for a reason other than peer reset, set a
    1943             :   // DownstreamProtocolError. Either way, propagate details.
    1944         366 :   if (!encoder_details.empty() && reset_reason == StreamResetReason::LocalReset) {
    1945          81 :     filter_manager_.streamInfo().setResponseFlag(StreamInfo::ResponseFlag::DownstreamProtocolError);
    1946          81 :   }
    1947         366 :   if (!encoder_details.empty()) {
    1948         103 :     filter_manager_.streamInfo().setResponseCodeDetails(encoder_details);
    1949         103 :   }
    1950             : 
    1951             :   // Check if we're in the overload manager reset case.
    1952             :   // encoder_details should be empty in this case as we don't have a codec error.
    1953         366 :   if (encoder_details.empty() && reset_reason == StreamResetReason::OverloadManager) {
    1954           0 :     filter_manager_.streamInfo().setResponseFlag(StreamInfo::ResponseFlag::OverloadManager);
    1955           0 :     filter_manager_.streamInfo().setResponseCodeDetails(
    1956           0 :         StreamInfo::ResponseCodeDetails::get().Overload);
    1957           0 :   }
                         // The filter manager is notified before the stream is destroyed.
    1958         366 :   filter_manager_.onDownstreamReset();
    1959         366 :   connection_manager_.doDeferredStreamDestroy(*this);
    1960         366 : }
    1961             : 
    1962           0 : void ConnectionManagerImpl::ActiveStream::onAboveWriteBufferHighWatermark() {
                         // Per the log message, this signals that the downstream stream hit its watermark; the
                         // event is forwarded to the filter manager's high watermark callbacks.
    1963           0 :   ENVOY_STREAM_LOG(debug, "Disabling upstream stream due to downstream stream watermark.", *this);
    1964           0 :   filter_manager_.callHighWatermarkCallbacks();
    1965           0 : }
    1966             : 
    1967           0 : void ConnectionManagerImpl::ActiveStream::onBelowWriteBufferLowWatermark() {
                         // Counterpart of onAboveWriteBufferHighWatermark(): forwards the event to the filter
                         // manager's low watermark callbacks.
    1968           0 :   ENVOY_STREAM_LOG(debug, "Enabling upstream stream due to downstream stream watermark.", *this);
    1969           0 :   filter_manager_.callLowWatermarkCallbacks();
    1970           0 : }
    1971             : 
    1972         505 : void ConnectionManagerImpl::ActiveStream::onCodecEncodeComplete() {
                         // Marks the stream as fully encoded by the codec, records the final timing data and, if
                         // the stream is a zombie stream, destroys it. The ASSERT expects this to be called at
                         // most once per stream.
    1973         505 :   ASSERT(!state_.codec_encode_complete_);
    1974         505 :   ENVOY_STREAM_LOG(debug, "Codec completed encoding stream.", *this);
    1975         505 :   state_.codec_encode_complete_ = true;
    1976             : 
    1977             :   // Update timing
    1978         505 :   filter_manager_.streamInfo().downstreamTiming().onLastDownstreamTxByteSent(
    1979         505 :       connection_manager_.time_source_);
                         // NOTE(review): request_response_timespan_ is dereferenced unconditionally - assumes it
                         // is always set by the time encoding completes; confirm.
    1980         505 :   request_response_timespan_->complete();
    1981             : 
    1982             :   // Only reap stream once.
    1983         505 :   if (state_.is_zombie_stream_) {
    1984          70 :     connection_manager_.doDeferredStreamDestroy(*this);
    1985          70 :   }
    1986         505 : }
    1987             : 
    1988           0 : void ConnectionManagerImpl::ActiveStream::onCodecLowLevelReset() {
                         // Per the log message, reached when the codec timed out flushing the stream. Marks the
                         // stream as reset and destroys it if it is a zombie stream. The ASSERT expects encoding
                         // not to have completed yet.
    1989           0 :   ASSERT(!state_.codec_encode_complete_);
    1990           0 :   state_.on_reset_stream_called_ = true;
    1991           0 :   ENVOY_STREAM_LOG(debug, "Codec timed out flushing stream", *this);
    1992             : 
    1993             :   // TODO(kbaichoo): Update streamInfo to account for the reset.
    1994             : 
    1995             :   // Only reap stream once.
    1996           0 :   if (state_.is_zombie_stream_) {
    1997           0 :     connection_manager_.doDeferredStreamDestroy(*this);
    1998           0 :   }
    1999           0 : }
    2000             : 
    2001           0 : Tracing::OperationName ConnectionManagerImpl::ActiveStream::operationName() const {
                         // Returns operation_name_ from the connection manager tracing config.
                         // Precondition (checked by the ASSERT): the tracing config has a value.
    2002           0 :   ASSERT(connection_manager_tracing_config_.has_value());
    2003           0 :   return connection_manager_tracing_config_->operation_name_;
    2004           0 : }
    2005             : 
    2006           0 : const Tracing::CustomTagMap* ConnectionManagerImpl::ActiveStream::customTags() const {
                         // Returns the raw pointer held by tracing_custom_tags_. The result may be null:
                         // clearRouteCache() in this file null-checks the same member before using it.
    2007           0 :   return tracing_custom_tags_.get();
    2008           0 : }
    2009             : 
    2010           0 : bool ConnectionManagerImpl::ActiveStream::verbose() const {
                         // Returns verbose_ from the connection manager tracing config.
                         // Precondition (checked by the ASSERT): the tracing config has a value.
    2011           0 :   ASSERT(connection_manager_tracing_config_.has_value());
    2012           0 :   return connection_manager_tracing_config_->verbose_;
    2013           0 : }
    2014             : 
    2015           0 : uint32_t ConnectionManagerImpl::ActiveStream::maxPathTagLength() const {
                         // Returns max_path_tag_length_ from the connection manager tracing config.
                         // Precondition (checked by the ASSERT): the tracing config has a value.
    2016           0 :   ASSERT(connection_manager_tracing_config_.has_value());
    2017           0 :   return connection_manager_tracing_config_->max_path_tag_length_;
    2018           0 : }
    2019             : 
    2020           0 : bool ConnectionManagerImpl::ActiveStream::spawnUpstreamSpan() const {
                         // Returns spawn_upstream_span_ from the connection manager tracing config.
                         // Precondition (checked by the ASSERT): the tracing config has a value.
    2021           0 :   ASSERT(connection_manager_tracing_config_.has_value());
    2022           0 :   return connection_manager_tracing_config_->spawn_upstream_span_;
    2023           0 : }
    2024             : 
    2025           2 : const Router::RouteEntry::UpgradeMap* ConnectionManagerImpl::ActiveStream::upgradeMap() {
                         // Returns a pointer to the upgrade map of the cached route's route entry, or nullptr
                         // when there is no cached route or the cached route has no route entry.
    2026             :   // We must check if the 'cached_route_' optional is populated since this function can be called
    2027             :   // early via sendLocalReply(), before the cached route is populated.
    2028           2 :   if (hasCachedRoute() && cached_route_.value()->routeEntry()) {
    2029           2 :     return &cached_route_.value()->routeEntry()->upgradeMap();
    2030           2 :   }
    2031             : 
    2032           0 :   return nullptr;
    2033           2 : }
    2034             : 
    2035         183 : Tracing::Span& ConnectionManagerImpl::ActiveStream::activeSpan() {
                         // Returns the stream's active span when one is set; otherwise returns the shared
                         // Tracing::NullSpan instance, so callers always get a usable reference.
    2036         183 :   if (active_span_) {
    2037           0 :     return *active_span_;
    2038         183 :   } else {
    2039         183 :     return Tracing::NullSpan::instance();
    2040         183 :   }
    2041         183 : }
    2042             : 
    2043         183 : OptRef<const Tracing::Config> ConnectionManagerImpl::ActiveStream::tracingConfig() const {
                         // When the connection manager tracing config is present, returns a reference to this
                         // stream viewed as a Tracing::Config; otherwise returns an empty OptRef.
    2044         183 :   if (connection_manager_tracing_config_.has_value()) {
    2045           0 :     return makeOptRef<const Tracing::Config>(*this);
    2046           0 :   }
    2047         183 :   return {};
    2048         183 : }
    2049             : 
    2050         407 : const ScopeTrackedObject& ConnectionManagerImpl::ActiveStream::scope() { return *this; } // The stream itself is returned as the ScopeTrackedObject.
    2051             : 
    2052           0 : Upstream::ClusterInfoConstSharedPtr ConnectionManagerImpl::ActiveStream::clusterInfo() {
                         // Returns the cached cluster info, refreshing the cached route first when no route is
                         // cached yet.
    2053             :   // NOTE: Refreshing route caches clusterInfo as well.
    2054           0 :   if (!cached_route_.has_value()) {
    2055           0 :     refreshCachedRoute();
    2056           0 :   }
    2057             : 
                         // NOTE(review): value() assumes cached_cluster_info_ is populated after the refresh.
                         // setRoute() returns early without populating it when the route cache is blocked -
                         // confirm that path cannot reach this line with an empty optional.
    2058           0 :   return cached_cluster_info_.value();
    2059           0 : }
    2060             : 
    2061             : Router::RouteConstSharedPtr
    2062        1125 : ConnectionManagerImpl::ActiveStream::route(const Router::RouteCallback& cb) {
                         // Returns the cached route. When nothing is cached, refreshCachedRoute(cb) is invoked
                         // first; `cb` is not used when a cached route already exists.
    2063        1125 :   if (cached_route_.has_value()) {
    2064        1004 :     return cached_route_.value();
    2065        1004 :   }
    2066         121 :   refreshCachedRoute(cb);
                         // NOTE(review): value() assumes the refresh always populates cached_route_ - confirm
                         // for the case where the route cache is blocked.
    2067         121 :   return cached_route_.value();
    2068        1125 : }
    2069             : 
    2070             : /**
    2071             :  * Sets the cached route to the RouteConstSharedPtr argument passed in. Handles setting the
    2072             :  * cached_route_/cached_cluster_info_ ActiveStream attributes, the FilterManager streamInfo, tracing
    2073             :  * tags, and timeouts.
    2074             :  *
    2075             :  * Declared as a StreamFilterCallbacks member function for filters to call directly, but also
    2076             :  * functions as a helper to refreshCachedRoute(const Router::RouteCallback& cb).
    2077             :  */
    2078         628 : void ConnectionManagerImpl::ActiveStream::setRoute(Router::RouteConstSharedPtr route) {
    2079             :   // If the cached route is blocked then any attempt to clear it or refresh it
    2080             :   // will be ignored.
    2081             :   // setRoute() may be called directly by the interface of DownstreamStreamFilterCallbacks,
    2082             :   // so check for routeCacheBlocked() here again.
    2083         628 :   if (routeCacheBlocked()) {
    2084           0 :     return;
    2085           0 :   }
    2086             : 
    2087             :   // Update the cached route.
                         // `route` is copied (not moved) here because it is still read below.
    2088         628 :   setCachedRoute({route});
    2089             :   // Update the cached cluster info based on the new route.
                         // Both branches leave cached_cluster_info_ engaged (possibly holding nullptr), so the
                         // value() call further down is valid on either path.
    2090         628 :   if (nullptr == route || nullptr == route->routeEntry()) {
    2091         387 :     cached_cluster_info_ = nullptr;
    2092         575 :   } else {
    2093         241 :     auto* cluster = connection_manager_.cluster_manager_.getThreadLocalCluster(
    2094         241 :         route->routeEntry()->clusterName());
    2095         241 :     cached_cluster_info_ = (nullptr == cluster) ? nullptr : cluster->info();
    2096         241 :   }
    2097             : 
    2098             :   // Update route and cluster info in the filter manager's stream info.
    2099         628 :   filter_manager_.streamInfo().route_ = std::move(route); // Now can move route here safely.
    2100         628 :   filter_manager_.streamInfo().setUpstreamClusterInfo(cached_cluster_info_.value());
    2101             : 
                         // Derived per-route state is recomputed after every successful route change.
    2102         628 :   refreshCachedTracingCustomTags();
    2103         628 :   refreshDurationTimeout();
    2104         628 :   refreshIdleTimeout();
    2105         628 : }
    2106             : 
    2107         628 : void ConnectionManagerImpl::ActiveStream::refreshIdleTimeout() {
                         // Applies the route-level idle timeout, if the cached route's entry defines one: the
                         // value is stored in idle_timeout_ms_, set as the codec stream's flush timeout, and
                         // stream_idle_timer_ is created or removed to match. Nothing changes when there is no
                         // cached route, no route entry, or no route-level idle timeout.
    2108         628 :   if (hasCachedRoute()) {
    2109         443 :     const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry();
    2110         443 :     if (route_entry != nullptr && route_entry->idleTimeout()) {
    2111           0 :       idle_timeout_ms_ = route_entry->idleTimeout().value();
    2112           0 :       response_encoder_->getStream().setFlushTimeout(idle_timeout_ms_);
    2113           0 :       if (idle_timeout_ms_.count()) {
    2114             :         // If we have a route-level idle timeout but no global stream idle timeout, create a timer.
                               // NOTE(review): the timer is only created here, not armed - presumably it is
                               // enabled elsewhere; confirm.
    2115           0 :         if (stream_idle_timer_ == nullptr) {
    2116           0 :           stream_idle_timer_ = connection_manager_.dispatcher_->createScaledTimer(
    2117           0 :               Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout,
    2118           0 :               [this]() -> void { onIdleTimeout(); });
    2119           0 :         }
    2120           0 :       } else if (stream_idle_timer_ != nullptr) {
    2121             :         // If we had a global stream idle timeout but the route-level idle timeout is set to zero
    2122             :         // (to override), we disable the idle timer.
    2123           0 :         stream_idle_timer_->disableTimer();
    2124           0 :         stream_idle_timer_ = nullptr;
    2125           0 :       }
    2126           0 :     }
    2127         443 :   }
    2128         628 : }
    2129             : 
    2130           0 : void ConnectionManagerImpl::ActiveStream::refreshAccessLogFlushTimer() {
                         // Re-arms access_log_flush_timer_ with the configured access log flush interval. Does
                         // nothing when no interval is configured.
                         // NOTE(review): assumes access_log_flush_timer_ is non-null whenever an interval is
                         // configured - confirm where the timer is created.
    2131           0 :   if (connection_manager_.config_.accessLogFlushInterval().has_value()) {
    2132           0 :     access_log_flush_timer_->enableTimer(
    2133           0 :         connection_manager_.config_.accessLogFlushInterval().value(), this);
    2134           0 :   }
    2135           0 : }
    2136             : 
    2137           0 : void ConnectionManagerImpl::ActiveStream::clearRouteCache() {
                         // Drops the cached route, the cached cluster info and any cached tracing custom tags,
                         // unless the route cache is blocked.
    2138             :   // If the cached route is blocked then any attempt to clear it or refresh it
    2139             :   // will be ignored.
    2140           0 :   if (routeCacheBlocked()) {
    2141           0 :     return;
    2142           0 :   }
    2143             : 
    2144           0 :   setCachedRoute({});
    2145             : 
                         // Reset to a disengaged optional (as opposed to the engaged nullptr that setRoute()
                         // stores when a route has no cluster).
    2146           0 :   cached_cluster_info_ = absl::optional<Upstream::ClusterInfoConstSharedPtr>();
    2147           0 :   if (tracing_custom_tags_) {
    2148           0 :     tracing_custom_tags_->clear();
    2149           0 :   }
    2150           0 : }
    2151             : 
    2152             : void ConnectionManagerImpl::ActiveStream::setCachedRoute(
    2153         628 :     absl::optional<Router::RouteConstSharedPtr>&& route) {
                         // Replaces cached_route_ with `route` (which may be an empty optional). A previously
                         // cached route is moved into cleared_cached_routes_ first so that it stays alive.
    2154         628 :   if (hasCachedRoute()) {
    2155             :     // The configuration of the route may be referenced by some filters.
    2156             :     // Cache the route to avoid it being destroyed before the stream is destroyed.
    2157           0 :     cleared_cached_routes_.emplace_back(std::move(cached_route_.value()));
    2158           0 :   }
    2159         628 :   cached_route_ = std::move(route);
    2160         628 : }
    2161             : 
    2162         562 : void ConnectionManagerImpl::ActiveStream::blockRouteCache() {
                         // Sets route_cache_blocked_ and releases the snapped route configurations. In this
                         // file it is called while response headers are being encoded.
    2163         562 :   route_cache_blocked_ = true;
    2164             :   // Clear the snapped route configuration because it is unnecessary to keep it.
    2165         562 :   snapped_route_config_.reset();
    2166         562 :   snapped_scoped_routes_config_.reset();
    2167         562 : }
    2168             : 
    2169           0 : void ConnectionManagerImpl::ActiveStream::onRequestDataTooLarge() {
                         // Only increments the downstream_rq_too_large_ counter; no other action is taken here.
    2170           0 :   connection_manager_.stats_.named_.downstream_rq_too_large_.inc();
    2171           0 : }
    2172             : 
    2173             : void ConnectionManagerImpl::ActiveStream::recreateStream(
    2174           0 :     StreamInfo::FilterStateSharedPtr filter_state) {
                         // Ends this stream and immediately starts a new one on the same response encoder,
                         // replaying the original request headers and any buffered request body into it.
                         // `filter_state` is the old stream's filter state; its parent is carried over to the
                         // new stream when it holds data that must outlive the request.
                         //
                         // The encoder is detached from this stream first because the new stream reuses it.
    2175           0 :   ResponseEncoder* response_encoder = response_encoder_;
    2176           0 :   response_encoder_ = nullptr;
    2177             : 
                         // Move any buffered request body out of the filter manager so that it can be replayed.
    2178           0 :   Buffer::InstancePtr request_data = std::make_unique<Buffer::OwnedImpl>();
    2179           0 :   const auto& buffered_request_data = filter_manager_.bufferedRequestData();
    2180           0 :   const bool proxy_body = buffered_request_data != nullptr && buffered_request_data->length() > 0;
    2181           0 :   if (proxy_body) {
    2182           0 :     request_data->move(*buffered_request_data);
    2183           0 :   }
    2184             : 
    2185           0 :   response_encoder->getStream().removeCallbacks(*this);
    2186             : 
    2187             :   // This functionally deletes the stream (via deferred delete) so do not
    2188             :   // reference anything beyond this point.
    2189             :   // Make sure to not check for deferred close as we'll be immediately creating a new stream.
                         // NOTE(review): filter_manager_ and request_headers_ of this stream are still read
                         // further down - presumably safe only because the deletion is deferred; confirm.
    2190           0 :   state_.is_internally_destroyed_ = true;
    2191           0 :   connection_manager_.doEndStream(*this, /*check_for_deferred_close*/ false);
    2192             : 
    2193           0 :   RequestDecoder& new_stream = connection_manager_.newStream(*response_encoder, true);
    2194             : 
    2195             :   // Set the new RequestDecoder on the ResponseEncoder. Even though all of the decoder callbacks
    2196             :   // have already been called at this point, the encoder still needs the new decoder for deferred
    2197             :   // logging in some cases.
    2198             :   // This doesn't currently work for HTTP/1 as the H/1 ResponseEncoder doesn't hold the active
    2199             :   // stream's pointer to the RequestDecoder.
    2200           0 :   response_encoder->setRequestDecoder(new_stream);
    2201             :   // We don't need to copy over the old parent FilterState from the old StreamInfo if it did not
    2202             :   // store any objects with a LifeSpan at or above DownstreamRequest. This is to avoid unnecessary
    2203             :   // heap allocation.
    2204             :   // TODO(snowp): In the case where connection level filter state has been set on the connection
    2205             :   // FilterState that we inherit, we'll end up copying this every time even though we could get
    2206             :   // away with just resetting it to the HCM filter_state_.
                         // NOTE(review): the comment above says "DownstreamRequest" while the code checks
                         // LifeSpan::Request - confirm which one is intended.
                         // The new stream is reached through streams_.begin(); this relies on newStream()
                         // inserting at the head of the list, as the comment in
                         // ConnectionManagerImpl::onDeferredRequestProcessing() states.
    2207           0 :   if (filter_state->hasDataAtOrAboveLifeSpan(StreamInfo::FilterState::LifeSpan::Request)) {
    2208           0 :     (*connection_manager_.streams_.begin())->filter_manager_.streamInfo().filter_state_ =
    2209           0 :         std::make_shared<StreamInfo::FilterStateImpl>(
    2210           0 :             filter_state->parent(), StreamInfo::FilterState::LifeSpan::FilterChain);
    2211           0 :   }
    2212             : 
    2213             :   // Make sure that relevant information makes it from the original stream info
    2214             :   // to the new one. Generally this should consist of all downstream related
    2215             :   // data, and not include upstream related data.
    2216           0 :   (*connection_manager_.streams_.begin())
    2217           0 :       ->filter_manager_.streamInfo()
    2218           0 :       .setFromForRecreateStream(filter_manager_.streamInfo());
                         // Headers end the stream only when there is no body to replay.
    2219           0 :   new_stream.decodeHeaders(std::move(request_headers_), !proxy_body);
    2220           0 :   if (proxy_body) {
    2221             :     // This functionality is currently only used for internal redirects, which the router only
    2222             :     // allows if the full request has been read (end_stream = true) so we don't need to handle the
    2223             :     // case of upstream sending an early response mid-request.
    2224           0 :     new_stream.decodeData(*request_data, true);
    2225           0 :   }
    2226           0 : }
    2227             : 
    2228           0 : Http1StreamEncoderOptionsOptRef ConnectionManagerImpl::ActiveStream::http1StreamEncoderOptions() {
                         // Pass-through to the response encoder's http1StreamEncoderOptions().
    2229           0 :   return response_encoder_->http1StreamEncoderOptions();
    2230           0 : }
    2231             : 
    2232           0 : void ConnectionManagerImpl::ActiveStream::onResponseDataTooLarge() {
                         // Only increments the rs_too_large_ counter; no other action is taken here.
    2233           0 :   connection_manager_.stats_.named_.rs_too_large_.inc();
    2234           0 : }
    2235             : 
    2236           0 : void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, absl::string_view) {
                         // Counts a transmitted downstream reset and ends the stream. Both parameters are
                         // unnamed and ignored: the reset reason and details do not influence what happens here.
    2237           0 :   connection_manager_.stats_.named_.downstream_rq_tx_reset_.inc();
    2238           0 :   connection_manager_.doEndStream(*this);
    2239           0 : }
    2240             : 
    2241           0 : bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
                         // Replays a request whose processing was deferred to a later I/O iteration: headers
                         // first, then metadata, data and trailers, in that order.
                         // Returns false when this stream was not deferred (nothing is done), true otherwise.
    2242             :   // TODO(yanavlasov): Merge this with the filter manager continueIteration() method
    2243           0 :   if (!state_.deferred_to_next_io_iteration_) {
    2244           0 :     return false;
    2245           0 :   }
    2246           0 :   state_.deferred_to_next_io_iteration_ = false;
                         // Headers carry end_stream only when no deferred data, trailers or metadata follow.
    2247           0 :   bool end_stream = state_.deferred_end_stream_ && deferred_data_ == nullptr &&
    2248           0 :                     request_trailers_ == nullptr && deferred_metadata_.empty();
    2249           0 :   filter_manager_.decodeHeaders(*request_headers_, end_stream);
    2250           0 :   if (end_stream) {
    2251           0 :     return true;
    2252           0 :   }
    2253             :   // Send metadata before data, as data may have an associated end_stream.
    2254           0 :   while (!deferred_metadata_.empty()) {
    2255           0 :     MetadataMapPtr& metadata = deferred_metadata_.front();
    2256           0 :     filter_manager_.decodeMetadata(*metadata);
    2257           0 :     deferred_metadata_.pop();
    2258           0 :   }
    2259             :   // Filter manager will return early from decodeData and decodeTrailers if
    2260             :   // request has completed.
    2261           0 :   if (deferred_data_ != nullptr) {
                           // Data ends the stream only when no trailers follow.
    2262           0 :     end_stream = state_.deferred_end_stream_ && request_trailers_ == nullptr;
    2263           0 :     filter_manager_.decodeData(*deferred_data_, end_stream);
    2264           0 :   }
    2265           0 :   if (request_trailers_ != nullptr) {
    2266           0 :     filter_manager_.decodeTrailers(*request_trailers_);
    2267           0 :   }
    2268           0 :   return true;
    2269           0 : }
    2270             : 
    2271         507 : bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() {
                         // Decides whether a request should be held back until the next I/O cycle. Returns true
                         // to defer. Besides answering, this call counts the request against the per-dispatch
                         // limit and schedules the deferred-processing callback when the limit is exceeded.
    2272             :   // Do not defer this stream if stream deferral is disabled
    2273         507 :   if (deferred_request_processing_callback_ == nullptr) {
    2274         507 :     return false;
    2275         507 :   }
    2276             :   // Defer this stream if there are already deferred streams, so they are not
    2277             :   // processed out of order
    2278           0 :   if (deferred_request_processing_callback_->enabled()) {
    2279           0 :     return true;
    2280           0 :   }
                         // The counter is incremented before the comparison, so the request that pushes the
                         // count above max_requests_during_dispatch_ is itself deferred.
    2281           0 :   ++requests_during_dispatch_count_;
    2282           0 :   bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_;
    2283           0 :   if (defer) {
    2284           0 :     deferred_request_processing_callback_->scheduleCallbackNextIteration();
    2285           0 :   }
    2286           0 :   return defer;
    2287           0 : }
    2288             : 
    2289           0 : void ConnectionManagerImpl::onDeferredRequestProcessing() {
                         // Walks the stream list from tail to head and resumes streams whose request processing
                         // was deferred. It stops early once shouldDeferRequestProxyingToNextIoCycle() reports
                         // that further requests must wait again.
    2290           0 :   if (streams_.empty()) {
    2291           0 :     return;
    2292           0 :   }
    2293           0 :   requests_during_dispatch_count_ = 1; // 1 stream is always let through
    2294             :   // Streams are inserted at the head of the list. As such process deferred
    2295             :   // streams in the reverse order.
    2296           0 :   auto reverse_iter = std::prev(streams_.end());
    2297           0 :   bool at_first_element = false;
    2298           0 :   do {
    2299           0 :     at_first_element = reverse_iter == streams_.begin();
    2300             :     // Move the iterator to the previous item in case the `onDeferredRequestProcessing` call removes
    2301             :     // the stream from the list.
                           // NOTE(review): on the last iteration reverse_iter is begin(), so std::prev steps
                           // before the first element. The result is never dereferenced because the loop then
                           // exits, but confirm this is acceptable for the container type of streams_.
    2302           0 :     auto previous_element = std::prev(reverse_iter);
    2303           0 :     bool was_deferred = (*reverse_iter)->onDeferredRequestProcessing();
                           // Only streams that were actually deferred count against the per-dispatch limit.
    2304           0 :     if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) {
    2305           0 :       break;
    2306           0 :     }
    2307           0 :     reverse_iter = previous_element;
    2308             :     // TODO(yanavlasov): see if `rend` can be used.
    2309           0 :   } while (!at_first_element);
    2310           0 : }
    2311             : 
    2312             : } // namespace Http
    2313             : } // namespace Envoy

Generated by: LCOV version 1.15