1
#include "source/common/http/conn_manager_impl.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <iterator>
7
#include <limits>
8
#include <list>
9
#include <memory>
10
#include <string>
11
#include <vector>
12

            
13
#include "envoy/buffer/buffer.h"
14
#include "envoy/common/time.h"
15
#include "envoy/config/core/v3/base.pb.h"
16
#include "envoy/event/dispatcher.h"
17
#include "envoy/event/scaled_range_timer_manager.h"
18
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
19
#include "envoy/http/header_map.h"
20
#include "envoy/http/header_validator_errors.h"
21
#include "envoy/network/drain_decision.h"
22
#include "envoy/router/router.h"
23
#include "envoy/ssl/connection.h"
24
#include "envoy/stats/scope.h"
25
#include "envoy/stream_info/filter_state.h"
26
#include "envoy/stream_info/stream_info.h"
27
#include "envoy/tracing/tracer.h"
28
#include "envoy/type/v3/percent.pb.h"
29

            
30
#include "source/common/buffer/buffer_impl.h"
31
#include "source/common/common/assert.h"
32
#include "source/common/common/empty_string.h"
33
#include "source/common/common/enum_to_int.h"
34
#include "source/common/common/fmt.h"
35
#include "source/common/common/perf_tracing.h"
36
#include "source/common/common/scope_tracker.h"
37
#include "source/common/common/utility.h"
38
#include "source/common/http/codes.h"
39
#include "source/common/http/conn_manager_utility.h"
40
#include "source/common/http/exception.h"
41
#include "source/common/http/header_map_impl.h"
42
#include "source/common/http/header_utility.h"
43
#include "source/common/http/headers.h"
44
#include "source/common/http/http1/codec_impl.h"
45
#include "source/common/http/http2/codec_impl.h"
46
#include "source/common/http/path_utility.h"
47
#include "source/common/http/status.h"
48
#include "source/common/http/utility.h"
49
#include "source/common/network/utility.h"
50
#include "source/common/router/config_impl.h"
51
#include "source/common/runtime/runtime_features.h"
52
#include "source/common/stats/timespan_impl.h"
53
#include "source/common/stream_info/utility.h"
54

            
55
#include "absl/strings/escaping.h"
56
#include "absl/strings/match.h"
57
#include "absl/strings/str_cat.h"
58

            
59
namespace Envoy {
60
namespace Http {
61

            
62
const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey =
63
    "overload.premature_reset_total_stream_count";
64
const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey =
65
    "overload.premature_reset_min_stream_lifetime_seconds";
66
// Runtime key for maximum number of requests that can be processed from a single connection per
67
// I/O cycle. Requests over this limit are deferred until the next I/O cycle.
68
const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle =
69
    "http.max_requests_per_io_cycle";
70
// Don't attempt to intelligently delay close: https://github.com/envoyproxy/envoy/issues/30010
71
const absl::string_view ConnectionManagerImpl::OptionallyDelayClose =
72
    "http1.optionally_delay_close";
73

            
74
2785
bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) {
75
2785
  if (!headers) {
76
1349
    return false;
77
1349
  }
78
1436
  if (protocol <= Protocol::Http11) {
79
525
    return HeaderUtility::isConnect(*headers);
80
525
  }
81
  // All HTTP/2 style upgrades were originally connect requests.
82
911
  return HeaderUtility::isConnect(*headers) || Utility::isUpgrade(*headers);
83
1436
}
84

            
85
const Formatter::Formatter*
86
operationNameFormatter(const Http::TracingConnectionManagerConfig& hcm_config,
87
158
                       const Router::RouteTracing* route_config) {
88
158
  const Formatter::Formatter* formatter =
89
158
      route_config != nullptr ? route_config->operation().ptr() : nullptr;
90
158
  return formatter != nullptr ? formatter : hcm_config.operation_.get();
91
158
}
92
const Formatter::Formatter*
93
upstreamOperationNameFormatter(const Http::TracingConnectionManagerConfig& hcm_config,
94
17
                               const Router::RouteTracing* route_config) {
95
17
  const Formatter::Formatter* formatter =
96
17
      route_config != nullptr ? route_config->upstreamOperation().ptr() : nullptr;
97
17
  return formatter != nullptr ? formatter : hcm_config.upstream_operation_.get();
98
17
}
99

            
100
ConnectionManagerStats ConnectionManagerImpl::generateStats(const std::string& prefix,
101
20682
                                                            Stats::Scope& scope) {
102
20682
  return ConnectionManagerStats(
103
20682
      {ALL_HTTP_CONN_MAN_STATS(POOL_COUNTER_PREFIX(scope, prefix), POOL_GAUGE_PREFIX(scope, prefix),
104
20682
                               POOL_HISTOGRAM_PREFIX(scope, prefix))},
105
20682
      prefix, scope);
106
20682
}
107

            
108
ConnectionManagerTracingStats ConnectionManagerImpl::generateTracingStats(const std::string& prefix,
109
20682
                                                                          Stats::Scope& scope) {
110
20682
  return {CONN_MAN_TRACING_STATS(POOL_COUNTER_PREFIX(scope, prefix + "tracing."))};
111
20682
}
112

            
113
ConnectionManagerListenerStats
114
20644
ConnectionManagerImpl::generateListenerStats(const std::string& prefix, Stats::Scope& scope) {
115
20644
  return {CONN_MAN_LISTENER_STATS(POOL_COUNTER_PREFIX(scope, prefix))};
116
20644
}
117

            
118
ConnectionManagerImpl::ConnectionManagerImpl(
119
    ConnectionManagerConfigSharedPtr config, const Network::DrainDecision& drain_close,
120
    Random::RandomGenerator& random_generator, Http::Context& http_context,
121
    Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info,
122
    Upstream::ClusterManager& cluster_manager, Server::OverloadManager& overload_manager,
123
    TimeSource& time_source, envoy::config::core::v3::TrafficDirection direction)
124
22464
    : config_(std::move(config)), stats_(config_->stats()),
125
22464
      conn_length_(new Stats::HistogramCompletableTimespanImpl(
126
22464
          stats_.named_.downstream_cx_length_ms_, time_source)),
127
22464
      drain_close_(drain_close), user_agent_(http_context.userAgentContext()),
128
22464
      random_generator_(random_generator), runtime_(runtime), local_info_(local_info),
129
22464
      cluster_manager_(cluster_manager), listener_stats_(config_->listenerStats()),
130
22464
      overload_manager_(overload_manager),
131
22464
      overload_state_(overload_manager.getThreadLocalOverloadState()),
132
      accept_new_http_stream_(
133
22464
          overload_manager.getLoadShedPoint(Server::LoadShedPointName::get().HcmDecodeHeaders)),
134
      hcm_ondata_creating_codec_(
135
22464
          overload_manager.getLoadShedPoint(Server::LoadShedPointName::get().HcmCodecCreation)),
136
      overload_stop_accepting_requests_ref_(
137
22464
          overload_state_.getState(Server::OverloadActionNames::get().StopAcceptingRequests)),
138
      overload_disable_keepalive_ref_(
139
22464
          overload_state_.getState(Server::OverloadActionNames::get().DisableHttpKeepAlive)),
140
22464
      time_source_(time_source), proxy_name_(StreamInfo::ProxyStatusUtils::makeProxyName(
141
22464
                                     /*node_id=*/local_info_.node().id(),
142
22464
                                     /*server_name=*/config_->serverName(),
143
22464
                                     /*proxy_status_config=*/config_->proxyStatusConfig())),
144
      max_requests_during_dispatch_(
145
22464
          runtime_.snapshot().getInteger(ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)),
146
22464
      direction_(direction),
147
22464
      allow_upstream_half_close_(Runtime::runtimeFeatureEnabled(
148
22464
          "envoy.reloadable_features.allow_multiplexed_upstream_half_close")),
149
22464
      close_connection_on_zombie_stream_complete_(Runtime::runtimeFeatureEnabled(
150
22464
          "envoy.reloadable_features.http1_close_connection_on_zombie_stream_complete")) {
151
22464
  ENVOY_LOG_ONCE_IF(
152
22464
      trace, accept_new_http_stream_ == nullptr,
153
22464
      "LoadShedPoint envoy.load_shed_points.http_connection_manager_decode_headers is not "
154
22464
      "found. Is it configured?");
155
22464
  ENVOY_LOG_ONCE_IF(trace, hcm_ondata_creating_codec_ == nullptr,
156
22464
                    "LoadShedPoint envoy.load_shed_points.hcm_ondata_creating_codec is not found. "
157
22464
                    "Is it configured?");
158
22464
}
159

            
160
80
const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() {
161
80
  static const auto headers = createHeaderMap<ResponseHeaderMapImpl>(
162
80
      {{Http::Headers::get().Status, std::to_string(enumToInt(Code::Continue))}});
163
80
  return *headers;
164
80
}
165

            
166
22464
void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
167
22464
  read_callbacks_ = &callbacks;
168
22464
  dispatcher_ = &callbacks.connection().dispatcher();
169
22464
  if (max_requests_during_dispatch_ != UINT32_MAX) {
170
22
    deferred_request_processing_callback_ =
171
324
        dispatcher_->createSchedulableCallback([this]() -> void { onDeferredRequestProcessing(); });
172
22
  }
173

            
174
22464
  stats_.named_.downstream_cx_total_.inc();
175
22464
  stats_.named_.downstream_cx_active_.inc();
176
22464
  if (read_callbacks_->connection().ssl()) {
177
2202
    stats_.named_.downstream_cx_ssl_total_.inc();
178
2202
    stats_.named_.downstream_cx_ssl_active_.inc();
179
2202
  }
180

            
181
22464
  read_callbacks_->connection().addConnectionCallbacks(*this);
182

            
183
22464
  if (config_->addProxyProtocolConnectionState() &&
184
22464
      !read_callbacks_->connection()
185
22463
           .streamInfo()
186
22463
           .filterState()
187
22463
           ->hasData<Network::ProxyProtocolFilterState>(Network::ProxyProtocolFilterState::key())) {
188
22454
    read_callbacks_->connection().streamInfo().filterState()->setData(
189
22454
        Network::ProxyProtocolFilterState::key(),
190
22454
        std::make_unique<Network::ProxyProtocolFilterState>(Network::ProxyProtocolData{
191
22454
            read_callbacks_->connection().connectionInfoProvider().remoteAddress(),
192
22454
            read_callbacks_->connection().connectionInfoProvider().localAddress()}),
193
22454
        StreamInfo::FilterState::StateType::ReadOnly,
194
22454
        StreamInfo::FilterState::LifeSpan::Connection);
195
22454
  }
196

            
197
22464
  if (config_->idleTimeout()) {
198
21790
    connection_idle_timer_ =
199
21790
        dispatcher_->createScaledTimer(Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout,
200
21790
                                       [this]() -> void { onIdleTimeout(); });
201
21790
    connection_idle_timer_->enableTimer(config_->idleTimeout().value());
202
21790
  }
203

            
204
22464
  if (config_->maxConnectionDuration()) {
205
85
    connection_duration_timer_ =
206
85
        dispatcher_->createScaledTimer(Event::ScaledTimerType::HttpDownstreamMaxConnectionTimeout,
207
85
                                       [this]() -> void { onConnectionDurationTimeout(); });
208
85
    connection_duration_timer_->enableTimer(config_->maxConnectionDuration().value());
209
85
  }
210

            
211
22464
  read_callbacks_->connection().setDelayedCloseTimeout(config_->delayedCloseTimeout());
212

            
213
22464
  read_callbacks_->connection().setConnectionStats(
214
22464
      {stats_.named_.downstream_cx_rx_bytes_total_, stats_.named_.downstream_cx_rx_bytes_buffered_,
215
22464
       stats_.named_.downstream_cx_tx_bytes_total_, stats_.named_.downstream_cx_tx_bytes_buffered_,
216
22464
       nullptr, &stats_.named_.downstream_cx_delayed_close_timeout_});
217
22464
}
218

            
219
22464
ConnectionManagerImpl::~ConnectionManagerImpl() {
220
22464
  stats_.named_.downstream_cx_destroy_.inc();
221

            
222
22464
  stats_.named_.downstream_cx_active_.dec();
223
22464
  if (read_callbacks_->connection().ssl()) {
224
2202
    stats_.named_.downstream_cx_ssl_active_.dec();
225
2202
  }
226

            
227
22464
  if (codec_) {
228
22323
    if (codec_->protocol() == Protocol::Http2) {
229
8046
      stats_.named_.downstream_cx_http2_active_.dec();
230
18088
    } else if (codec_->protocol() == Protocol::Http3) {
231
1995
      stats_.named_.downstream_cx_http3_active_.dec();
232
13002
    } else {
233
12282
      stats_.named_.downstream_cx_http1_active_.dec();
234
12282
    }
235
22323
  }
236

            
237
22464
  if (soft_drain_http1_) {
238
9
    stats_.named_.downstream_cx_http1_soft_drain_.dec();
239
9
  }
240

            
241
22464
  conn_length_->complete();
242
22464
  user_agent_.completeConnectionLength(*conn_length_);
243
22464
}
244

            
245
117818
void ConnectionManagerImpl::checkForDeferredClose(bool skip_delay_close) {
246
117818
  Network::ConnectionCloseType close = Network::ConnectionCloseType::FlushWriteAndDelay;
247
117818
  if (runtime_.snapshot().getBoolean(ConnectionManagerImpl::OptionallyDelayClose, true) &&
248
117818
      skip_delay_close) {
249
105
    close = Network::ConnectionCloseType::FlushWrite;
250
105
  }
251
117818
  if (drain_state_ == DrainState::Closing && streams_.empty() && !codec_->wantsToWrite()) {
252
    // We are closing a draining connection with no active streams and the codec has
253
    // nothing to write.
254
2412
    doConnectionClose(close, absl::nullopt,
255
2412
                      StreamInfo::LocalCloseReasons::get().DeferredCloseOnDrainedConnection);
256
2412
  }
257
117818
}
258

            
259
47282
void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_deferred_close) {
260
  // The order of what happens in this routine is important and a little complicated. We first see
261
  // if the stream needs to be reset. If it needs to be, this will end up invoking reset callbacks
262
  // and then moving the stream to the deferred destruction list. If the stream has not been reset,
263
  // we move it to the deferred deletion list here. Then, we potentially close the connection. This
264
  // must be done after deleting the stream since the stream refers to the connection and must be
265
  // deleted first.
266
47282
  bool reset_stream = false;
267
  // If the response encoder is still associated with the stream, reset the stream. The exception
268
  // here is when Envoy "ends" the stream by calling recreateStream at which point recreateStream
269
  // explicitly nulls out response_encoder to avoid the downstream being notified of the
270
  // Envoy-internal stream instance being ended.
271
47282
  if (stream.response_encoder_ != nullptr &&
272
47282
      (!stream.filter_manager_.hasLastDownstreamByteReceived() ||
273
46968
       !stream.state_.codec_saw_local_complete_)) {
274
    // Indicate local is complete at this point so that if we reset during a continuation, we don't
275
    // raise further data or trailers.
276
2772
    ENVOY_STREAM_LOG(debug, "doEndStream() resetting stream", stream);
277
    // TODO(snowp): This call might not be necessary, try to clean up + remove setter function.
278
2772
    stream.filter_manager_.setLocalComplete();
279
2772
    stream.state_.codec_saw_local_complete_ = true;
280

            
281
    // Per https://tools.ietf.org/html/rfc7540#section-8.3 if there was an error
282
    // with the TCP connection during a CONNECT request, it should be
283
    // communicated via CONNECT_ERROR
284
2772
    if (requestWasConnect(stream.request_headers_, codec_->protocol()) &&
285
2772
        (stream.filter_manager_.streamInfo().hasResponseFlag(
286
167
             StreamInfo::CoreResponseFlag::UpstreamConnectionFailure) ||
287
167
         stream.filter_manager_.streamInfo().hasResponseFlag(
288
167
             StreamInfo::CoreResponseFlag::UpstreamConnectionTermination))) {
289
64
      stream.response_encoder_->getStream().resetStream(StreamResetReason::ConnectError);
290
2753
    } else {
291
2708
      const bool reset_with_error =
292
2708
          Runtime::runtimeFeatureEnabled("envoy.reloadable_features.reset_with_error");
293
2708
      if (stream.filter_manager_.streamInfo().hasResponseFlag(
294
2708
              StreamInfo::CoreResponseFlag::UpstreamProtocolError) &&
295
2708
          !Runtime::runtimeFeatureEnabled(
296
39
              "envoy.reloadable_features.reset_ignore_upstream_reason")) {
297
2
        stream.response_encoder_->getStream().resetStream(StreamResetReason::ProtocolError);
298
2706
      } else if (reset_with_error && stream.filter_manager_.streamInfo().hasResponseFlag(
299
2702
                                         StreamInfo::CoreResponseFlag::DownstreamProtocolError)) {
300
15
        stream.response_encoder_->getStream().resetStream(StreamResetReason::ProtocolError);
301
2691
      } else {
302
2691
        stream.response_encoder_->getStream().resetStream(StreamResetReason::LocalReset);
303
2691
      }
304
2708
    }
305
2772
    reset_stream = true;
306
2772
  }
307

            
308
47282
  if (!reset_stream) {
309
44510
    doDeferredStreamDestroy(stream);
310
44510
  }
311

            
312
47282
  if (reset_stream && codec_->protocol() < Protocol::Http2) {
313
1868
    drain_state_ = DrainState::Closing;
314
1868
  }
315

            
316
47282
  if (check_for_deferred_close) {
317
46968
    checkForDeferredClose(stream.shouldSkipDeferredCloseDelay());
318
46968
  }
319
47282
}
320

            
321
97078
void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
322
97078
  if (!stream.state_.is_internally_destroyed_) {
323
96764
    ++closed_non_internally_destroyed_requests_;
324
96764
    if (isPrematureRstStream(stream)) {
325
23127
      ++number_premature_stream_resets_;
326
23127
    }
327
96764
  }
328
97078
  if (stream.max_stream_duration_timer_ != nullptr) {
329
45
    stream.max_stream_duration_timer_->disableTimer();
330
45
    stream.max_stream_duration_timer_ = nullptr;
331
45
  }
332
97078
  if (stream.stream_idle_timer_ != nullptr) {
333
93339
    stream.stream_idle_timer_->disableTimer();
334
93339
    stream.stream_idle_timer_ = nullptr;
335
93339
  }
336
97078
  stream.filter_manager_.disarmRequestTimeout();
337
97078
  if (stream.request_header_timer_ != nullptr) {
338
1
    stream.request_header_timer_->disableTimer();
339
1
    stream.request_header_timer_ = nullptr;
340
1
  }
341
97078
  if (stream.access_log_flush_timer_ != nullptr) {
342
10
    stream.access_log_flush_timer_->disableTimer();
343
10
    stream.access_log_flush_timer_ = nullptr;
344
10
  }
345

            
346
  // Only destroy the active stream if the underlying codec has notified us of
347
  // completion or we've internal redirect the stream.
348
97078
  if (!stream.canDestroyStream()) {
349
    // Track that this stream is not expecting any additional calls apart from
350
    // codec notification.
351
2971
    stream.state_.is_zombie_stream_ = true;
352
2971
    return;
353
2971
  }
354

            
355
94107
  if (stream.response_encoder_ != nullptr) {
356
93793
    stream.response_encoder_->getStream().registerCodecEventCallbacks(nullptr);
357
93793
  }
358

            
359
94107
  stream.completeRequest();
360
94107
  stream.filter_manager_.onStreamComplete();
361

            
362
  // For HTTP/3, skip access logging here and add deferred logging info
363
  // to stream info for QuicStatsGatherer to use later.
364
94107
  if (codec_ && codec_->protocol() == Protocol::Http3 &&
365
      // There was a downstream reset, log immediately.
366
94107
      !stream.filter_manager_.sawDownstreamReset() &&
367
      // On recreate stream, log immediately.
368
94107
      stream.response_encoder_ != nullptr &&
369
94107
      Runtime::runtimeFeatureEnabled(
370
1509
          "envoy.reloadable_features.quic_defer_logging_to_ack_listener")) {
371
1508
    stream.deferHeadersAndTrailers();
372
92745
  } else {
373
    // For HTTP/1 and HTTP/2, log here as usual.
374
92599
    stream.log(AccessLog::AccessLogType::DownstreamEnd);
375
92599
  }
376

            
377
94107
  stream.filter_manager_.destroyFilters();
378

            
379
94107
  dispatcher_->deferredDelete(stream.removeFromList(streams_));
380

            
381
  // The response_encoder should never be dangling (unless we're destroying a
382
  // stream we are recreating) as the codec level stream will either outlive the
383
  // ActiveStream, or be alive in deferred deletion queue at this point.
384
94107
  if (stream.response_encoder_) {
385
93793
    stream.response_encoder_->getStream().removeCallbacks(stream);
386
93793
  }
387

            
388
94107
  if (connection_idle_timer_ && streams_.empty()) {
389
27628
    connection_idle_timer_->enableTimer(config_->idleTimeout().value());
390
27628
  }
391
94107
  maybeDrainDueToPrematureResets();
392
94107
}
393

            
394
RequestDecoderHandlePtr ConnectionManagerImpl::newStreamHandle(ResponseEncoder& response_encoder,
395
6
                                                               bool is_internally_created) {
396
6
  RequestDecoder& decoder = newStream(response_encoder, is_internally_created);
397
6
  return std::make_unique<ActiveStreamHandle>(static_cast<ActiveStream&>(decoder));
398
6
}
399

            
400
RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encoder,
401
94107
                                                 bool is_internally_created) {
402
94107
  TRACE_EVENT("core", "ConnectionManagerImpl::newStream");
403
94107
  if (connection_idle_timer_) {
404
93324
    connection_idle_timer_->disableTimer();
405
93324
  }
406

            
407
94107
  ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());
408

            
409
94107
  Buffer::BufferMemoryAccountSharedPtr downstream_stream_account =
410
94107
      response_encoder.getStream().account();
411

            
412
94107
  if (downstream_stream_account == nullptr) {
413
    // Create account, wiring the stream to use it for tracking bytes.
414
    // If tracking is disabled, the wiring becomes a NOP.
415
94103
    auto& buffer_factory = dispatcher_->getWatermarkFactory();
416
94103
    downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream());
417
94103
    response_encoder.getStream().setAccount(downstream_stream_account);
418
94103
  }
419

            
420
94107
  auto new_stream = std::make_unique<ActiveStream>(
421
94107
      *this, response_encoder.getStream().bufferLimit(), std::move(downstream_stream_account));
422

            
423
94107
  accumulated_requests_++;
424
94107
  if (config_->maxRequestsPerConnection() > 0 &&
425
94107
      accumulated_requests_ >= config_->maxRequestsPerConnection()) {
426
54
    if (codec_->protocol() < Protocol::Http2) {
427
22
      new_stream->filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
428
      // Prevent erroneous debug log of closing due to incoming connection close header.
429
22
      drain_state_ = DrainState::Closing;
430
40
    } else if (drain_state_ == DrainState::NotDraining) {
431
21
      startDrainSequence();
432
21
    }
433
54
    ENVOY_CONN_LOG(debug, "max requests per connection reached", read_callbacks_->connection());
434
54
    stats_.named_.downstream_cx_max_requests_reached_.inc();
435
54
  }
436

            
437
94107
  if (soft_drain_http1_) {
438
8
    new_stream->filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
439
    // Prevent erroneous debug log of closing due to incoming connection close header.
440
8
    drain_state_ = DrainState::Closing;
441
8
  }
442

            
443
94107
  new_stream->state_.is_internally_created_ = is_internally_created;
444
94107
  new_stream->response_encoder_ = &response_encoder;
445
94107
  new_stream->response_encoder_->getStream().addCallbacks(*new_stream);
446
94107
  new_stream->response_encoder_->getStream().registerCodecEventCallbacks(new_stream.get());
447
94107
  if (config_->streamFlushTimeout().has_value()) {
448
93327
    new_stream->response_encoder_->getStream().setFlushTimeout(
449
93327
        config_->streamFlushTimeout().value());
450
93984
  } else {
451
780
    new_stream->response_encoder_->getStream().setFlushTimeout(config_->streamIdleTimeout());
452
780
  }
453
94107
  new_stream->streamInfo().setDownstreamBytesMeter(response_encoder.getStream().bytesMeter());
454
94107
  new_stream->streamInfo().setCodecStreamId(response_encoder.getStream().codecStreamId());
455
  // If the network connection is backed up, the stream should be made aware of it on creation.
456
  // Both HTTP/1.x and HTTP/2 codecs handle this in StreamCallbackHelper::addCallbacksHelper.
457
94107
  ASSERT(read_callbacks_->connection().aboveHighWatermark() == false ||
458
94107
         new_stream->filter_manager_.aboveHighWatermark());
459
94107
  LinkedList::moveIntoList(std::move(new_stream), streams_);
460
94107
  return **streams_.begin();
461
94107
}
462

            
463
void ConnectionManagerImpl::handleCodecErrorImpl(absl::string_view error, absl::string_view details,
464
1769
                                                 StreamInfo::CoreResponseFlag response_flag) {
465
1769
  ENVOY_CONN_LOG(debug, "dispatch error: {}", read_callbacks_->connection(), error);
466
1769
  read_callbacks_->connection().streamInfo().setResponseFlag(response_flag);
467

            
468
  // HTTP/1.1 codec has already sent a 400 response if possible. HTTP/2 codec has already sent
469
  // GOAWAY.
470
1769
  doConnectionClose(Network::ConnectionCloseType::FlushWriteAndDelay, response_flag, details);
471
1769
}
472

            
473
1759
void ConnectionManagerImpl::handleCodecError(absl::string_view error) {
474
1759
  handleCodecErrorImpl(error, absl::StrCat("codec_error:", StringUtil::replaceAllEmptySpace(error)),
475
1759
                       StreamInfo::CoreResponseFlag::DownstreamProtocolError);
476
1759
}
477

            
478
10
void ConnectionManagerImpl::handleCodecOverloadError(absl::string_view error) {
479
10
  handleCodecErrorImpl(error,
480
10
                       absl::StrCat("overload_error:", StringUtil::replaceAllEmptySpace(error)),
481
10
                       StreamInfo::CoreResponseFlag::OverloadManager);
482
10
}
483

            
484
22323
void ConnectionManagerImpl::createCodec(Buffer::Instance& data) {
485
22323
  ASSERT(!codec_);
486
22323
  codec_ = config_->createCodec(read_callbacks_->connection(), data, *this, overload_manager_);
487

            
488
22323
  switch (codec_->protocol()) {
489
1995
  case Protocol::Http3:
490
1995
    stats_.named_.downstream_cx_http3_total_.inc();
491
1995
    stats_.named_.downstream_cx_http3_active_.inc();
492
1995
    break;
493
8046
  case Protocol::Http2:
494
8046
    stats_.named_.downstream_cx_http2_total_.inc();
495
8046
    stats_.named_.downstream_cx_http2_active_.inc();
496
8046
    break;
497
12279
  case Protocol::Http11:
498
12282
  case Protocol::Http10:
499
12282
    stats_.named_.downstream_cx_http1_total_.inc();
500
12282
    stats_.named_.downstream_cx_http1_active_.inc();
501
12282
    break;
502
22323
  }
503
22323
}
504

            
505
69519
Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
506
69519
  requests_during_dispatch_count_ = 0;
507
69519
  if (!codec_) {
508
    // Close connections if Envoy is under pressure, typically memory, before creating codec.
509
20317
    if (hcm_ondata_creating_codec_ != nullptr && hcm_ondata_creating_codec_->shouldShedLoad()) {
510
4
      stats_.named_.downstream_rq_overload_close_.inc();
511
4
      handleCodecOverloadError("onData codec creation overload");
512
4
      return Network::FilterStatus::StopIteration;
513
4
    }
514
    // Http3 codec should have been instantiated by now.
515
20313
    createCodec(data);
516
20313
  }
517

            
518
69515
  bool redispatch;
519
69521
  do {
520
69521
    redispatch = false;
521

            
522
69521
    const Status status = codec_->dispatch(data);
523

            
524
69521
    if (isBufferFloodError(status) || isInboundFramesWithEmptyPayloadError(status)) {
525
33
      handleCodecError(status.message());
526
33
      return Network::FilterStatus::StopIteration;
527
69488
    } else if (isCodecProtocolError(status)) {
528
1711
      stats_.named_.downstream_cx_protocol_error_.inc();
529
1711
      handleCodecError(status.message());
530
1711
      return Network::FilterStatus::StopIteration;
531
67833
    } else if (isEnvoyOverloadError(status)) {
532
6
      stats_.named_.downstream_rq_overload_close_.inc();
533
6
      handleCodecOverloadError(status.message());
534
6
      return Network::FilterStatus::StopIteration;
535
6
    }
536
67771
    ASSERT(status.ok());
537

            
538
    // Processing incoming data may release outbound data so check for closure here as well.
539
67771
    checkForDeferredClose(false);
540

            
541
    // The HTTP/1 codec will pause dispatch after a single message is complete. We want to
542
    // either redispatch if there are no streams and we have more data. If we have a single
543
    // complete non-WebSocket stream but have not responded yet we will pause socket reads
544
    // to apply back pressure.
545
67771
    if (codec_->protocol() < Protocol::Http2) {
546
17866
      if (read_callbacks_->connection().state() == Network::Connection::State::Open &&
547
17866
          data.length() > 0 && streams_.empty()) {
548
6
        redispatch = true;
549
6
      }
550
17866
    }
551
67771
  } while (redispatch);
552

            
553
67765
  if (!read_callbacks_->connection().streamInfo().protocol()) {
554
18649
    read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
555
18649
  }
556

            
557
67765
  return Network::FilterStatus::StopIteration;
558
69515
}
559

            
560
22218
Network::FilterStatus ConnectionManagerImpl::onNewConnection() {
561
22218
  if (!read_callbacks_->connection().streamInfo().protocol()) {
562
    // For Non-QUIC traffic, continue passing data to filters.
563
20224
    return Network::FilterStatus::Continue;
564
20224
  }
565
  // Only QUIC connection's stream_info_ specifies protocol.
566
1994
  Buffer::OwnedImpl dummy;
567
1994
  createCodec(dummy);
568
1994
  ASSERT(codec_->protocol() == Protocol::Http3);
569
  // Stop iterating through network filters for QUIC. Currently QUIC connections bypass the
570
  // onData() interface because QUICHE already handles de-multiplexing.
571
1994
  return Network::FilterStatus::StopIteration;
572
22218
}
573

            
574
void ConnectionManagerImpl::resetAllStreams(
575
667
    absl::optional<StreamInfo::CoreResponseFlag> response_flag, absl::string_view details) {
576
45397
  while (!streams_.empty()) {
577
    // Mimic a downstream reset in this case. We must also remove callbacks here. Though we are
578
    // about to close the connection and will disable further reads, it is possible that flushing
579
    // data out can cause stream callbacks to fire (e.g., low watermark callbacks).
580
    //
581
    // TODO(mattklein123): I tried to actually reset through the codec here, but ran into issues
582
    // with nghttp2 state and being unhappy about sending reset frames after the connection had
583
    // been terminated via GOAWAY. It might be possible to do something better here inside the h2
584
    // codec but there are no easy answers and this seems simpler.
585
44730
    auto& stream = *streams_.front();
586
44730
    stream.response_encoder_->getStream().removeCallbacks(stream);
587
44730
    if (!stream.response_encoder_->getStream().responseDetails().empty()) {
588
323
      stream.filter_manager_.streamInfo().setResponseCodeDetails(
589
323
          stream.response_encoder_->getStream().responseDetails());
590
44695
    } else if (!details.empty()) {
591
44407
      stream.filter_manager_.streamInfo().setResponseCodeDetails(details);
592
44407
    }
593
44730
    if (response_flag.has_value()) {
594
41113
      stream.filter_manager_.streamInfo().setResponseFlag(response_flag.value());
595
41113
    }
596
44730
    stream.onResetStream(StreamResetReason::ConnectionTermination, absl::string_view());
597
44730
  }
598
667
}
599

            
600
44105
void ConnectionManagerImpl::onEvent(Network::ConnectionEvent event) {
601
44105
  if (event == Network::ConnectionEvent::LocalClose) {
602
3124
    stats_.named_.downstream_cx_destroy_local_.inc();
603
3124
  }
604

            
605
44105
  if (event == Network::ConnectionEvent::RemoteClose ||
606
44105
      event == Network::ConnectionEvent::LocalClose) {
607

            
608
22386
    std::string details;
609
22386
    if (event == Network::ConnectionEvent::RemoteClose) {
610
19262
      remote_close_ = true;
611
19262
      stats_.named_.downstream_cx_destroy_remote_.inc();
612
19262
      details = StreamInfo::ResponseCodeDetails::get().DownstreamRemoteDisconnect;
613
19553
    } else {
614
3124
      absl::string_view local_close_reason = read_callbacks_->connection().localCloseReason();
615
3124
      ENVOY_BUG(!local_close_reason.empty(), "Local Close Reason was not set!");
616
3124
      details = fmt::format(
617
3124
          fmt::runtime(StreamInfo::ResponseCodeDetails::get().DownstreamLocalDisconnect),
618
3124
          StringUtil::replaceAllEmptySpace(local_close_reason));
619
3124
    }
620

            
621
    // TODO(mattklein123): It is technically possible that something outside of the filter causes
622
    // a local connection close, so we still guard against that here. A better solution would be to
623
    // have some type of "pre-close" callback that we could hook for cleanup that would get called
624
    // regardless of where local close is invoked from.
625
    // NOTE: that this will cause doConnectionClose() to get called twice in the common local close
626
    // cases, but the method protects against that.
627
    // NOTE: In the case where a local close comes from outside the filter, this will cause any
628
    // stream closures to increment remote close stats. We should do better here in the future,
629
    // via the pre-close callback mentioned above.
630
22386
    doConnectionClose(absl::nullopt, StreamInfo::CoreResponseFlag::DownstreamConnectionTermination,
631
22386
                      details);
632
22386
  }
633
44105
}
634

            
635
void ConnectionManagerImpl::doConnectionClose(
636
    absl::optional<Network::ConnectionCloseType> close_type,
637
26601
    absl::optional<StreamInfo::CoreResponseFlag> response_flag, absl::string_view details) {
638
26601
  if (connection_idle_timer_) {
639
21789
    connection_idle_timer_->disableTimer();
640
21789
    connection_idle_timer_.reset();
641
21789
  }
642

            
643
26601
  if (connection_duration_timer_) {
644
85
    connection_duration_timer_->disableTimer();
645
85
    connection_duration_timer_.reset();
646
85
  }
647

            
648
26601
  if (drain_timer_) {
649
187
    drain_timer_->disableTimer();
650
187
    drain_timer_.reset();
651
187
  }
652

            
653
26601
  if (!streams_.empty()) {
654
667
    const Network::ConnectionEvent event = close_type.has_value()
655
667
                                               ? Network::ConnectionEvent::LocalClose
656
667
                                               : Network::ConnectionEvent::RemoteClose;
657
667
    if (event == Network::ConnectionEvent::LocalClose) {
658
377
      stats_.named_.downstream_cx_destroy_local_active_rq_.inc();
659
377
    }
660
667
    if (event == Network::ConnectionEvent::RemoteClose) {
661
290
      stats_.named_.downstream_cx_destroy_remote_active_rq_.inc();
662
290
    }
663

            
664
667
    stats_.named_.downstream_cx_destroy_active_rq_.inc();
665
667
    user_agent_.onConnectionDestroy(event, true);
666
    // Note that resetAllStreams() does not actually write anything to the wire. It just resets
667
    // all upstream streams and their filter stacks. Thus, there are no issues around recursive
668
    // entry.
669
667
    resetAllStreams(response_flag, details);
670
667
  }
671

            
672
26601
  if (close_type.has_value()) {
673
4215
    read_callbacks_->connection().close(close_type.value(), details);
674
4215
  }
675
26601
}
676

            
677
96764
bool ConnectionManagerImpl::isPrematureRstStream(const ActiveStream& stream) const {
678
  // Check if the request was prematurely reset, by comparing its lifetime to the configured
679
  // threshold.
680
96764
  ASSERT(!stream.state_.is_internally_destroyed_);
681
96764
  absl::optional<std::chrono::nanoseconds> duration =
682
96764
      stream.filter_manager_.streamInfo().currentDuration();
683

            
684
  // Check if request lifetime is longer than the premature reset threshold.
685
96764
  if (duration) {
686
96764
    const uint64_t lifetime = std::chrono::duration_cast<std::chrono::seconds>(*duration).count();
687
96764
    const uint64_t min_lifetime = runtime_.snapshot().getInteger(
688
96764
        ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey, 1);
689
96764
    if (lifetime > min_lifetime) {
690
34419
      return false;
691
34419
    }
692
96764
  }
693

            
694
  // If request has completed before configured threshold, also check if the Envoy proxied the
695
  // response from the upstream. Requests without the response status were reset.
696
  // TODO(RyanTheOptimist): Possibly support half_closed_local instead.
697
62345
  return !stream.filter_manager_.streamInfo().responseCode();
698
96764
}
699

            
700
// Sends a GOAWAY if too many streams have been reset prematurely on this
701
// connection.
702
94107
void ConnectionManagerImpl::maybeDrainDueToPrematureResets() {
703
  // If the connection has been drained due to premature resets, do not check this again.
704
  // Without this flag, recursion may occur, as shown in the following stack trace:
705
  //
706
  //   maybeDrainDueToPrematureResets()
707
  //   doConnectionClose()
708
  //   resetAllStreams()
709
  //   onResetStream()
710
  //   doDeferredStreamDestroy()
711
  //   maybeDrainDueToPrematureResets()
712
  //   ...
713
  //
714
  // The recursion will continue until all streams are destroyed. If there are many streams
715
  // that may result in a stack overflow. This flag is used to avoid above recursion.
716
94107
  if (drained_due_to_premature_resets_) {
717
3600
    return;
718
3600
  }
719

            
720
90507
  if (closed_non_internally_destroyed_requests_ == 0) {
721
284
    return;
722
284
  }
723

            
724
90223
  const uint64_t limit =
725
90223
      runtime_.snapshot().getInteger(ConnectionManagerImpl::PrematureResetTotalStreamCountKey, 500);
726

            
727
90223
  if (closed_non_internally_destroyed_requests_ < limit) {
728
    // Even though the total number of streams have not reached `limit`, check if the number of bad
729
    // streams is high enough that even if every subsequent stream is good, the connection
730
    // would be closed once the limit is reached, and if so close the connection now.
731
37310
    if (number_premature_stream_resets_ * 2 < limit) {
732
36304
      return;
733
36304
    }
734
82516
  } else {
735
52913
    if (number_premature_stream_resets_ * 2 < closed_non_internally_destroyed_requests_) {
736
20821
      return;
737
20821
    }
738
52913
  }
739

            
740
33098
  if (read_callbacks_->connection().state() == Network::Connection::State::Open) {
741
8
    stats_.named_.downstream_rq_too_many_premature_resets_.inc();
742

            
743
    // Mark the the connection has been drained due to too many premature resets.
744
8
    drained_due_to_premature_resets_ = true;
745

            
746
8
    doConnectionClose(Network::ConnectionCloseType::Abort, absl::nullopt,
747
8
                      "too_many_premature_resets");
748
8
  }
749
33098
}
750

            
751
1
void ConnectionManagerImpl::onGoAway(GoAwayErrorCode) {
752
  // Currently we do nothing with remote go away frames. In the future we can decide to no longer
753
  // push resources if applicable.
754
1
}
755

            
756
38
void ConnectionManagerImpl::onIdleTimeout() {
757
38
  ENVOY_CONN_LOG(debug, "idle timeout", read_callbacks_->connection());
758
38
  stats_.named_.downstream_cx_idle_timeout_.inc();
759
38
  if (!codec_) {
760
    // No need to delay close after flushing since an idle timeout has already fired. Attempt to
761
    // write out buffered data one last time and issue a local close if successful.
762
1
    doConnectionClose(Network::ConnectionCloseType::FlushWrite, absl::nullopt,
763
1
                      StreamInfo::LocalCloseReasons::get().IdleTimeoutOnConnection);
764
38
  } else if (drain_state_ == DrainState::NotDraining) {
765
37
    startDrainSequence();
766
37
  }
767
38
}
768

            
769
85
void ConnectionManagerImpl::onConnectionDurationTimeout() {
770
85
  ENVOY_CONN_LOG(debug, "max connection duration reached", read_callbacks_->connection());
771
85
  stats_.named_.downstream_cx_max_duration_reached_.inc();
772
85
  if (!codec_) {
773
    // Attempt to write out buffered data one last time and issue a local close if successful.
774
8
    doConnectionClose(Network::ConnectionCloseType::FlushWrite,
775
8
                      StreamInfo::CoreResponseFlag::DurationTimeout,
776
8
                      StreamInfo::ResponseCodeDetails::get().DurationTimeout);
777
78
  } else if (drain_state_ == DrainState::NotDraining) {
778
77
    if (config_->http1SafeMaxConnectionDuration() && codec_->protocol() < Protocol::Http2) {
779
9
      ENVOY_CONN_LOG(debug,
780
9
                     "HTTP1-safe max connection duration is configured -- skipping drain sequence.",
781
9
                     read_callbacks_->connection());
782
9
      stats_.named_.downstream_cx_http1_soft_drain_.inc();
783
9
      soft_drain_http1_ = true;
784
70
    } else {
785
68
      startDrainSequence();
786
68
    }
787
77
  }
788
85
}
789

            
790
124
void ConnectionManagerImpl::onDrainTimeout() {
791
124
  ASSERT(drain_state_ != DrainState::NotDraining);
792
124
  codec_->goAway();
793
124
  drain_state_ = DrainState::Closing;
794
124
  checkForDeferredClose(false);
795
124
}
796

            
797
18
void ConnectionManagerImpl::sendGoAwayAndClose(bool graceful) {
798
18
  ENVOY_CONN_LOG(trace, "connection manager sendGoAwayAndClose was triggerred from filters.",
799
18
                 read_callbacks_->connection());
800
18
  if (go_away_sent_) {
801
    return;
802
  }
803

            
804
  // Use graceful drain sequence if graceful shutdown is requested.
805
  // startDrainSequence() works for both HTTP/1 and HTTP/2:
806
  // - HTTP/1: shutdownNotice() + goAway() provides graceful close
807
  // - HTTP/2: shutdownNotice() sends GOAWAY with high stream ID, then goAway() sends final GOAWAY
808
18
  if (graceful) {
809
1
    if (drain_state_ == DrainState::NotDraining) {
810
1
      startDrainSequence();
811
1
    }
812
    // Consider the "go away" process started once draining begins.
813
    // The actual GOAWAY frame will be sent in onDrainTimeout(), but we want to
814
    // prevent multiple calls to sendGoAwayAndClose() from starting multiple drain sequences.
815
1
    go_away_sent_ = true;
816
18
  } else {
817
    // Immediate close - send GOAWAY and close immediately
818
17
    codec_->shutdownNotice();
819
17
    codec_->goAway();
820
17
    go_away_sent_ = true;
821
17
    doConnectionClose(Network::ConnectionCloseType::FlushWriteAndDelay, absl::nullopt,
822
17
                      "forced_goaway");
823
17
  }
824
18
}
825

            
826
void ConnectionManagerImpl::chargeTracingStats(const Tracing::Reason& tracing_reason,
827
271
                                               ConnectionManagerTracingStats& tracing_stats) {
828
271
  switch (tracing_reason) {
829
1
  case Tracing::Reason::ClientForced:
830
1
    tracing_stats.client_enabled_.inc();
831
1
    break;
832
40
  case Tracing::Reason::Sampling:
833
40
    tracing_stats.random_sampling_.inc();
834
40
    break;
835
11
  case Tracing::Reason::ServiceForced:
836
11
    tracing_stats.service_forced_.inc();
837
11
    break;
838
219
  default:
839
219
    tracing_stats.not_traceable_.inc();
840
219
    break;
841
271
  }
842
271
}
843

            
844
absl::optional<absl::string_view>
845
3031
ConnectionManagerImpl::HttpStreamIdProviderImpl::toStringView() const {
846
3031
  if (parent_.request_headers_ == nullptr) {
847
1
    return {};
848
1
  }
849
3030
  ASSERT(parent_.connection_manager_.config_->requestIDExtension() != nullptr);
850
3030
  return parent_.connection_manager_.config_->requestIDExtension()->get(*parent_.request_headers_);
851
3031
}
852

            
853
1
absl::optional<uint64_t> ConnectionManagerImpl::HttpStreamIdProviderImpl::toInteger() const {
854
1
  if (parent_.request_headers_ == nullptr) {
855
    return {};
856
  }
857
1
  ASSERT(parent_.connection_manager_.config_->requestIDExtension() != nullptr);
858
1
  return parent_.connection_manager_.config_->requestIDExtension()->getInteger(
859
1
      *parent_.request_headers_);
860
1
}
861

            
862
namespace {
863
constexpr absl::string_view kRouteFactoryName = "envoy.route_config_update_requester.default";
864
} // namespace
865

            
866
ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager,
867
                                                  uint32_t buffer_limit,
868
                                                  Buffer::BufferMemoryAccountSharedPtr account)
869
94107
    : connection_manager_(connection_manager),
870
94107
      connection_manager_tracing_config_(connection_manager_.config_->tracingConfig() == nullptr
871
94107
                                             ? absl::nullopt
872
94107
                                             : makeOptRef<const TracingConnectionManagerConfig>(
873
346
                                                   *connection_manager_.config_->tracingConfig())),
874
94107
      stream_id_(connection_manager.random_generator_.random()),
875
94107
      filter_manager_(*this, *connection_manager_.dispatcher_,
876
94107
                      connection_manager_.read_callbacks_->connection(), stream_id_,
877
94107
                      std::move(account), connection_manager_.config_->proxy100Continue(),
878
94107
                      buffer_limit, connection_manager_.config_->filterFactory(),
879
94107
                      connection_manager_.config_->localReply(),
880
94107
                      connection_manager_.codec_->protocol(), connection_manager_.timeSource(),
881
94107
                      connection_manager_.read_callbacks_->connection().streamInfo().filterState(),
882
94107
                      connection_manager_.overload_manager_),
883
94107
      request_response_timespan_(new Stats::HistogramCompletableTimespanImpl(
884
94107
          connection_manager_.stats_.named_.downstream_rq_time_, connection_manager_.timeSource())),
885
      has_explicit_global_flush_timeout_(
886
94107
          connection_manager.config_->streamFlushTimeout().has_value()),
887
      header_validator_(
888
94107
          connection_manager.config_->makeHeaderValidator(connection_manager.codec_->protocol())),
889
94107
      trace_refresh_after_route_refresh_(Runtime::runtimeFeatureEnabled(
890
94107
          "envoy.reloadable_features.trace_refresh_after_route_refresh")) {
891
94107
  ASSERT(!connection_manager.config_->isRoutable() ||
892
94107
             ((connection_manager.config_->routeConfigProvider() == nullptr &&
893
94107
               connection_manager.config_->scopedRouteConfigProvider() != nullptr &&
894
94107
               connection_manager.config_->scopeKeyBuilder().has_value()) ||
895
94107
              (connection_manager.config_->routeConfigProvider() != nullptr &&
896
94107
               connection_manager.config_->scopedRouteConfigProvider() == nullptr &&
897
94107
               !connection_manager.config_->scopeKeyBuilder().has_value())),
898
94107
         "Either routeConfigProvider or (scopedRouteConfigProvider and scopeKeyBuilder) should be "
899
94107
         "set in "
900
94107
         "ConnectionManagerImpl.");
901

            
902
94107
  filter_manager_.streamInfo().setStreamIdProvider(
903
94107
      std::make_shared<HttpStreamIdProviderImpl>(*this));
904

            
905
94107
  filter_manager_.streamInfo().setShouldSchemeMatchUpstream(
906
94107
      connection_manager.config_->shouldSchemeMatchUpstream());
907

            
908
  // TODO(chaoqin-li1123): can this be moved to the on demand filter?
909
94107
  auto factory = Envoy::Config::Utility::getFactoryByName<RouteConfigUpdateRequesterFactory>(
910
94107
      kRouteFactoryName);
911
94107
  if (connection_manager_.config_->isRoutable() &&
912
94107
      connection_manager.config_->routeConfigProvider() != nullptr && factory) {
913
93483
    route_config_update_requester_ = factory->createRouteConfigUpdateRequester(
914
93483
        connection_manager.config_->routeConfigProvider());
915
93890
  } else if (connection_manager_.config_->isRoutable() &&
916
624
             connection_manager.config_->scopedRouteConfigProvider() != nullptr &&
917
624
             connection_manager.config_->scopeKeyBuilder().has_value() && factory) {
918
183
    route_config_update_requester_ = factory->createRouteConfigUpdateRequester(
919
183
        connection_manager.config_->scopedRouteConfigProvider(),
920
183
        connection_manager.config_->scopeKeyBuilder());
921
183
  }
922
94107
  ScopeTrackerScopeState scope(this,
923
94107
                               connection_manager_.read_callbacks_->connection().dispatcher());
924

            
925
94107
  connection_manager_.stats_.named_.downstream_rq_total_.inc();
926
94107
  connection_manager_.stats_.named_.downstream_rq_active_.inc();
927
94107
  if (connection_manager_.codec_->protocol() == Protocol::Http2) {
928
74744
    connection_manager_.stats_.named_.downstream_rq_http2_total_.inc();
929
88791
  } else if (connection_manager_.codec_->protocol() == Protocol::Http3) {
930
2529
    connection_manager_.stats_.named_.downstream_rq_http3_total_.inc();
931
18033
  } else {
932
16834
    connection_manager_.stats_.named_.downstream_rq_http1_total_.inc();
933
16834
  }
934

            
935
94107
  if (connection_manager_.config_->streamIdleTimeout().count()) {
936
93335
    idle_timeout_ms_ = connection_manager_.config_->streamIdleTimeout();
937
93335
    stream_idle_timer_ = connection_manager_.dispatcher_->createScaledTimer(
938
93335
        Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout,
939
93335
        [this]() -> void { onIdleTimeout(); });
940
93335
    resetIdleTimer();
941
93335
  }
942

            
943
94107
  if (connection_manager_.config_->requestTimeout().count()) {
944
71
    std::chrono::milliseconds request_timeout = connection_manager_.config_->requestTimeout();
945
71
    request_timer_ =
946
71
        connection_manager.dispatcher_->createTimer([this]() -> void { onRequestTimeout(); });
947
71
    request_timer_->enableTimer(request_timeout, this);
948
71
  }
949

            
950
94107
  if (connection_manager_.config_->requestHeadersTimeout().count()) {
951
2
    std::chrono::milliseconds request_headers_timeout =
952
2
        connection_manager_.config_->requestHeadersTimeout();
953
2
    request_header_timer_ =
954
2
        connection_manager.dispatcher_->createTimer([this]() -> void { onRequestHeaderTimeout(); });
955
2
    request_header_timer_->enableTimer(request_headers_timeout, this);
956
2
  }
957

            
958
94107
  const auto max_stream_duration = connection_manager_.config_->maxStreamDuration();
959
94107
  if (max_stream_duration.has_value() && max_stream_duration.value().count()) {
960
38
    max_stream_duration_timer_ = connection_manager.dispatcher_->createTimer(
961
38
        [this]() -> void { onStreamMaxDurationReached(); });
962
38
    max_stream_duration_timer_->enableTimer(
963
38
        connection_manager_.config_->maxStreamDuration().value(), this);
964
38
  }
965

            
966
94107
  if (connection_manager_.config_->accessLogFlushInterval().has_value()) {
967
11
    access_log_flush_timer_ = connection_manager.dispatcher_->createTimer([this]() -> void {
968
      // If the request is complete, we've already done the stream-end access-log, and shouldn't
969
      // do the periodic log.
970
11
      if (!streamInfo().requestComplete().has_value()) {
971
11
        log(AccessLog::AccessLogType::DownstreamPeriodic);
972
11
        refreshAccessLogFlushTimer();
973
11
      }
974
11
      const SystemTime now = connection_manager_.timeSource().systemTime();
975
      // Downstream bytes meter is guaranteed to be non-null because ActiveStream and the timer
976
      // event are created on the same thread that sets the meter in
977
      // ConnectionManagerImpl::newStream.
978
11
      filter_manager_.streamInfo().getDownstreamBytesMeter()->takeDownstreamPeriodicLoggingSnapshot(
979
11
          now);
980
11
      if (auto& upstream_bytes_meter = filter_manager_.streamInfo().getUpstreamBytesMeter();
981
11
          upstream_bytes_meter != nullptr) {
982
11
        upstream_bytes_meter->takeDownstreamPeriodicLoggingSnapshot(now);
983
11
      }
984
11
    });
985
10
    refreshAccessLogFlushTimer();
986
10
  }
987
94107
}
988

            
989
92627
void ConnectionManagerImpl::ActiveStream::log(AccessLog::AccessLogType type) {
990
92627
  const Formatter::Context log_context{
991
92627
      request_headers_.get(), response_headers_.get(), response_trailers_.get(), {}, type,
992
92627
      active_span_.get()};
993

            
994
92627
  filter_manager_.log(log_context);
995

            
996
92802
  for (const auto& access_logger : connection_manager_.config_->accessLogs()) {
997
72921
    access_logger->log(log_context, filter_manager_.streamInfo());
998
72921
  }
999
92627
}
94107
void ConnectionManagerImpl::ActiveStream::completeRequest() {
94107
  filter_manager_.streamInfo().onRequestComplete();
94107
  connection_manager_.stats_.named_.downstream_rq_active_.dec();
94107
  if (filter_manager_.streamInfo().healthCheck()) {
173
    connection_manager_.config_->tracingStats().health_check_.inc();
173
  }
94107
  if (active_span_) {
52
    Tracing::HttpTracerUtility::finalizeDownstreamSpan(
52
        *active_span_, request_headers_.get(), response_headers_.get(), response_trailers_.get(),
52
        filter_manager_.streamInfo(), *this);
52
  }
94107
  if (state_.successful_upgrade_) {
601
    connection_manager_.stats_.named_.downstream_cx_upgrades_active_.dec();
601
  }
94107
}
564777
void ConnectionManagerImpl::ActiveStream::resetIdleTimer() {
564777
  if (stream_idle_timer_ != nullptr) {
    // TODO(htuch): If this shows up in performance profiles, optimize by only
    // updating a timestamp here and doing periodic checks for idle timeouts
    // instead, or reducing the accuracy of timers.
557423
    stream_idle_timer_->enableTimer(idle_timeout_ms_);
557423
  }
564777
}
184
void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
184
  connection_manager_.stats_.named_.downstream_rq_idle_timeout_.inc();
184
  filter_manager_.streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::StreamIdleTimeout);
184
  sendLocalReply(
184
      Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
184
      "stream timeout", nullptr, absl::nullopt,
184
      StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
184
}
27
void ConnectionManagerImpl::ActiveStream::onRequestTimeout() {
27
  connection_manager_.stats_.named_.downstream_rq_timeout_.inc();
27
  sendLocalReply(
27
      Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
27
      "request timeout", nullptr, absl::nullopt,
27
      StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
27
}
1
void ConnectionManagerImpl::ActiveStream::onRequestHeaderTimeout() {
1
  connection_manager_.stats_.named_.downstream_rq_header_timeout_.inc();
1
  sendLocalReply(
1
      Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
1
      "request header timeout", nullptr, absl::nullopt,
1
      StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
1
}
32
void ConnectionManagerImpl::ActiveStream::onStreamMaxDurationReached() {
32
  ENVOY_STREAM_LOG(debug, "Stream max duration time reached", *this);
32
  connection_manager_.stats_.named_.downstream_rq_max_duration_reached_.inc();
32
  sendLocalReply(
32
      Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
32
      "downstream duration timeout", nullptr, Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
32
      StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
32
}
47598
void ConnectionManagerImpl::ActiveStream::chargeStats(const ResponseHeaderMap& headers) {
47598
  if (trace_refresh_after_route_refresh_ && connection_manager_tracing_config_.has_value()) {
266
    const Tracing::Decision tracing_decision =
266
        Tracing::TracerUtility::shouldTraceRequest(filter_manager_.streamInfo());
266
    ConnectionManagerImpl::chargeTracingStats(tracing_decision.reason,
266
                                              connection_manager_.config_->tracingStats());
266
  }
47598
  uint64_t response_code = Utility::getResponseStatus(headers);
47598
  filter_manager_.streamInfo().setResponseCode(response_code);
47598
  if (filter_manager_.streamInfo().healthCheck()) {
173
    return;
173
  }
  // No response is sent back downstream for internal redirects, so don't charge downstream stats.
47425
  const absl::optional<std::string>& response_code_details =
47425
      filter_manager_.streamInfo().responseCodeDetails();
47425
  if (response_code_details.has_value() &&
47425
      response_code_details == Envoy::StreamInfo::ResponseCodeDetails::get().InternalRedirect) {
151
    return;
151
  }
47274
  connection_manager_.stats_.named_.downstream_rq_completed_.inc();
47274
  connection_manager_.listener_stats_.downstream_rq_completed_.inc();
47274
  if (CodeUtility::is1xx(response_code)) {
253
    connection_manager_.stats_.named_.downstream_rq_1xx_.inc();
253
    connection_manager_.listener_stats_.downstream_rq_1xx_.inc();
47029
  } else if (CodeUtility::is2xx(response_code)) {
37364
    connection_manager_.stats_.named_.downstream_rq_2xx_.inc();
37364
    connection_manager_.listener_stats_.downstream_rq_2xx_.inc();
42053
  } else if (CodeUtility::is3xx(response_code)) {
194
    connection_manager_.stats_.named_.downstream_rq_3xx_.inc();
194
    connection_manager_.listener_stats_.downstream_rq_3xx_.inc();
9563
  } else if (CodeUtility::is4xx(response_code)) {
5714
    connection_manager_.stats_.named_.downstream_rq_4xx_.inc();
5714
    connection_manager_.listener_stats_.downstream_rq_4xx_.inc();
8934
  } else if (CodeUtility::is5xx(response_code)) {
3737
    connection_manager_.stats_.named_.downstream_rq_5xx_.inc();
3737
    connection_manager_.listener_stats_.downstream_rq_5xx_.inc();
3737
  }
47274
}
91837
const Network::Connection* ConnectionManagerImpl::ActiveStream::connection() {
91837
  return &connection_manager_.read_callbacks_->connection();
91837
}
90365
uint32_t ConnectionManagerImpl::ActiveStream::localPort() {
90365
  auto ip = connection()->connectionInfoProvider().localAddress()->ip();
90365
  if (ip == nullptr) {
15
    return 0;
15
  }
90350
  return ip->port();
90365
}
namespace {
5
bool streamErrorOnlyErrors(absl::string_view error_details) {
  // Pre UHV HCM did not respect stream_error_on_invalid_http_message
  // and only sent 400 for specific errors.
  // TODO(#28555): make these errors respect the stream_error_on_invalid_http_message
5
  return error_details == UhvResponseCodeDetail::get().FragmentInUrlPath ||
5
         error_details == UhvResponseCodeDetail::get().EscapedSlashesInPath ||
5
         error_details == UhvResponseCodeDetail::get().Percent00InPath;
5
}
} // namespace
51
void ConnectionManagerImpl::ActiveStream::setRequestDecorator(RequestHeaderMap& headers) {
51
  ASSERT(active_span_ != nullptr);
51
  const Router::Decorator* decorater = route_decorator_;
  // If a decorator has been defined, apply it to the active span.
51
  absl::string_view decorated_operation;
51
  if (decorater != nullptr) {
11
    decorated_operation = decorater->getOperation();
11
    decorater->apply(*active_span_);
11
    state_.decorated_propagate_ = decorater->propagate();
11
  }
51
  if (connection_manager_tracing_config_->operation_name_ == Tracing::OperationName::Egress) {
    // For egress (outbound) requests, pass the decorator's operation name (if defined and
    // propagation enabled) as a request header to enable the receiving service to use it in its
    // server span.
3
    if (!decorated_operation.empty() && state_.decorated_propagate_) {
2
      headers.setEnvoyDecoratorOperation(decorated_operation);
2
    }
48
  } else {
48
    absl::string_view req_operation_override = headers.getEnvoyDecoratorOperationValue();
    // For ingress (inbound) requests, if a decorator operation name has been provided, it
    // should be used to override the active span's operation.
48
    if (!req_operation_override.empty()) {
1
      active_span_->setOperation(req_operation_override);
      // Set the decorator operation as overridden to avoid propagating the route decorator
      // operation to the client when the setResponseDecorator() is called.
1
      state_.decorator_overriden_ = true;
1
    }
    // Remove header so not propagated to service
48
    headers.removeEnvoyDecoratorOperation();
48
  }
51
}
49
void ConnectionManagerImpl::ActiveStream::setResponseDecorator(ResponseHeaderMap& headers) {
49
  ASSERT(active_span_ != nullptr);
49
  if (connection_manager_tracing_config_->operation_name_ == Tracing::OperationName::Ingress) {
    // For ingress (inbound) responses, if the request headers do not include a
    // decorator operation (override), and the decorated operation should be
    // propagated, then pass the decorator's operation name (if defined)
    // as a response header to enable the client service to use it in its client span.
46
    if (state_.decorated_propagate_ && !state_.decorator_overriden_) {
44
      absl::string_view decorated_operation =
44
          route_decorator_ != nullptr ? route_decorator_->getOperation() : absl::string_view();
44
      if (!decorated_operation.empty()) {
        // If the decorator operation is defined, set it as the response header.
4
        headers.setEnvoyDecoratorOperation(decorated_operation);
4
      }
44
    }
46
  } else if (connection_manager_tracing_config_->operation_name_ ==
3
             Tracing::OperationName::Egress) {
3
    const absl::string_view resp_operation_override = headers.getEnvoyDecoratorOperationValue();
    // For Egress (outbound) response, if a decorator operation name has been provided, it
    // should be used to override the active span's operation.
3
    if (!resp_operation_override.empty()) {
1
      active_span_->setOperation(resp_operation_override);
1
    }
    // Remove header so not propagated to service.
3
    headers.removeEnvoyDecoratorOperation();
3
  }
49
}
91514
bool ConnectionManagerImpl::ActiveStream::validateHeaders() {
91514
  if (header_validator_) {
10
    auto validation_result = header_validator_->validateRequestHeaders(*request_headers_);
10
    bool failure = !validation_result.ok();
10
    bool redirect = false;
10
    bool is_grpc = Grpc::Common::hasGrpcContentType(*request_headers_);
10
    std::string failure_details(validation_result.details());
10
    if (!failure) {
7
      auto transformation_result = header_validator_->transformRequestHeaders(*request_headers_);
7
      failure = !transformation_result.ok();
7
      redirect = transformation_result.action() ==
7
                 Http::ServerHeaderValidator::RequestHeadersTransformationResult::Action::Redirect;
7
      failure_details = std::string(transformation_result.details());
7
      if (redirect && !is_grpc) {
1
        connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
6
      } else if (failure) {
1
        connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
1
      }
7
    }
10
    if (failure) {
5
      std::function<void(ResponseHeaderMap & headers)> modify_headers;
5
      Code response_code = failure_details == Http1ResponseCodeDetail::get().InvalidTransferEncoding
5
                               ? Code::NotImplemented
5
                               : Code::BadRequest;
5
      absl::optional<Grpc::Status::GrpcStatus> grpc_status;
5
      if (redirect && !is_grpc) {
1
        response_code = Code::TemporaryRedirect;
1
        modify_headers = [new_path = request_headers_->Path()->value().getStringView()](
1
                             Http::ResponseHeaderMap& response_headers) -> void {
1
          response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
1
        };
5
      } else if (is_grpc) {
2
        grpc_status = Grpc::Status::WellKnownGrpcStatus::Internal;
2
      }
5
      filter_manager_.streamInfo().setResponseFlag(
5
          StreamInfo::CoreResponseFlag::DownstreamProtocolError);
      // H/2 codec was resetting requests that were rejected due to headers with underscores,
      // instead of sending 400. Preserving this behavior for now.
      // TODO(#24466): Make H/2 behavior consistent with H/1 and H/3.
5
      if (failure_details == UhvResponseCodeDetail::get().InvalidUnderscore &&
5
          connection_manager_.codec_->protocol() == Protocol::Http2) {
        filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
        resetStream();
5
      } else {
5
        sendLocalReply(response_code, "", modify_headers, grpc_status, failure_details);
5
        if (!response_encoder_->streamErrorOnInvalidHttpMessage() &&
5
            !streamErrorOnlyErrors(failure_details)) {
5
          connection_manager_.handleCodecError(failure_details);
5
        }
5
      }
5
      return false;
5
    }
10
  }
91509
  return true;
91514
}
493
bool ConnectionManagerImpl::ActiveStream::validateTrailers(RequestTrailerMap& trailers) {
493
  if (!header_validator_) {
489
    return true;
489
  }
4
  auto validation_result = header_validator_->validateRequestTrailers(trailers);
4
  std::string failure_details(validation_result.details());
4
  if (validation_result.ok()) {
1
    auto transformation_result = header_validator_->transformRequestTrailers(trailers);
1
    if (transformation_result.ok()) {
      return true;
    }
1
    failure_details = std::string(transformation_result.details());
1
  }
4
  Code response_code = Code::BadRequest;
4
  absl::optional<Grpc::Status::GrpcStatus> grpc_status;
4
  if (Grpc::Common::hasGrpcContentType(*request_headers_)) {
    grpc_status = Grpc::Status::WellKnownGrpcStatus::Internal;
  }
4
  filter_manager_.streamInfo().setResponseFlag(
4
      StreamInfo::CoreResponseFlag::DownstreamProtocolError);
  // H/2 codec was resetting requests that were rejected due to headers with underscores,
  // instead of sending 400. Preserving this behavior for now.
  // TODO(#24466): Make H/2 behavior consistent with H/1 and H/3.
4
  if (failure_details == UhvResponseCodeDetail::get().InvalidUnderscore &&
4
      connection_manager_.codec_->protocol() == Protocol::Http2) {
    filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
    resetStream();
4
  } else {
    // TODO(#24735): Harmonize H/2 and H/3 behavior with H/1
4
    if (connection_manager_.codec_->protocol() < Protocol::Http2) {
2
      sendLocalReply(response_code, "", nullptr, grpc_status, failure_details);
4
    } else {
2
      filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
2
      resetStream();
2
    }
4
    if (!response_encoder_->streamErrorOnInvalidHttpMessage()) {
4
      connection_manager_.handleCodecError(failure_details);
4
    }
4
  }
4
  return false;
4
}
334119
void ConnectionManagerImpl::ActiveStream::maybeRecordLastByteReceived(bool end_stream) {
  // If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
334119
  if (end_stream && !filter_manager_.hasLastDownstreamByteReceived()) {
84990
    filter_manager_.streamInfo().downstreamTiming().onLastDownstreamRxByteReceived(
84990
        connection_manager_.dispatcher_->timeSource());
84990
    ENVOY_STREAM_LOG(debug, "request end stream timestamp recorded", *this);
84990
  }
334119
}
// Ordering in this function is complicated, but important.
//
// We want to do minimal work before selecting route and creating a filter
// chain to maximize the number of requests which get custom filter behavior,
// e.g. registering access logging.
//
// This must be balanced by doing sanity checking for invalid requests (one
// can't route select properly without full headers), checking state required to
// serve error responses (connection close, head requests, etc), and
// modifications which may themselves affect route selection.
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPtr&& headers,
91514
                                                        bool end_stream) {
91514
  ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream,
91514
                   *headers);
  // We only want to record this when reading the headers the first time, not when recreating
  // a stream.
91514
  if (!filter_manager_.hasLastDownstreamByteReceived()) {
91200
    filter_manager_.streamInfo().downstreamTiming().onLastDownstreamHeaderRxByteReceived(
91200
        connection_manager_.dispatcher_->timeSource());
91200
  }
91514
  ScopeTrackerScopeState scope(this,
91514
                               connection_manager_.read_callbacks_->connection().dispatcher());
91514
  request_headers_ = std::move(headers);
91514
  filter_manager_.requestHeadersInitialized();
91514
  if (request_header_timer_ != nullptr) {
1
    request_header_timer_->disableTimer();
1
    request_header_timer_.reset();
1
  }
  // Both shouldDrainConnectionUponCompletion() and is_head_request_ affect local replies: set them
  // as early as possible.
91514
  const Protocol protocol = connection_manager_.codec_->protocol();
91514
  if (HeaderUtility::shouldCloseConnection(protocol, *request_headers_)) {
    // Only mark the connection to be closed if the request indicates so. The connection might
    // already be marked so before this step, in which case if shouldCloseConnection() returns
    // false, the stream info value shouldn't be overridden.
22
    filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
22
  }
91514
  filter_manager_.streamInfo().protocol(protocol);
  // We end the decode here to mark that the downstream stream is complete.
91514
  maybeRecordLastByteReceived(end_stream);
91514
  if (!validateHeaders()) {
5
    ENVOY_STREAM_LOG(debug, "request headers validation failed\n{}", *this, *request_headers_);
5
    return;
5
  }
  // We need to snap snapped_route_config_ here as it's used in mutateRequestHeaders later.
91509
  if (connection_manager_.config_->isRoutable()) {
91070
    if (connection_manager_.config_->routeConfigProvider() != nullptr) {
90888
      snapped_route_config_ = connection_manager_.config_->routeConfigProvider()->configCast();
90974
    } else if (connection_manager_.config_->scopedRouteConfigProvider() != nullptr &&
182
               connection_manager_.config_->scopeKeyBuilder().has_value()) {
182
      snapped_scoped_routes_config_ =
182
          connection_manager_.config_->scopedRouteConfigProvider()->config<Router::ScopedConfig>();
182
      snapScopedRouteConfig();
182
    }
91390
  } else {
439
    snapped_route_config_ = connection_manager_.config_->routeConfigProvider()->configCast();
439
  }
  // Drop new requests when overloaded as soon as we have decoded the headers.
91509
  const bool drop_request_due_to_overload =
91509
      (connection_manager_.accept_new_http_stream_ != nullptr &&
91509
       connection_manager_.accept_new_http_stream_->shouldShedLoad()) ||
91509
      connection_manager_.random_generator_.bernoulli(
91501
          connection_manager_.overload_stop_accepting_requests_ref_.value());
91509
  if (drop_request_due_to_overload) {
    // In this one special case, do not create the filter chain. If there is a risk of memory
    // overload it is more important to avoid unnecessary allocation than to create the filters.
49
    filter_manager_.skipFilterChainCreation();
49
    connection_manager_.stats_.named_.downstream_rq_overload_close_.inc();
49
    sendLocalReply(
49
        Http::Code::ServiceUnavailable, "envoy overloaded",
49
        [this](Http::ResponseHeaderMap& headers) {
49
          if (connection_manager_.config_->appendLocalOverload()) {
16
            headers.addReference(Http::Headers::get().EnvoyLocalOverloaded,
16
                                 Http::Headers::get().EnvoyOverloadedValues.True);
16
          }
49
        },
49
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().Overload);
49
    return;
49
  }
91460
  if (!connection_manager_.config_->proxy100Continue() && request_headers_->Expect() &&
      // The Expect field-value is case-insensitive.
      // https://tools.ietf.org/html/rfc7231#section-5.1.1
91460
      absl::EqualsIgnoreCase((request_headers_->Expect()->value().getStringView()),
40
                             Headers::get().ExpectValues._100Continue)) {
    // Note in the case Envoy is handling 100-Continue complexity, it skips the filter chain
    // and sends the 100-Continue directly to the encoder.
40
    chargeStats(continueHeader());
40
    response_encoder_->encode1xxHeaders(continueHeader());
    // Remove the Expect header so it won't be handled again upstream.
40
    request_headers_->removeExpect();
40
  }
91460
  connection_manager_.user_agent_.initializeFromHeaders(*request_headers_,
91460
                                                        connection_manager_.stats_.prefixStatName(),
91460
                                                        connection_manager_.stats_.scope_);
91460
  if (!request_headers_->Host()) {
    // Require host header. For HTTP/1.1 Host has already been translated to :authority.
1022
    sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
1022
                   StreamInfo::ResponseCodeDetails::get().MissingHost);
1022
    return;
1022
  }
  // Apply header sanity checks.
90438
  absl::optional<std::reference_wrapper<const absl::string_view>> error =
90438
      HeaderUtility::requestHeadersValid(*request_headers_);
90438
  if (error != absl::nullopt) {
6
    sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt, error.value().get());
6
    if (!response_encoder_->streamErrorOnInvalidHttpMessage()) {
6
      connection_manager_.handleCodecError(error.value().get());
6
    }
6
    return;
6
  }
  // Check for the existence of the :path header for non-CONNECT requests, or present-but-empty
  // :path header for CONNECT requests. We expect the codec to have broken the path into pieces if
  // applicable. NOTE: Currently the HTTP/1.1 codec only does this when the allow_absolute_url flag
  // is enabled on the HCM.
90432
  if ((!HeaderUtility::isConnect(*request_headers_) || request_headers_->Path()) &&
90432
      request_headers_->getPathValue().empty()) {
2
    sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
2
                   StreamInfo::ResponseCodeDetails::get().MissingPath);
2
    return;
2
  }
  // Rewrite the host of CONNECT-UDP requests.
90430
  if (HeaderUtility::isConnectUdpRequest(*request_headers_) &&
90430
      !HeaderUtility::rewriteAuthorityForConnectUdp(*request_headers_)) {
16
    sendLocalReply(Code::NotFound, "The path is incorrect for CONNECT-UDP", nullptr, absl::nullopt,
16
                   StreamInfo::ResponseCodeDetails::get().InvalidPath);
16
    return;
16
  }
  // Currently we only support relative paths at the application layer.
90414
  if (!request_headers_->getPathValue().empty() && request_headers_->getPathValue()[0] != '/') {
2
    connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc();
2
    sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
2
                   StreamInfo::ResponseCodeDetails::get().AbsolutePath);
2
    return;
2
  }
90412
#ifndef ENVOY_ENABLE_UHV
  // In UHV mode path normalization is done in the UHV
  // Path sanitization should happen before any path access other than the above sanity check.
90412
  const auto action =
90412
      ConnectionManagerUtility::maybeNormalizePath(*request_headers_, *connection_manager_.config_);
  // gRPC requests are rejected if Envoy is configured to redirect post-normalization. This is
  // because gRPC clients do not support redirect.
90412
  if (action == ConnectionManagerUtility::NormalizePathAction::Reject ||
90412
      (action == ConnectionManagerUtility::NormalizePathAction::Redirect &&
90370
       Grpc::Common::hasGrpcContentType(*request_headers_))) {
43
    connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
43
    sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
43
                   StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
43
    return;
90369
  } else if (action == ConnectionManagerUtility::NormalizePathAction::Redirect) {
4
    connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
4
    sendLocalReply(
4
        Code::TemporaryRedirect, "",
4
        [new_path = request_headers_->Path()->value().getStringView()](
4
            Http::ResponseHeaderMap& response_headers) -> void {
4
          response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
4
        },
4
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
4
    return;
4
  }
90365
  ASSERT(action == ConnectionManagerUtility::NormalizePathAction::Continue);
90365
#endif
90365
  auto optional_port = ConnectionManagerUtility::maybeNormalizeHost(
90365
      *request_headers_, *connection_manager_.config_, localPort());
90365
  if (optional_port.has_value() &&
90365
      requestWasConnect(request_headers_, connection_manager_.codec_->protocol())) {
11
    filter_manager_.streamInfo().filterState()->setData(
11
        Router::OriginalConnectPort::key(),
11
        std::make_unique<Router::OriginalConnectPort>(optional_port.value()),
11
        StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Request);
11
  }
90365
  if (!state_.is_internally_created_) { // Only sanitize headers on first pass.
    // Modify the downstream remote address depending on configuration and headers.
90051
    const auto mutate_result = ConnectionManagerUtility::mutateRequestHeaders(
90051
        *request_headers_, connection_manager_.read_callbacks_->connection(),
90051
        *connection_manager_.config_, *snapped_route_config_, connection_manager_.local_info_,
90051
        filter_manager_.streamInfo());
    // IP detection failed, reject the request.
90051
    if (mutate_result.reject_request.has_value()) {
1
      const auto& reject_request_params = mutate_result.reject_request.value();
1
      connection_manager_.stats_.named_.downstream_rq_rejected_via_ip_detection_.inc();
1
      sendLocalReply(reject_request_params.response_code, reject_request_params.body, nullptr,
1
                     absl::nullopt,
1
                     StreamInfo::ResponseCodeDetails::get().OriginalIPDetectionFailed);
1
      return;
1
    }
90050
    filter_manager_.setDownstreamRemoteAddress(mutate_result.final_remote_address);
90050
  }
90364
  ASSERT(filter_manager_.streamInfo().downstreamAddressProvider().remoteAddress() != nullptr);
90364
  ASSERT(!cached_route_);
90364
  refreshCachedRoute();
90364
  if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
90050
    filter_manager_.streamInfo().setTraceReason(
90050
        ConnectionManagerUtility::mutateTracingRequestHeader(
90050
            *request_headers_, connection_manager_.runtime_, *connection_manager_.config_,
90050
            cached_route_.value().get()));
90050
  }
90364
  filter_manager_.streamInfo().setRequestHeaders(*request_headers_);
90364
  const FilterManager::CreateChainResult create_chain_result =
90364
      filter_manager_.createDownstreamFilterChain();
90364
  if (create_chain_result.upgradeAccepted()) {
601
    connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc();
601
    connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc();
601
    state_.successful_upgrade_ = true;
601
  }
90364
  if (connection_manager_.config_->flushAccessLogOnNewRequest()) {
12
    log(AccessLog::AccessLogType::DownstreamStart);
12
  }
  // TODO if there are no filters when starting a filter iteration, the connection manager
  // should return 404. The current returns no response if there is no router filter.
90364
  if (hasCachedRoute()) {
    // Do not allow upgrades if the route does not support it.
87638
    if (create_chain_result.upgradeRejected()) {
      // While downstream servers should not send upgrade payload without the upgrade being
      // accepted, err on the side of caution and refuse to process any further requests on this
      // connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
      // contains a smuggled HTTP request.
2
      filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
2
      connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
2
      sendLocalReply(Code::Forbidden, "", nullptr, absl::nullopt,
2
                     StreamInfo::ResponseCodeDetails::get().UpgradeFailed);
2
      return;
2
    }
    // Allow non websocket requests to go through websocket enabled routes.
87638
  }
  // Check if tracing is enabled.
90362
  if (connection_manager_tracing_config_.has_value()) {
298
    traceRequest();
298
  }
90362
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
90362
  if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) {
49934
    filter_manager_.decodeHeaders(*request_headers_, end_stream);
87883
  } else {
40428
    state_.deferred_to_next_io_iteration_ = true;
40428
    state_.deferred_end_stream_ = end_stream;
40428
  }
  // Reset it here for both global and overridden cases.
90362
  resetIdleTimer();
90362
}
298
void ConnectionManagerImpl::ActiveStream::traceRequest() {
298
  ASSERT(connection_manager_tracing_config_.has_value());
298
  const Tracing::Decision tracing_decision =
298
      Tracing::TracerUtility::shouldTraceRequest(filter_manager_.streamInfo());
298
  if (!trace_refresh_after_route_refresh_) {
1
    ConnectionManagerImpl::chargeTracingStats(tracing_decision.reason,
1
                                              connection_manager_.config_->tracingStats());
1
  }
298
  Tracing::HttpTraceContext trace_context(*request_headers_);
298
  active_span_ = connection_manager_.tracer().startSpan(
298
      *this, trace_context, filter_manager_.streamInfo(), tracing_decision);
298
  if (!active_span_) {
246
    return;
246
  }
52
  if (hasCachedRoute()) {
52
    route_decorator_ = cached_route_.value()->decorator();
52
    route_tracing_ = cached_route_.value()->tracingConfig();
52
  }
52
  if (!operationNameFormatter(*connection_manager_tracing_config_, route_tracing_)) {
    // Only set decorator when there is no operation name formatter configured at either
    // the HCM level or the route level.
49
    setRequestDecorator(*request_headers_);
49
  }
52
}
242116
void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, bool end_stream) {
242116
  ScopeTrackerScopeState scope(this,
242116
                               connection_manager_.read_callbacks_->connection().dispatcher());
242116
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
242116
  maybeRecordLastByteReceived(end_stream);
242116
  filter_manager_.streamInfo().addBytesReceived(data.length());
242116
  if (!state_.deferred_to_next_io_iteration_) {
241842
    filter_manager_.decodeData(data, end_stream);
241895
  } else {
274
    if (!deferred_data_) {
274
      deferred_data_ = std::make_unique<Buffer::OwnedImpl>();
274
    }
274
    deferred_data_->move(data);
274
    state_.deferred_end_stream_ = end_stream;
274
  }
242116
}
493
void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) {
493
  ENVOY_STREAM_LOG(debug, "request trailers complete:\n{}", *this, *trailers);
493
  ScopeTrackerScopeState scope(this,
493
                               connection_manager_.read_callbacks_->connection().dispatcher());
493
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
493
  resetIdleTimer();
493
  ASSERT(!request_trailers_);
493
  if (!validateTrailers(*trailers)) {
4
    ENVOY_STREAM_LOG(debug, "request trailers validation failed:\n{}", *this, *trailers);
4
    return;
4
  }
489
  maybeRecordLastByteReceived(true);
489
  if (!state_.deferred_to_next_io_iteration_) {
345
    request_trailers_ = std::move(trailers);
345
    filter_manager_.decodeTrailers(*request_trailers_);
444
  } else {
    // Save trailers in a different variable since `request_trailers_` is available to the filter
    // manager via `requestTrailers()` callback and makes filter manager see trailers prematurely
    // when deferred request is processed.
144
    deferred_request_trailers_ = std::move(trailers);
144
  }
489
}
864
void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) {
864
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
864
  resetIdleTimer();
864
  if (!state_.deferred_to_next_io_iteration_) {
    // After going through filters, the ownership of metadata_map will be passed to terminal filter.
    // The terminal filter may encode metadata_map to the next hop immediately or store metadata_map
    // and encode later when connection pool is ready.
806
    filter_manager_.decodeMetadata(*metadata_map);
826
  } else {
58
    deferred_metadata_.push(std::move(metadata_map));
58
  }
864
}
192878
void ConnectionManagerImpl::ActiveStream::disarmRequestTimeout() {
192878
  if (request_timer_) {
172
    request_timer_->disableTimer();
172
  }
192878
}
204
void ConnectionManagerImpl::startDrainSequence() {
204
  ASSERT(drain_state_ == DrainState::NotDraining);
204
  drain_state_ = DrainState::Draining;
204
  codec_->shutdownNotice();
204
  drain_timer_ = dispatcher_->createTimer([this]() -> void { onDrainTimeout(); });
204
  drain_timer_->enableTimer(config_->drainTimeout());
204
}
406
void ConnectionManagerImpl::ActiveStream::snapScopedRouteConfig() {
  // NOTE: if a RDS subscription hasn't got a RouteConfiguration back, a Router::NullConfigImpl is
  // returned, in that case we let it pass.
406
  auto scope_key =
406
      connection_manager_.config_->scopeKeyBuilder()->computeScopeKey(*request_headers_);
406
  snapped_route_config_ = snapped_scoped_routes_config_->getRouteConfig(scope_key);
406
  if (snapped_route_config_ == nullptr) {
158
    ENVOY_STREAM_LOG(trace, "can't find SRDS scope.", *this);
    // TODO(stevenzzzz): Consider to pass an error message to router filter, so that it can
    // send back 404 with some more details.
158
    snapped_route_config_ = std::make_shared<Router::NullConfigImpl>();
158
  }
406
}
90420
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { refreshCachedRoute(nullptr); }
93494
void ConnectionManagerImpl::ActiveStream::refreshDurationTimeout() {
93494
  if (!hasCachedRoute() || !request_headers_) {
5229
    return;
5229
  }
88265
  const Router::RouteEntry* route = cached_route_.value()->routeEntry();
88265
  if (route == nullptr) {
276
    return;
276
  }
87989
  auto grpc_timeout = Grpc::Common::getGrpcTimeout(*request_headers_);
87989
  std::chrono::milliseconds timeout;
87989
  bool disable_timer = false;
87989
  if (!grpc_timeout || !route->grpcTimeoutHeaderMax()) {
    // Either there is no grpc-timeout header or special timeouts for it are not
    // configured. Use stream duration.
87975
    if (route->maxStreamDuration()) {
2
      timeout = route->maxStreamDuration().value();
2
      if (timeout == std::chrono::milliseconds(0)) {
        // Explicitly configured 0 means no timeout.
        disable_timer = true;
      }
87973
    } else {
      // Fall back to HCM config. If no HCM duration limit exists, disable
      // timers set by any prior route configuration.
87973
      const auto max_stream_duration = connection_manager_.config_->maxStreamDuration();
87973
      if (max_stream_duration.has_value() && max_stream_duration.value().count()) {
36
        timeout = max_stream_duration.value();
87937
      } else {
87937
        disable_timer = true;
87937
      }
87973
    }
87975
  } else {
    // Start with the timeout equal to the gRPC timeout header.
14
    timeout = grpc_timeout.value();
    // If there's a valid cap, apply it.
14
    if (timeout > route->grpcTimeoutHeaderMax().value() &&
14
        route->grpcTimeoutHeaderMax().value() != std::chrono::milliseconds(0)) {
5
      timeout = route->grpcTimeoutHeaderMax().value();
5
    }
    // Apply the configured offset.
14
    if (timeout != std::chrono::milliseconds(0) && route->grpcTimeoutHeaderOffset()) {
2
      const auto offset = route->grpcTimeoutHeaderOffset().value();
2
      if (offset < timeout) {
1
        timeout -= offset;
1
      } else {
1
        timeout = std::chrono::milliseconds(0);
1
      }
2
    }
14
  }
  // Disable any existing timer if configured to do so.
87989
  if (disable_timer) {
87937
    if (max_stream_duration_timer_) {
2
      max_stream_duration_timer_->disableTimer();
2
      if (route->usingNewTimeouts() && Grpc::Common::isGrpcRequestHeaders(*request_headers_)) {
        request_headers_->removeGrpcTimeout();
      }
2
    }
87937
    return;
87937
  }
  // Set the header timeout before doing used-time adjustments.
  // This may result in the upstream not getting the latest results, but also
  // avoids every request getting a custom timeout based on envoy think time.
52
  if (route->usingNewTimeouts() && Grpc::Common::isGrpcRequestHeaders(*request_headers_)) {
6
    Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(timeout), *request_headers_);
6
  }
  // See how long this stream has been alive, and adjust the timeout
  // accordingly.
52
  std::chrono::milliseconds time_used = std::chrono::duration_cast<std::chrono::milliseconds>(
52
      connection_manager_.timeSource().monotonicTime() -
52
      filter_manager_.streamInfo().startTimeMonotonic());
52
  if (timeout > time_used) {
49
    timeout -= time_used;
49
  } else {
3
    timeout = std::chrono::milliseconds(0);
3
  }
  // Finally create (if necessary) and enable the timer.
52
  if (!max_stream_duration_timer_) {
7
    max_stream_duration_timer_ = connection_manager_.dispatcher_->createTimer(
7
        [this]() -> void { onStreamMaxDurationReached(); });
7
  }
52
  max_stream_duration_timer_->enableTimer(timeout);
52
}
93478
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::RouteCallback& cb) {
  // If the cached route is blocked then any attempt to clear it or refresh it
  // will be ignored.
93478
  if (routeCacheBlocked()) {
    return;
  }
93478
  Router::VirtualHostRoute route_result;
93478
  if (request_headers_ != nullptr) {
92129
    if (connection_manager_.config_->isRoutable() &&
92129
        connection_manager_.config_->scopedRouteConfigProvider() != nullptr &&
92129
        connection_manager_.config_->scopeKeyBuilder().has_value()) {
      // NOTE: re-select scope as well in case the scope key header has been changed by a filter.
224
      snapScopedRouteConfig();
224
    }
92129
    if (snapped_route_config_ != nullptr) {
92124
      route_result = snapped_route_config_->route(cb, *request_headers_,
92124
                                                  filter_manager_.streamInfo(), stream_id_);
92124
    }
92129
  }
93478
  setVirtualHostRoute(std::move(route_result));
93478
}
93494
void ConnectionManagerImpl::ActiveStream::refreshTracing() {
93494
  if (!trace_refresh_after_route_refresh_) {
2
    return;
2
  }
93492
  if (!connection_manager_tracing_config_.has_value() || active_span_ == nullptr ||
93492
      request_headers_ == nullptr) {
93490
    return;
93490
  }
2
  ASSERT(cached_route_.has_value());
  // NOTE: if the trace reason have been encoded into the request id then the trace reason may
  // not be updated. That means we may cannot to force a traced request to be untraced by the
  // refreshing.
2
  const auto trace_reason = ConnectionManagerUtility::mutateTracingRequestHeader(
2
      *request_headers_, connection_manager_.runtime_, *connection_manager_.config_,
2
      cached_route_.value().get());
2
  filter_manager_.streamInfo().setTraceReason(trace_reason);
2
  const Tracing::Decision tracing_decision =
2
      Tracing::TracerUtility::shouldTraceRequest(filter_manager_.streamInfo());
2
  if (active_span_->useLocalDecision()) {
1
    active_span_->setSampled(tracing_decision.traced);
1
  }
2
  if (hasCachedRoute()) {
2
    route_decorator_ = cached_route_.value()->decorator();
2
    route_tracing_ = cached_route_.value()->tracingConfig();
2
  }
2
  if (!operationNameFormatter(*connection_manager_tracing_config_, route_tracing_)) {
    // Only set decorator when there is no operation name formatter configured at either
    // the HCM level or the route level.
2
    setRequestDecorator(*request_headers_);
2
  }
2
}
// TODO(chaoqin-li1123): Make on demand vhds and on demand srds works at the same time.
void ConnectionManagerImpl::ActiveStream::requestRouteConfigUpdate(
100
    Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) {
100
  ENVOY_BUG(route_config_update_requester_.has_value(),
100
            "RouteConfigUpdate requested but RDS not compiled into the binary. Try linking "
100
            "//source/common/http:rds_lib");
100
  if (route_config_update_requester_.has_value()) {
100
    (*route_config_update_requester_)
100
        ->requestRouteConfigUpdate(*this, route_config_updated_cb, routeConfig(),
100
                                   *connection_manager_.dispatcher_, *request_headers_);
100
  }
100
}
100
absl::optional<Router::ConfigConstSharedPtr> ConnectionManagerImpl::ActiveStream::routeConfig() {
100
  if (connection_manager_.config_->routeConfigProvider() != nullptr) {
48
    return {connection_manager_.config_->routeConfigProvider()->configCast()};
48
  }
52
  return {};
100
}
10015
void ConnectionManagerImpl::ActiveStream::onLocalReply(Code code) {
  // The BadRequest error code indicates there has been a messaging error.
10015
  if (code == Http::Code::BadRequest && connection_manager_.codec_->protocol() < Protocol::Http2 &&
10015
      !response_encoder_->streamErrorOnInvalidHttpMessage()) {
1374
    filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
1374
  }
10015
}
129
void ConnectionManagerImpl::ActiveStream::encode1xxHeaders(ResponseHeaderMap& response_headers) {
129
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
  // Strip the T-E headers etc. Defer other header additions as well as drain-close logic to the
  // continuation headers.
129
  ConnectionManagerUtility::mutateResponseHeaders(
129
      response_headers, request_headers_.get(), *connection_manager_.config_, EMPTY_STRING,
129
      filter_manager_.streamInfo(), connection_manager_.proxy_name_,
129
      connection_manager_.clear_hop_by_hop_response_headers_);
  // Count both the 1xx and follow-up response code in stats.
129
  chargeStats(response_headers);
129
  ENVOY_STREAM_LOG(debug, "encoding 1xx continue headers via codec:\n{}", *this, response_headers);
  // Now actually encode via the codec.
129
  response_encoder_->encode1xxHeaders(response_headers);
129
}
void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& headers,
47278
                                                        bool end_stream) {
47278
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
  // Base headers.
  // We want to preserve the original date header, but we add a date header if it is absent
47278
  if (!headers.Date()) {
46921
    connection_manager_.config_->dateProvider().setDateHeader(headers);
46921
  }
  // Following setReference() is safe because serverName() is constant for the life of the
  // listener.
47278
  const auto transformation = connection_manager_.config_->serverHeaderTransformation();
47278
  if (transformation == ConnectionManagerConfig::HttpConnectionManagerProto::OVERWRITE ||
47278
      (transformation == ConnectionManagerConfig::HttpConnectionManagerProto::APPEND_IF_ABSENT &&
47275
       headers.Server() == nullptr)) {
47275
    headers.setReferenceServer(connection_manager_.config_->serverName());
47275
  }
47278
  ConnectionManagerUtility::mutateResponseHeaders(
47278
      headers, request_headers_.get(), *connection_manager_.config_,
47278
      connection_manager_.config_->via(), filter_manager_.streamInfo(),
47278
      connection_manager_.proxy_name_, connection_manager_.clear_hop_by_hop_response_headers_);
47278
  bool drain_connection_due_to_overload = false;
47278
  if (connection_manager_.drain_state_ == DrainState::NotDraining &&
47278
      connection_manager_.random_generator_.bernoulli(
47212
          connection_manager_.overload_disable_keepalive_ref_.value())) {
6
    ENVOY_STREAM_LOG(debug, "disabling keepalive due to envoy overload", *this);
6
    drain_connection_due_to_overload = true;
6
    connection_manager_.stats_.named_.downstream_cx_overload_disable_keepalive_.inc();
6
  }
  // If we're an inbound listener, then we should drain even if the drain direction is inbound only.
  // If not, then we only drain if the drain direction is all.
47278
  Network::DrainDirection drain_scope =
47278
      connection_manager_.direction_ == envoy::config::core::v3::TrafficDirection::INBOUND
47278
          ? Network::DrainDirection::InboundOnly
47278
          : Network::DrainDirection::All;
  // See if we want to drain/close the connection. Send the go away frame prior to encoding the
  // header block. Only drain if the drain direction is not inbound only or the connection is
  // inbound.
47278
  if (connection_manager_.drain_state_ == DrainState::NotDraining &&
47278
      (connection_manager_.drain_close_.drainClose(drain_scope) ||
47212
       drain_connection_due_to_overload)) {
    // This doesn't really do anything for HTTP/1.1 other then give the connection another boost
    // of time to race with incoming requests. For HTTP/2 connections, send a GOAWAY frame to
    // prevent any new streams.
69
    connection_manager_.startDrainSequence();
69
    connection_manager_.stats_.named_.downstream_cx_drain_close_.inc();
69
    ENVOY_STREAM_LOG(debug, "drain closing connection", *this);
69
  }
47278
  if (connection_manager_.codec_->protocol() == Protocol::Http10) {
    // As HTTP/1.0 and below can not do chunked encoding, if there is no content
    // length the response will be framed by connection close.
18
    if (!headers.ContentLength()) {
10
      filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
10
    }
    // If the request came with a keep-alive and no other factor resulted in a
    // connection close header, send an explicit keep-alive header.
18
    if (!filter_manager_.streamInfo().shouldDrainConnectionUponCompletion()) {
8
      headers.setConnection(Headers::get().ConnectionValues.KeepAlive);
8
    }
18
  }
47278
  if (connection_manager_.drain_state_ == DrainState::NotDraining &&
47278
      filter_manager_.streamInfo().shouldDrainConnectionUponCompletion()) {
1414
    ENVOY_STREAM_LOG(debug, "closing connection due to connection close header", *this);
    // For HTTP/2 and HTTP/3, send GOAWAY and allow current stream to complete.
    // For HTTP/1.1, go directly to Closing (Connection: close header will be added below).
1414
    if (connection_manager_.codec_->protocol() >= Protocol::Http2) {
8
      connection_manager_.startDrainSequence();
1406
    } else {
1406
      connection_manager_.drain_state_ = DrainState::Closing;
1406
    }
1414
  }
  // If we are destroying a stream before remote is complete and the connection does not support
  // multiplexing, we should disconnect since we don't want to wait around for the request to
  // finish.
47278
  if (!filter_manager_.hasLastDownstreamByteReceived()) {
2930
    if (connection_manager_.codec_->protocol() < Protocol::Http2) {
1899
      connection_manager_.drain_state_ = DrainState::Closing;
1899
    }
2930
    connection_manager_.stats_.named_.downstream_rq_response_before_rq_complete_.inc();
2930
  }
47278
  if (Utility::isUpgrade(headers) ||
47278
      HeaderUtility::isConnectResponse(request_headers_.get(), *responseHeaders())) {
341
    state_.is_tunneling_ = true;
341
  }
  // Block route cache if the response headers is received and processed. Because after this
  // point, the cached route should never be updated or refreshed.
47278
  blockRouteCache();
47278
  if (connection_manager_.drain_state_ != DrainState::NotDraining &&
47278
      connection_manager_.codec_->protocol() < Protocol::Http2) {
    // If the connection manager is draining send "Connection: Close" on HTTP/1.1 connections.
    // Do not do this for H2 (which drains via GOAWAY) or Upgrade or CONNECT (as the
    // payload is no longer HTTP/1.1)
2029
    if (!state_.is_tunneling_) {
1960
      headers.setReferenceConnection(Headers::get().ConnectionValues.Close);
1960
    }
2029
  }
47278
  if (active_span_ != nullptr) {
52
    ASSERT(connection_manager_tracing_config_.has_value());
52
    if (!operationNameFormatter(*connection_manager_tracing_config_, route_tracing_)) {
      // Only apply decorator if there is no operation name formatter configured at either
      // the HCM level or the route level.
49
      setResponseDecorator(headers);
49
    }
52
  }
47278
  chargeStats(headers);
47278
  if (state_.is_tunneling_ &&
47278
      connection_manager_.config_->flushAccessLogOnTunnelSuccessfullyEstablished()) {
5
    log(AccessLog::AccessLogType::DownstreamTunnelSuccessfullyEstablished);
5
  }
47278
  ENVOY_STREAM_LOG(debug, "encoding headers via codec (end_stream={}):\n{}", *this, end_stream,
47278
                   headers);
47278
  filter_manager_.streamInfo().downstreamTiming().onFirstDownstreamTxByteSent(
47278
      connection_manager_.time_source_);
47278
  if (header_validator_) {
9
    auto result = header_validator_->transformResponseHeaders(headers);
9
    if (!result.status.ok()) {
      // It is possible that the header map is invalid if an encoder filter makes invalid
      // modifications
      // TODO(yanavlasov): add handling for this case.
9
    } else if (result.new_headers) {
      response_encoder_->encodeHeaders(*result.new_headers, end_stream);
      return;
    }
9
  }
  // Now actually encode via the codec.
47278
  response_encoder_->encodeHeaders(headers, end_stream);
47278
}
71095
void ConnectionManagerImpl::ActiveStream::encodeData(Buffer::Instance& data, bool end_stream) {
71095
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
71095
  ENVOY_STREAM_LOG(trace, "encoding data via codec (size={} end_stream={})", *this, data.length(),
71095
                   end_stream);
71095
  filter_manager_.streamInfo().addBytesSent(data.length());
71095
  response_encoder_->encodeData(data, end_stream);
71095
}
448
void ConnectionManagerImpl::ActiveStream::encodeTrailers(ResponseTrailerMap& trailers) {
448
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
448
  ENVOY_STREAM_LOG(debug, "encoding trailers via codec:\n{}", *this, trailers);
448
  response_encoder_->encodeTrailers(trailers);
448
}
1589
void ConnectionManagerImpl::ActiveStream::encodeMetadata(MetadataMapPtr&& metadata) {
1589
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
1589
  MetadataMapVector metadata_map_vector;
1589
  metadata_map_vector.emplace_back(std::move(metadata));
1589
  ENVOY_STREAM_LOG(debug, "encoding metadata via codec:\n{}", *this, metadata_map_vector);
1589
  response_encoder_->encodeMetadata(metadata_map_vector);
1589
}
218785
void ConnectionManagerImpl::ActiveStream::onDecoderFilterBelowWriteBufferLowWatermark() {
218785
  ENVOY_STREAM_LOG(debug, "Read-enabling downstream stream due to filter callbacks.", *this);
  // If the state is destroyed, the codec's stream is already torn down. On
  // teardown the codec will unwind any remaining read disable calls.
218785
  if (!filter_manager_.destroyed()) {
218783
    response_encoder_->getStream().readDisable(false);
218783
  }
218785
  connection_manager_.stats_.named_.downstream_flow_control_resumed_reading_total_.inc();
218785
}
218787
void ConnectionManagerImpl::ActiveStream::onDecoderFilterAboveWriteBufferHighWatermark() {
218787
  ENVOY_STREAM_LOG(debug, "Read-disabling downstream stream due to filter callbacks.", *this);
218787
  response_encoder_->getStream().readDisable(true);
218787
  connection_manager_.stats_.named_.downstream_flow_control_paused_reading_total_.inc();
218787
}
void ConnectionManagerImpl::ActiveStream::onResetStream(StreamResetReason reset_reason,
49612
                                                        absl::string_view) {
  // NOTE: This function gets called in all of the following cases:
  //       1) We TX an app level reset
  //       2) The codec TX a codec level reset
  //       3) The codec RX a reset
  //       4) The overload manager reset the stream
  //       If we need to differentiate we need to do it inside the codec. Can start with this.
49612
  const absl::string_view encoder_details = response_encoder_->getStream().responseDetails();
49612
  ENVOY_STREAM_LOG(debug, "stream reset: reset reason: {}, response details: {}", *this,
49612
                   Http::Utility::resetReasonToString(reset_reason),
49612
                   encoder_details.empty() ? absl::string_view{"-"} : encoder_details);
49612
  connection_manager_.stats_.named_.downstream_rq_rx_reset_.inc();
49612
  state_.on_reset_stream_called_ = true;
  // If the codec sets its responseDetails() for a reason other than peer reset, set a
  // DownstreamProtocolError. Either way, propagate details.
49612
  if (reset_reason == StreamResetReason::ProtocolError ||
49612
      (!encoder_details.empty() && reset_reason == StreamResetReason::LocalReset)) {
2283
    filter_manager_.streamInfo().setResponseFlag(
2283
        StreamInfo::CoreResponseFlag::DownstreamProtocolError);
2283
  }
49612
  if (!encoder_details.empty()) {
3632
    if (reset_reason == StreamResetReason::ConnectError ||
3632
        reset_reason == StreamResetReason::RemoteReset ||
3632
        reset_reason == StreamResetReason::RemoteRefusedStreamReset) {
1058
      filter_manager_.streamInfo().setResponseFlag(
1058
          StreamInfo::CoreResponseFlag::DownstreamRemoteReset);
1058
    }
3632
    filter_manager_.streamInfo().setResponseCodeDetails(encoder_details);
3632
  }
  // Check if we're in the overload manager reset case.
  // encoder_details should be empty in this case as we don't have a codec error.
49612
  if (encoder_details.empty() && reset_reason == StreamResetReason::OverloadManager) {
12
    filter_manager_.streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::OverloadManager);
12
    filter_manager_.streamInfo().setResponseCodeDetails(
12
        StreamInfo::ResponseCodeDetails::get().Overload);
12
  }
49612
  filter_manager_.onDownstreamReset();
49612
  connection_manager_.doDeferredStreamDestroy(*this);
49612
}
16994
void ConnectionManagerImpl::ActiveStream::onAboveWriteBufferHighWatermark() {
16994
  ENVOY_STREAM_LOG(debug, "Disabling upstream stream due to downstream stream watermark.", *this);
16994
  filter_manager_.callHighWatermarkCallbacks();
16994
}
16852
void ConnectionManagerImpl::ActiveStream::onBelowWriteBufferLowWatermark() {
16852
  ENVOY_STREAM_LOG(debug, "Enabling upstream stream due to downstream stream watermark.", *this);
16852
  filter_manager_.callLowWatermarkCallbacks();
16852
}
49924
bool ConnectionManagerImpl::ActiveStream::shouldSkipDeferredCloseDelay() const {
  // If HTTP/1.0 has no content length, it is framed by close and won't consider
  // the request complete until the FIN is read. Don't delay close in this case.
49924
  const bool http_10_sans_cl = (connection_manager_.codec_->protocol() == Protocol::Http10) &&
49924
                               (!response_headers_ || !response_headers_->ContentLength());
  // We also don't delay-close in the case of HTTP/1.1 where the request is
  // fully read, as there's no race condition to avoid.
49924
  const bool connection_close = filter_manager_.streamInfo().shouldDrainConnectionUponCompletion();
49924
  const bool request_complete = filter_manager_.hasLastDownstreamByteReceived();
  // Don't do delay close for HTTP/1.0 or if the request is complete.
49924
  return connection_close && (request_complete || http_10_sans_cl);
49924
}
46147
void ConnectionManagerImpl::ActiveStream::onCodecEncodeComplete() {
46147
  ASSERT(!state_.codec_encode_complete_);
46147
  ENVOY_STREAM_LOG(debug, "Codec completed encoding stream.", *this);
46147
  state_.codec_encode_complete_ = true;
  // Update timing
46147
  filter_manager_.streamInfo().downstreamTiming().onLastDownstreamTxByteSent(
46147
      connection_manager_.time_source_);
46147
  request_response_timespan_->complete();
  // Only reap stream once.
46147
  if (state_.is_zombie_stream_) {
2776
    const bool skip_delay = shouldSkipDeferredCloseDelay();
2776
    connection_manager_.doDeferredStreamDestroy(*this);
    // After destroying a zombie stream, check if the connection should be
    // closed. doEndStream() call that created the zombie may have set
    // drain_state_ to Closing, but checkForDeferredClose() couldn't close the
    // connection at that time because streams_ wasn't empty yet.
2776
    if (connection_manager_.close_connection_on_zombie_stream_complete_) {
2775
      connection_manager_.checkForDeferredClose(skip_delay);
2775
    }
2776
  }
46147
}
446
void ConnectionManagerImpl::ActiveStream::onCodecLowLevelReset() {
446
  ASSERT(!state_.codec_encode_complete_);
446
  state_.on_reset_stream_called_ = true;
446
  ENVOY_STREAM_LOG(debug, "Codec low level reset", *this);
  // TODO(kbaichoo): Update streamInfo to account for the reset.
  // Only reap stream once.
446
  if (state_.is_zombie_stream_) {
180
    const bool skip_delay = shouldSkipDeferredCloseDelay();
180
    connection_manager_.doDeferredStreamDestroy(*this);
    // After destroying a zombie stream, check if the connection should be
    // closed. doEndStream() call that created the zombie may have set
    // drain_state_ to Closing, but checkForDeferredClose() couldn't close the
    // connection at that time because streams_ wasn't empty yet.
180
    if (connection_manager_.close_connection_on_zombie_stream_complete_) {
180
      connection_manager_.checkForDeferredClose(skip_delay);
180
    }
180
  }
446
}
107
Tracing::OperationName ConnectionManagerImpl::ActiveStream::operationName() const {
107
  ASSERT(connection_manager_tracing_config_.has_value());
107
  return connection_manager_tracing_config_->operation_name_;
107
}
void ConnectionManagerImpl::ActiveStream::modifySpan(Tracing::Span& span,
69
                                                     bool upstream_span) const {
69
  ASSERT(connection_manager_tracing_config_.has_value());
69
  const Tracing::HttpTraceContext trace_context(*request_headers_);
69
  const Formatter::Context formatter_context{
69
      request_headers_.get(), response_headers_.get(), response_trailers_.get(), {}, {},
69
      active_span_.get()};
69
  const Tracing::CustomTagContext ctx{trace_context, filter_manager_.streamInfo(),
69
                                      formatter_context};
  // Cache the optional custom tags from the route first.
69
  OptRef<const Tracing::CustomTagMap> route_custom_tags;
69
  if (route_tracing_ != nullptr) {
7
    route_custom_tags.emplace(route_tracing_->getCustomTags());
12
    for (const auto& tag : *route_custom_tags) {
8
      tag.second->applySpan(span, ctx);
8
    }
7
  }
77
  for (const auto& tag : connection_manager_tracing_config_->custom_tags_) {
24
    if (!route_custom_tags.has_value() || !route_custom_tags->contains(tag.first)) {
      // If the tag is defined in both the connection manager and the route,
      // use the route's tag.
20
      tag.second->applySpan(span, ctx);
20
    }
24
  }
  // For same stream, there is only one downstream span. It's the active span.
  // So we can determine whether the span is downstream span by comparing the
  // span pointer.
69
  const Formatter::Formatter* operation =
69
      upstream_span
69
          ? upstreamOperationNameFormatter(*connection_manager_tracing_config_, route_tracing_)
69
          : operationNameFormatter(*connection_manager_tracing_config_, route_tracing_);
69
  if (operation != nullptr) {
6
    const auto op = operation->format(formatter_context, filter_manager_.streamInfo());
6
    if (!op.empty()) {
6
      span.setOperation(op);
6
    }
6
  }
69
}
66
bool ConnectionManagerImpl::ActiveStream::verbose() const {
66
  ASSERT(connection_manager_tracing_config_.has_value());
66
  return connection_manager_tracing_config_->verbose_;
66
}
52
uint32_t ConnectionManagerImpl::ActiveStream::maxPathTagLength() const {
52
  ASSERT(connection_manager_tracing_config_.has_value());
52
  return connection_manager_tracing_config_->max_path_tag_length_;
52
}
68
bool ConnectionManagerImpl::ActiveStream::spawnUpstreamSpan() const {
68
  ASSERT(connection_manager_tracing_config_.has_value());
68
  return connection_manager_tracing_config_->spawn_upstream_span_;
68
}
35
bool ConnectionManagerImpl::ActiveStream::noContextPropagation() const {
35
  ASSERT(connection_manager_tracing_config_.has_value());
35
  return connection_manager_tracing_config_->no_context_propagation_;
35
}
652
const Router::RouteEntry::UpgradeMap* ConnectionManagerImpl::ActiveStream::upgradeMap() {
  // We must check if the 'cached_route_' optional is populated since this function can be called
  // early via sendLocalReply(), before the cached route is populated.
652
  if (hasCachedRoute() && cached_route_.value()->routeEntry()) {
597
    return &cached_route_.value()->routeEntry()->upgradeMap();
597
  }
55
  return nullptr;
652
}
131384
Tracing::Span& ConnectionManagerImpl::ActiveStream::activeSpan() {
131384
  if (active_span_) {
128
    return *active_span_;
131355
  } else {
131256
    return Tracing::NullSpan::instance();
131256
  }
131384
}
43805
OptRef<const Tracing::Config> ConnectionManagerImpl::ActiveStream::tracingConfig() const {
43805
  if (connection_manager_tracing_config_.has_value()) {
52
    return makeOptRef<const Tracing::Config>(*this);
52
  }
43753
  return {};
43805
}
165394
const ScopeTrackedObject& ConnectionManagerImpl::ActiveStream::scope() { return *this; }
671
Upstream::ClusterInfoConstSharedPtr ConnectionManagerImpl::ActiveStream::clusterInfo() {
  // NOTE: Refreshing route caches clusterInfo as well.
671
  if (!cached_route_.has_value()) {
16
    refreshCachedRoute();
16
  }
671
  return cached_cluster_info_.value();
671
}
Router::RouteConstSharedPtr
151579
ConnectionManagerImpl::ActiveStream::route(const Router::RouteCallback& cb) {
151579
  if (cached_route_.has_value()) {
148521
    return cached_route_.value();
148521
  }
3058
  refreshCachedRoute(cb);
3058
  return cached_route_.value();
151579
}
17
void ConnectionManagerImpl::ActiveStream::setRoute(Router::RouteConstSharedPtr route) {
17
  Router::VirtualHostRoute vhost_route;
17
  if (route != nullptr) {
15
    vhost_route.vhost = route->virtualHost();
15
    vhost_route.route = std::move(route);
15
  }
17
  setVirtualHostRoute(std::move(vhost_route));
17
}
/**
 * Sets the cached route to the RouteConstSharedPtr argument passed in. Handles setting the
 * cached_route_/cached_cluster_info_ ActiveStream attributes, the FilterManager streamInfo, tracing
 * tags, and timeouts.
 *
 * Declared as a StreamFilterCallbacks member function for filters to call directly, but also
 * functions as a helper to refreshCachedRoute(const Router::RouteCallback& cb).
 */
void ConnectionManagerImpl::ActiveStream::setVirtualHostRoute(
93495
    Router::VirtualHostRoute vhost_route) {
  // If the cached route is blocked then any attempt to clear it or refresh it
  // will be ignored.
93495
  if (routeCacheBlocked()) {
1
    return;
1
  }
93494
  Router::RouteConstSharedPtr route = std::move(vhost_route.route);
  // Update the cached route.
93494
  setCachedRoute({route});
  // Update the cached cluster info based on the new route.
93494
  if (nullptr == route || nullptr == route->routeEntry()) {
5505
    cached_cluster_info_ = nullptr;
91121
  } else {
87989
    auto* cluster = connection_manager_.cluster_manager_.getThreadLocalCluster(
87989
        route->routeEntry()->clusterName());
87989
    cached_cluster_info_ = (nullptr == cluster) ? nullptr : cluster->info();
87989
  }
  // Update route, vhost and cluster info in the filter manager's stream info.
  // Now can move route here safely.
93494
  filter_manager_.streamInfo().route_ = std::move(route);
93494
  filter_manager_.streamInfo().vhost_ = std::move(vhost_route.vhost);
93494
  filter_manager_.streamInfo().setUpstreamClusterInfo(cached_cluster_info_.value());
93494
  refreshTracing();
93494
  refreshDurationTimeout();
93494
  refreshIdleAndFlushTimeouts();
93494
  refreshBufferLimit();
93494
}
93494
void ConnectionManagerImpl::ActiveStream::refreshIdleAndFlushTimeouts() {
93494
  if (!hasCachedRoute()) {
5229
    return;
5229
  }
88265
  const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry();
88265
  if (route_entry == nullptr) {
276
    return;
276
  }
87989
  if (route_entry->idleTimeout().has_value()) {
16193
    idle_timeout_ms_ = route_entry->idleTimeout().value();
16193
    if (idle_timeout_ms_.count()) {
      // If we have a route-level idle timeout but no global stream idle timeout, create a timer.
16192
      if (stream_idle_timer_ == nullptr) {
5
        stream_idle_timer_ = connection_manager_.dispatcher_->createScaledTimer(
5
            Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout,
5
            [this]() -> void { onIdleTimeout(); });
5
      }
16192
    } else if (stream_idle_timer_ != nullptr) {
      // If we had a global stream idle timeout but the route-level idle timeout is set to zero
      // (to override), we disable the idle timer.
1
      stream_idle_timer_->disableTimer();
1
      stream_idle_timer_ = nullptr;
1
    }
16193
  }
87989
  if (route_entry->flushTimeout().has_value()) {
4
    response_encoder_->getStream().setFlushTimeout(route_entry->flushTimeout().value());
87985
  } else if (!has_explicit_global_flush_timeout_ && route_entry->idleTimeout().has_value()) {
    // If there is no route-level flush timeout, and the global flush timeout was also inherited
    // from the idle timeout, also inherit the route-level idle timeout. This is for backwards
    // compatibility.
8
    response_encoder_->getStream().setFlushTimeout(idle_timeout_ms_);
8
  }
87989
}
21
void ConnectionManagerImpl::ActiveStream::refreshAccessLogFlushTimer() {
21
  if (connection_manager_.config_->accessLogFlushInterval().has_value()) {
21
    access_log_flush_timer_->enableTimer(
21
        connection_manager_.config_->accessLogFlushInterval().value(), this);
21
  }
21
}
93494
void ConnectionManagerImpl::ActiveStream::refreshBufferLimit() {
93494
  if (!hasCachedRoute()) {
5229
    return;
5229
  }
88265
  const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry();
88265
  if (route_entry == nullptr) {
276
    return;
276
  }
87989
  const uint64_t buffer_limit = route_entry->requestBodyBufferLimit();
87989
  if (buffer_limit == std::numeric_limits<uint64_t>::max()) {
    // Max uint64_t means no valid limit configured.
87938
    return;
87938
  }
  // Only increase the buffer limit automatically. This is to ensure same
  // behavior as previous logic in router filter.
51
  if (buffer_limit > filter_manager_.bufferLimit()) {
5
    ENVOY_STREAM_LOG(debug, "Setting new filter manager buffer limit: {}", *this, buffer_limit);
5
    filter_manager_.setBufferLimit(buffer_limit);
5
  }
51
}
832
void ConnectionManagerImpl::ActiveStream::clearRouteCache() {
  // If the cached route is blocked then any attempt to clear it or refresh it
  // will be ignored.
832
  if (routeCacheBlocked()) {
1
    return;
1
  }
831
  setCachedRoute({});
831
  cached_cluster_info_ = absl::optional<Upstream::ClusterInfoConstSharedPtr>();
831
}
4
void ConnectionManagerImpl::ActiveStream::refreshRouteCluster() {
  // If there is no cached route, or route cache is frozen, or the request headers are not
  // available, then do not refresh the route cluster.
4
  if (!hasCachedRoute() || routeCacheBlocked() || request_headers_ == nullptr) {
2
    return;
2
  }
2
  if (const auto* entry = (*cached_route_)->routeEntry(); entry != nullptr) {
    // Refresh the cluster if possible.
2
    entry->refreshRouteCluster(*request_headers_, filter_manager_.streamInfo());
    // Refresh the cached cluster info is necessary.
2
    if (!cached_cluster_info_.has_value() || cached_cluster_info_.value() == nullptr ||
2
        (*cached_cluster_info_)->name() != entry->clusterName()) {
2
      auto* cluster =
2
          connection_manager_.cluster_manager_.getThreadLocalCluster(entry->clusterName());
2
      cached_cluster_info_ = (nullptr == cluster) ? nullptr : cluster->info();
2
      filter_manager_.streamInfo().setUpstreamClusterInfo(cached_cluster_info_.value());
2
    }
2
  }
2
}
void ConnectionManagerImpl::ActiveStream::setCachedRoute(
94325
    absl::optional<Router::RouteConstSharedPtr>&& route) {
94325
  if (hasCachedRoute()) {
    // The configuration of the route may be referenced by some filters.
    // Cache the route to avoid it being destroyed before the stream is destroyed.
587
    cleared_cached_routes_.emplace_back(std::move(cached_route_.value()));
587
  }
94325
  cached_route_ = std::move(route);
94325
}
47278
void ConnectionManagerImpl::ActiveStream::blockRouteCache() {
47278
  route_cache_blocked_ = true;
  // Clear the snapped route configuration because it is unnecessary to keep it.
47278
  snapped_route_config_.reset();
47278
  snapped_scoped_routes_config_.reset();
47278
}
85
void ConnectionManagerImpl::ActiveStream::onRequestDataTooLarge() {
85
  connection_manager_.stats_.named_.downstream_rq_too_large_.inc();
85
}
void ConnectionManagerImpl::ActiveStream::recreateStream(
314
    StreamInfo::FilterStateSharedPtr filter_state) {
314
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
314
  ResponseEncoder* response_encoder = response_encoder_;
314
  response_encoder_ = nullptr;
314
  Buffer::InstancePtr request_data = std::make_unique<Buffer::OwnedImpl>();
314
  const auto& buffered_request_data = filter_manager_.bufferedRequestData();
314
  const bool proxy_body = buffered_request_data != nullptr && buffered_request_data->length() > 0;
314
  if (proxy_body) {
36
    request_data->move(*buffered_request_data);
36
  }
314
  response_encoder->getStream().removeCallbacks(*this);
  // This functionally deletes the stream (via deferred delete) so do not
  // reference anything beyond this point.
  // Make sure to not check for deferred close as we'll be immediately creating a new stream.
314
  state_.is_internally_destroyed_ = true;
314
  connection_manager_.doEndStream(*this, /*check_for_deferred_close*/ false);
314
  RequestDecoder& new_stream = connection_manager_.newStream(*response_encoder, true);
  // Set the new RequestDecoder on the ResponseEncoder. Even though all of the decoder callbacks
  // have already been called at this point, the encoder still needs the new decoder for deferred
  // logging in some cases.
  // This doesn't currently work for HTTP/1 as the H/1 ResponseEncoder doesn't hold the active
  // stream's pointer to the RequestDecoder.
314
  response_encoder->setRequestDecoder(new_stream);
  // We don't need to copy over the old parent FilterState from the old StreamInfo if it did not
  // store any objects with a LifeSpan at or above DownstreamRequest. This is to avoid unnecessary
  // heap allocation.
  // TODO(snowp): In the case where connection level filter state has been set on the connection
  // FilterState that we inherit, we'll end up copying this every time even though we could get
  // away with just resetting it to the HCM filter_state_.
314
  if (filter_state->hasDataAtOrAboveLifeSpan(StreamInfo::FilterState::LifeSpan::Request)) {
314
    (*connection_manager_.streams_.begin())->filter_manager_.streamInfo().filter_state_ =
314
        std::make_shared<StreamInfo::FilterStateImpl>(
314
            filter_state->parent(), StreamInfo::FilterState::LifeSpan::FilterChain);
314
  }
  // Make sure that relevant information makes it from the original stream info
  // to the new one. Generally this should consist of all downstream related
  // data, and not include upstream related data.
314
  (*connection_manager_.streams_.begin())
314
      ->filter_manager_.streamInfo()
314
      .setFromForRecreateStream(filter_manager_.streamInfo());
314
  new_stream.decodeHeaders(std::move(request_headers_), !proxy_body);
314
  if (proxy_body) {
    // This functionality is currently only used for internal redirects, which the router only
    // allows if the full request has been read (end_stream = true) so we don't need to handle the
    // case of upstream sending an early response mid-request.
36
    new_stream.decodeData(*request_data, true);
36
  }
314
}
1
Http1StreamEncoderOptionsOptRef ConnectionManagerImpl::ActiveStream::http1StreamEncoderOptions() {
1
  return response_encoder_->http1StreamEncoderOptions();
1
}
76
void ConnectionManagerImpl::ActiveStream::onResponseDataTooLarge() {
76
  connection_manager_.stats_.named_.rs_too_large_.inc();
76
}
313
void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, absl::string_view) {
313
  connection_manager_.stats_.named_.downstream_rq_tx_reset_.inc();
313
  connection_manager_.doEndStream(*this);
313
}
5735
bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
  // TODO(yanavlasov): Merge this with the filter manager continueIteration() method
5735
  if (!state_.deferred_to_next_io_iteration_) {
5381
    return false;
5381
  }
354
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
354
  state_.deferred_to_next_io_iteration_ = false;
354
  bool end_stream = state_.deferred_end_stream_ && deferred_data_ == nullptr &&
354
                    deferred_request_trailers_ == nullptr && deferred_metadata_.empty();
354
  filter_manager_.decodeHeaders(*request_headers_, end_stream);
354
  if (end_stream) {
60
    return true;
60
  }
  // Send metadata before data, as data may have an associated end_stream.
352
  while (!deferred_metadata_.empty()) {
58
    MetadataMapPtr& metadata = deferred_metadata_.front();
58
    filter_manager_.decodeMetadata(*metadata);
58
    deferred_metadata_.pop();
58
  }
  // Filter manager will return early from decodeData and decodeTrailers if
  // request has completed.
294
  if (deferred_data_ != nullptr) {
274
    end_stream = state_.deferred_end_stream_ && deferred_request_trailers_ == nullptr;
274
    filter_manager_.decodeData(*deferred_data_, end_stream);
274
  }
294
  if (deferred_request_trailers_ != nullptr) {
144
    request_trailers_ = std::move(deferred_request_trailers_);
144
    filter_manager_.decodeTrailers(*request_trailers_);
144
  }
294
  return true;
354
}
90716
bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() {
  // Do not defer this stream if stream deferral is disabled
90716
  if (deferred_request_processing_callback_ == nullptr) {
49898
    return false;
49898
  }
  // Defer this stream if there are already deferred streams, so they are not
  // processed out of order
40818
  if (deferred_request_processing_callback_->enabled()) {
40406
    return true;
40406
  }
412
  ++requests_during_dispatch_count_;
412
  bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_;
412
  if (defer) {
328
    deferred_request_processing_callback_->scheduleCallbackNextIteration();
328
  }
412
  return defer;
40818
}
324
void ConnectionManagerImpl::onDeferredRequestProcessing() {
324
  if (streams_.empty()) {
2
    return;
2
  }
322
  requests_during_dispatch_count_ = 1; // 1 stream is always let through
  // Streams are inserted at the head of the list. As such process deferred
  // streams in the reverse order.
322
  auto reverse_iter = std::prev(streams_.end());
322
  bool at_first_element = false;
5735
  do {
5735
    at_first_element = reverse_iter == streams_.begin();
    // Move the iterator to the previous item in case the `onDeferredRequestProcessing` call removes
    // the stream from the list.
5735
    auto previous_element = std::prev(reverse_iter);
5735
    bool was_deferred = (*reverse_iter)->onDeferredRequestProcessing();
5735
    if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) {
306
      break;
306
    }
5429
    reverse_iter = previous_element;
    // TODO(yanavlasov): see if `rend` can be used.
5429
  } while (!at_first_element);
322
}
} // namespace Http
} // namespace Envoy