1
#include "source/common/http/conn_manager_impl.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <iterator>
7
#include <limits>
8
#include <list>
9
#include <memory>
10
#include <string>
11
#include <vector>
12

            
13
#include "envoy/buffer/buffer.h"
14
#include "envoy/common/time.h"
15
#include "envoy/config/core/v3/base.pb.h"
16
#include "envoy/event/dispatcher.h"
17
#include "envoy/event/scaled_range_timer_manager.h"
18
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
19
#include "envoy/http/header_map.h"
20
#include "envoy/http/header_validator_errors.h"
21
#include "envoy/network/drain_decision.h"
22
#include "envoy/router/router.h"
23
#include "envoy/ssl/connection.h"
24
#include "envoy/stats/scope.h"
25
#include "envoy/stream_info/filter_state.h"
26
#include "envoy/stream_info/stream_info.h"
27
#include "envoy/tracing/tracer.h"
28
#include "envoy/type/v3/percent.pb.h"
29

            
30
#include "source/common/buffer/buffer_impl.h"
31
#include "source/common/common/assert.h"
32
#include "source/common/common/empty_string.h"
33
#include "source/common/common/enum_to_int.h"
34
#include "source/common/common/fmt.h"
35
#include "source/common/common/perf_tracing.h"
36
#include "source/common/common/scope_tracker.h"
37
#include "source/common/common/utility.h"
38
#include "source/common/http/codes.h"
39
#include "source/common/http/conn_manager_utility.h"
40
#include "source/common/http/exception.h"
41
#include "source/common/http/header_map_impl.h"
42
#include "source/common/http/header_utility.h"
43
#include "source/common/http/headers.h"
44
#include "source/common/http/http1/codec_impl.h"
45
#include "source/common/http/http2/codec_impl.h"
46
#include "source/common/http/path_utility.h"
47
#include "source/common/http/status.h"
48
#include "source/common/http/utility.h"
49
#include "source/common/network/utility.h"
50
#include "source/common/router/config_impl.h"
51
#include "source/common/runtime/runtime_features.h"
52
#include "source/common/stats/timespan_impl.h"
53
#include "source/common/stream_info/utility.h"
54

            
55
#include "absl/strings/escaping.h"
56
#include "absl/strings/match.h"
57
#include "absl/strings/str_cat.h"
58

            
59
namespace Envoy {
60
namespace Http {
61

            
62
// String keys owned by ConnectionManagerImpl.
//
// The two "overload.premature_reset_*" keys are not read in this part of the file; presumably they
// are runtime keys that tune premature stream reset detection -- confirm at their use sites.
const absl::string_view ConnectionManagerImpl::PrematureResetTotalStreamCountKey{
    "overload.premature_reset_total_stream_count"};
const absl::string_view ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey{
    "overload.premature_reset_min_stream_lifetime_seconds"};
// Runtime key for the maximum number of requests that may be processed from a single connection
// in one I/O cycle. Requests beyond this limit are deferred until the next I/O cycle.
const absl::string_view ConnectionManagerImpl::MaxRequestsPerIoCycle{
    "http.max_requests_per_io_cycle"};
// Runtime key consulted when choosing the connection close type. Don't attempt to intelligently
// delay close: https://github.com/envoyproxy/envoy/issues/30010
const absl::string_view ConnectionManagerImpl::OptionallyDelayClose{
    "http1.optionally_delay_close"};
73

            
74
2805
bool requestWasConnect(const RequestHeaderMapSharedPtr& headers, Protocol protocol) {
75
2805
  if (!headers) {
76
1349
    return false;
77
1349
  }
78
1456
  if (protocol <= Protocol::Http11) {
79
547
    return HeaderUtility::isConnect(*headers);
80
547
  }
81
  // All HTTP/2 style upgrades were originally connect requests.
82
909
  return HeaderUtility::isConnect(*headers) || Utility::isUpgrade(*headers);
83
1456
}
84

            
85
// Picks the formatter used for the operation name: the route-level formatter when `route_config`
// is non-null and provides one, otherwise the formatter held by the connection manager tracing
// config (which may itself be null).
const Formatter::Formatter*
operationNameFormatter(const Http::TracingConnectionManagerConfig& hcm_config,
                       const Router::RouteTracing* route_config) {
  if (route_config != nullptr) {
    const Formatter::Formatter* route_level = route_config->operation().ptr();
    if (route_level != nullptr) {
      return route_level;
    }
  }
  return hcm_config.operation_.get();
}
92
// Picks the formatter used for the upstream operation name: the route-level formatter when
// `route_config` is non-null and provides one, otherwise the formatter held by the connection
// manager tracing config (which may itself be null).
const Formatter::Formatter*
upstreamOperationNameFormatter(const Http::TracingConnectionManagerConfig& hcm_config,
                               const Router::RouteTracing* route_config) {
  if (route_config != nullptr) {
    const Formatter::Formatter* route_level = route_config->upstreamOperation().ptr();
    if (route_level != nullptr) {
      return route_level;
    }
  }
  return hcm_config.upstream_operation_.get();
}
99

            
100
// Builds the ConnectionManagerStats bundle. The named stats come from the
// ALL_HTTP_CONN_MAN_STATS macro, with counters, gauges and histograms each created from `scope`
// using `prefix`; `prefix` and `scope` are also passed through to the returned object.
ConnectionManagerStats ConnectionManagerImpl::generateStats(const std::string& prefix,
                                                            Stats::Scope& scope) {
  return ConnectionManagerStats(
      {ALL_HTTP_CONN_MAN_STATS(POOL_COUNTER_PREFIX(scope, prefix),
                               POOL_GAUGE_PREFIX(scope, prefix),
                               POOL_HISTOGRAM_PREFIX(scope, prefix))},
      prefix, scope);
}
107

            
108
// Builds the tracing counters from `scope`, using `prefix` with "tracing." appended.
ConnectionManagerTracingStats ConnectionManagerImpl::generateTracingStats(const std::string& prefix,
                                                                          Stats::Scope& scope) {
  const std::string tracing_prefix = prefix + "tracing.";
  return {CONN_MAN_TRACING_STATS(POOL_COUNTER_PREFIX(scope, tracing_prefix))};
}
112

            
113
// Builds the per-listener counters from `scope` using `prefix`.
ConnectionManagerListenerStats
ConnectionManagerImpl::generateListenerStats(const std::string& prefix, Stats::Scope& scope) {
  return {
      CONN_MAN_LISTENER_STATS(POOL_COUNTER_PREFIX(scope, prefix)),
  };
}
117

            
118
// Constructs a connection manager. Stores the supplied collaborators, starts the connection
// length timespan, looks up the load shed points and overload action states, and samples a few
// runtime values once (max requests per I/O cycle and two reloadable feature flags).
//
// Several initializers read members listed earlier (config_, stats_, overload_state_,
// local_info_, runtime_), so this relies on those members being declared before their dependents.
ConnectionManagerImpl::ConnectionManagerImpl(
    ConnectionManagerConfigSharedPtr config, const Network::DrainDecision& drain_close,
    Random::RandomGenerator& random_generator, Http::Context& http_context,
    Runtime::Loader& runtime, const LocalInfo::LocalInfo& local_info,
    Upstream::ClusterManager& cluster_manager, Server::OverloadManager& overload_manager,
    TimeSource& time_source, envoy::config::core::v3::TrafficDirection direction)
    : config_(std::move(config)),
      stats_(config_->stats()),
      conn_length_(new Stats::HistogramCompletableTimespanImpl(
          stats_.named_.downstream_cx_length_ms_, time_source)),
      drain_close_(drain_close),
      user_agent_(http_context.userAgentContext()),
      random_generator_(random_generator),
      runtime_(runtime),
      local_info_(local_info),
      cluster_manager_(cluster_manager),
      listener_stats_(config_->listenerStats()),
      overload_manager_(overload_manager),
      overload_state_(overload_manager.getThreadLocalOverloadState()),
      accept_new_http_stream_(
          overload_manager.getLoadShedPoint(Server::LoadShedPointName::get().HcmDecodeHeaders)),
      hcm_ondata_creating_codec_(
          overload_manager.getLoadShedPoint(Server::LoadShedPointName::get().HcmCodecCreation)),
      overload_stop_accepting_requests_ref_(
          overload_state_.getState(Server::OverloadActionNames::get().StopAcceptingRequests)),
      overload_disable_keepalive_ref_(
          overload_state_.getState(Server::OverloadActionNames::get().DisableHttpKeepAlive)),
      time_source_(time_source),
      proxy_name_(StreamInfo::ProxyStatusUtils::makeProxyName(
          /*node_id=*/local_info_.node().id(),
          /*server_name=*/config_->serverName(),
          /*proxy_status_config=*/config_->proxyStatusConfig())),
      max_requests_during_dispatch_(
          runtime_.snapshot().getInteger(ConnectionManagerImpl::MaxRequestsPerIoCycle, UINT32_MAX)),
      direction_(direction),
      allow_upstream_half_close_(Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.allow_multiplexed_upstream_half_close")),
      close_connection_on_zombie_stream_complete_(Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.http1_close_connection_on_zombie_stream_complete")) {
  // A missing load shed point is not an error; just note it once at trace level.
  ENVOY_LOG_ONCE_IF(
      trace, accept_new_http_stream_ == nullptr,
      "LoadShedPoint envoy.load_shed_points.http_connection_manager_decode_headers is not "
      "found. Is it configured?");
  ENVOY_LOG_ONCE_IF(trace, hcm_ondata_creating_codec_ == nullptr,
                    "LoadShedPoint envoy.load_shed_points.hcm_ondata_creating_codec is not found. "
                    "Is it configured?");
}
159

            
160
80
// Returns a shared, lazily created response header map whose only entry is the status header set
// to the numeric value of Code::Continue.
const ResponseHeaderMap& ConnectionManagerImpl::continueHeader() {
  static const auto continue_headers = createHeaderMap<ResponseHeaderMapImpl>({
      {Http::Headers::get().Status, std::to_string(enumToInt(Code::Continue))},
  });
  return *continue_headers;
}
165

            
166
22431
// Binds this connection manager to the network connection behind `callbacks`: remembers the
// callbacks and dispatcher, bumps connection stats, registers for connection events, optionally
// seeds proxy protocol filter state, arms the idle / max-duration timers when configured, and
// hands the connection its delayed close timeout and byte counters.
void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
  read_callbacks_ = &callbacks;
  auto& connection = callbacks.connection();
  dispatcher_ = &connection.dispatcher();

  // The deferred processing callback is only needed when a per-I/O-cycle request limit is set.
  if (max_requests_during_dispatch_ != UINT32_MAX) {
    deferred_request_processing_callback_ =
        dispatcher_->createSchedulableCallback([this] { onDeferredRequestProcessing(); });
  }

  stats_.named_.downstream_cx_total_.inc();
  stats_.named_.downstream_cx_active_.inc();
  if (connection.ssl()) {
    stats_.named_.downstream_cx_ssl_total_.inc();
    stats_.named_.downstream_cx_ssl_active_.inc();
  }

  connection.addConnectionCallbacks(*this);

  if (config_->addProxyProtocolConnectionState()) {
    const auto& filter_state = connection.streamInfo().filterState();
    // Only populate the state when nothing has stored it already.
    if (!filter_state->hasData<Network::ProxyProtocolFilterState>(
            Network::ProxyProtocolFilterState::key())) {
      filter_state->setData(
          Network::ProxyProtocolFilterState::key(),
          std::make_unique<Network::ProxyProtocolFilterState>(Network::ProxyProtocolData{
              connection.connectionInfoProvider().remoteAddress(),
              connection.connectionInfoProvider().localAddress()}),
          StreamInfo::FilterState::StateType::ReadOnly,
          StreamInfo::FilterState::LifeSpan::Connection);
    }
  }

  if (config_->idleTimeout()) {
    connection_idle_timer_ =
        dispatcher_->createScaledTimer(Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout,
                                       [this] { onIdleTimeout(); });
    connection_idle_timer_->enableTimer(config_->idleTimeout().value());
  }

  if (config_->maxConnectionDuration()) {
    connection_duration_timer_ =
        dispatcher_->createScaledTimer(Event::ScaledTimerType::HttpDownstreamMaxConnectionTimeout,
                                       [this] { onConnectionDurationTimeout(); });
    connection_duration_timer_->enableTimer(config_->maxConnectionDuration().value());
  }

  connection.setDelayedCloseTimeout(config_->delayedCloseTimeout());

  connection.setConnectionStats(
      {stats_.named_.downstream_cx_rx_bytes_total_, stats_.named_.downstream_cx_rx_bytes_buffered_,
       stats_.named_.downstream_cx_tx_bytes_total_, stats_.named_.downstream_cx_tx_bytes_buffered_,
       nullptr, &stats_.named_.downstream_cx_delayed_close_timeout_});
}
218

            
219
22431
// Tears down connection-level accounting: records the destroy, undoes the "active" gauges that
// were incremented for this connection (overall, SSL, per-protocol, HTTP/1 soft drain) and
// completes the connection length timespan.
ConnectionManagerImpl::~ConnectionManagerImpl() {
  stats_.named_.downstream_cx_destroy_.inc();

  stats_.named_.downstream_cx_active_.dec();
  if (read_callbacks_->connection().ssl()) {
    stats_.named_.downstream_cx_ssl_active_.dec();
  }

  // Without a codec no per-protocol gauge was ever incremented.
  if (codec_) {
    const Protocol protocol = codec_->protocol();
    if (protocol == Protocol::Http2) {
      stats_.named_.downstream_cx_http2_active_.dec();
    } else if (protocol == Protocol::Http3) {
      stats_.named_.downstream_cx_http3_active_.dec();
    } else {
      stats_.named_.downstream_cx_http1_active_.dec();
    }
  }

  if (soft_drain_http1_) {
    stats_.named_.downstream_cx_http1_soft_drain_.dec();
  }

  conn_length_->complete();
  user_agent_.completeConnectionLength(*conn_length_);
}
244

            
245
117866
void ConnectionManagerImpl::checkForDeferredClose(bool skip_delay_close) {
246
117866
  Network::ConnectionCloseType close = Network::ConnectionCloseType::FlushWriteAndDelay;
247
117866
  if (runtime_.snapshot().getBoolean(ConnectionManagerImpl::OptionallyDelayClose, true) &&
248
117866
      skip_delay_close) {
249
105
    close = Network::ConnectionCloseType::FlushWrite;
250
105
  }
251
117866
  if (drain_state_ == DrainState::Closing && streams_.empty() && !codec_->wantsToWrite()) {
252
    // We are closing a draining connection with no active streams and the codec has
253
    // nothing to write.
254
2457
    doConnectionClose(close, absl::nullopt,
255
2457
                      StreamInfo::LocalCloseReasons::get().DeferredCloseOnDrainedConnection);
256
2457
  }
257
117866
}
258

            
259
47242
// Ends `stream`, either by resetting it through the codec or by destroying it directly, and then
// optionally checks whether the connection itself can now be closed.
//
// Ordering matters here. If the stream has to be reset, the reset is expected to run the reset
// callbacks, which move the stream to the deferred destruction list. Otherwise the stream is
// handed to doDeferredStreamDestroy() here. Only afterwards may the connection be closed, because
// the stream refers to the connection and must be dealt with first.
void ConnectionManagerImpl::doEndStream(ActiveStream& stream, bool check_for_deferred_close) {
  // A reset is needed while the response encoder is still attached and either side of the
  // exchange is incomplete. The encoder is detached (null) when Envoy ends the stream itself via
  // recreateStream, so that the downstream is not told about the internal stream ending.
  const bool needs_reset = stream.response_encoder_ != nullptr &&
                           (!stream.filter_manager_.hasLastDownstreamByteReceived() ||
                            !stream.state_.codec_saw_local_complete_);

  if (needs_reset) {
    // Mark local as complete now so that a reset during a continuation does not raise further
    // data or trailers.
    ENVOY_STREAM_LOG(debug, "doEndStream() resetting stream", stream);
    // TODO(snowp): This call might not be necessary, try to clean up + remove setter function.
    stream.filter_manager_.setLocalComplete();
    stream.state_.codec_saw_local_complete_ = true;

    const auto& info = stream.filter_manager_.streamInfo();
    StreamResetReason reason = StreamResetReason::LocalReset;

    // Per https://tools.ietf.org/html/rfc7540#section-8.3 if there was an error
    // with the TCP connection during a CONNECT request, it should be
    // communicated via CONNECT_ERROR
    if (requestWasConnect(stream.request_headers_, codec_->protocol()) &&
        (info.hasResponseFlag(StreamInfo::CoreResponseFlag::UpstreamConnectionFailure) ||
         info.hasResponseFlag(StreamInfo::CoreResponseFlag::UpstreamConnectionTermination))) {
      reason = StreamResetReason::ConnectError;
    } else {
      const bool reset_with_error =
          Runtime::runtimeFeatureEnabled("envoy.reloadable_features.reset_with_error");
      if (info.hasResponseFlag(StreamInfo::CoreResponseFlag::UpstreamProtocolError) &&
          !Runtime::runtimeFeatureEnabled(
              "envoy.reloadable_features.reset_ignore_upstream_reason")) {
        reason = StreamResetReason::ProtocolError;
      } else if (reset_with_error &&
                 info.hasResponseFlag(StreamInfo::CoreResponseFlag::DownstreamProtocolError)) {
        reason = StreamResetReason::ProtocolError;
      }
    }
    stream.response_encoder_->getStream().resetStream(reason);

    // Protocols older than HTTP/2 have no per-stream reset, so the connection must drain.
    if (codec_->protocol() < Protocol::Http2) {
      drain_state_ = DrainState::Closing;
    }
  } else {
    doDeferredStreamDestroy(stream);
  }

  if (check_for_deferred_close) {
    checkForDeferredClose(stream.shouldSkipDeferredCloseDelay());
  }
}
320

            
321
97061
// Finishes `stream`: updates the closed / premature-reset counters, cancels its timers, and, once
// the stream reports it can be destroyed, completes the request, logs (or defers logging for
// HTTP/3), destroys the filters and schedules the stream for deferred deletion. If the stream
// cannot be destroyed yet it is only marked as a zombie.
void ConnectionManagerImpl::doDeferredStreamDestroy(ActiveStream& stream) {
  if (!stream.state_.is_internally_destroyed_) {
    ++closed_non_internally_destroyed_requests_;
    if (isPrematureRstStream(stream)) {
      ++number_premature_stream_resets_;
    }
  }

  // Disables and releases a timer if one is set.
  const auto cancel_timer = [](auto& timer) {
    if (timer != nullptr) {
      timer->disableTimer();
      timer = nullptr;
    }
  };
  cancel_timer(stream.max_stream_duration_timer_);
  cancel_timer(stream.stream_idle_timer_);
  stream.filter_manager_.disarmRequestTimeout();
  cancel_timer(stream.request_header_timer_);
  cancel_timer(stream.access_log_flush_timer_);

  // Only destroy the active stream once the underlying codec has notified us of completion or
  // the stream has been internally redirected.
  if (!stream.canDestroyStream()) {
    // Record that this stream expects no further calls apart from the codec notification.
    stream.state_.is_zombie_stream_ = true;
    return;
  }

  if (stream.response_encoder_ != nullptr) {
    stream.response_encoder_->getStream().registerCodecEventCallbacks(nullptr);
  }

  stream.completeRequest();
  stream.filter_manager_.onStreamComplete();

  // For HTTP/3, access logging is skipped here and deferred logging info is added to the stream
  // info for QuicStatsGatherer to use later. Logging still happens immediately when there was a
  // downstream reset or when the stream is being recreated (no response encoder).
  const bool defer_logging = codec_ && codec_->protocol() == Protocol::Http3 &&
                             !stream.filter_manager_.sawDownstreamReset() &&
                             stream.response_encoder_ != nullptr &&
                             Runtime::runtimeFeatureEnabled(
                                 "envoy.reloadable_features.quic_defer_logging_to_ack_listener");
  if (defer_logging) {
    stream.deferHeadersAndTrailers();
  } else {
    // For HTTP/1 and HTTP/2, log here as usual.
    stream.log(AccessLog::AccessLogType::DownstreamEnd);
  }

  stream.filter_manager_.destroyFilters();

  dispatcher_->deferredDelete(stream.removeFromList(streams_));

  // The response_encoder should never be dangling (unless we're destroying a
  // stream we are recreating) as the codec level stream will either outlive the
  // ActiveStream, or be alive in deferred deletion queue at this point.
  if (stream.response_encoder_) {
    stream.response_encoder_->getStream().removeCallbacks(stream);
  }

  // With no streams left, the connection idle timer starts counting again.
  if (connection_idle_timer_ && streams_.empty()) {
    connection_idle_timer_->enableTimer(config_->idleTimeout().value());
  }
  maybeDrainDueToPrematureResets();
}
393

            
394
// Creates a new stream through newStream() and wraps the resulting ActiveStream in an
// ActiveStreamHandle.
RequestDecoderHandlePtr ConnectionManagerImpl::newStreamHandle(ResponseEncoder& response_encoder,
                                                               bool is_internally_created) {
  auto& active_stream =
      static_cast<ActiveStream&>(newStream(response_encoder, is_internally_created));
  return std::make_unique<ActiveStreamHandle>(active_stream);
}
399

            
400
// Creates an ActiveStream for a request arriving on `response_encoder`, wires it to the codec
// stream (callbacks, memory account, flush timeout, byte meter, stream id), applies the
// max-requests-per-connection and HTTP/1 soft drain policies, and links the stream into
// `streams_`. Returns the new stream as a RequestDecoder.
RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encoder,
                                                 bool is_internally_created) {
  TRACE_EVENT("core", "ConnectionManagerImpl::newStream");
  // A stream is about to become active, so stop the connection idle timer.
  if (connection_idle_timer_) {
    connection_idle_timer_->disableTimer();
  }

  ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());

  auto& codec_stream = response_encoder.getStream();

  Buffer::BufferMemoryAccountSharedPtr account = codec_stream.account();
  if (account == nullptr) {
    // Create account, wiring the stream to use it for tracking bytes.
    // If tracking is disabled, the wiring becomes a NOP.
    account = dispatcher_->getWatermarkFactory().createAccount(codec_stream);
    codec_stream.setAccount(account);
  }

  auto active_stream =
      std::make_unique<ActiveStream>(*this, codec_stream.bufferLimit(), std::move(account));

  ++accumulated_requests_;
  const auto max_requests = config_->maxRequestsPerConnection();
  if (max_requests > 0 && accumulated_requests_ >= max_requests) {
    if (codec_->protocol() < Protocol::Http2) {
      active_stream->filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
      // Prevent erroneous debug log of closing due to incoming connection close header.
      drain_state_ = DrainState::Closing;
    } else if (drain_state_ == DrainState::NotDraining) {
      startDrainSequence();
    }
    ENVOY_CONN_LOG(debug, "max requests per connection reached", read_callbacks_->connection());
    stats_.named_.downstream_cx_max_requests_reached_.inc();
  }

  if (soft_drain_http1_) {
    active_stream->filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
    // Prevent erroneous debug log of closing due to incoming connection close header.
    drain_state_ = DrainState::Closing;
  }

  active_stream->state_.is_internally_created_ = is_internally_created;
  active_stream->response_encoder_ = &response_encoder;
  codec_stream.addCallbacks(*active_stream);
  codec_stream.registerCodecEventCallbacks(active_stream.get());

  // Use the dedicated flush timeout when configured, otherwise the stream idle timeout.
  if (config_->streamFlushTimeout().has_value()) {
    codec_stream.setFlushTimeout(config_->streamFlushTimeout().value());
  } else {
    codec_stream.setFlushTimeout(config_->streamIdleTimeout());
  }

  active_stream->streamInfo().setDownstreamBytesMeter(codec_stream.bytesMeter());
  active_stream->streamInfo().setCodecStreamId(codec_stream.codecStreamId());

  // If the network connection is backed up, the stream should be made aware of it on creation.
  // Both HTTP/1.x and HTTP/2 codecs handle this in StreamCallbackHelper::addCallbacksHelper.
  ASSERT(!read_callbacks_->connection().aboveHighWatermark() ||
         active_stream->filter_manager_.aboveHighWatermark());

  LinkedList::moveIntoList(std::move(active_stream), streams_);
  return **streams_.begin();
}
462

            
463
void ConnectionManagerImpl::handleCodecErrorImpl(absl::string_view error, absl::string_view details,
464
1769
                                                 StreamInfo::CoreResponseFlag response_flag) {
465
1769
  ENVOY_CONN_LOG(debug, "dispatch error: {}", read_callbacks_->connection(), error);
466
1769
  read_callbacks_->connection().streamInfo().setResponseFlag(response_flag);
467

            
468
  // HTTP/1.1 codec has already sent a 400 response if possible. HTTP/2 codec has already sent
469
  // GOAWAY.
470
1769
  doConnectionClose(Network::ConnectionCloseType::FlushWriteAndDelay, response_flag, details);
471
1769
}
472

            
473
1759
// Handles a codec error as a downstream protocol error. The close details are "codec_error:"
// followed by `error` after StringUtil::replaceAllEmptySpace() has been applied to it.
void ConnectionManagerImpl::handleCodecError(absl::string_view error) {
  const std::string details =
      absl::StrCat("codec_error:", StringUtil::replaceAllEmptySpace(error));
  handleCodecErrorImpl(error, details, StreamInfo::CoreResponseFlag::DownstreamProtocolError);
}
477

            
478
10
// Handles a codec error caused by overload. The close details are "overload_error:" followed by
// `error` after StringUtil::replaceAllEmptySpace() has been applied to it.
void ConnectionManagerImpl::handleCodecOverloadError(absl::string_view error) {
  const std::string details =
      absl::StrCat("overload_error:", StringUtil::replaceAllEmptySpace(error));
  handleCodecErrorImpl(error, details, StreamInfo::CoreResponseFlag::OverloadManager);
}
483

            
484
22290
// Creates the codec for this connection (there must not be one yet) and bumps the total and
// active connection stats for the protocol the codec reports.
void ConnectionManagerImpl::createCodec(Buffer::Instance& data) {
  ASSERT(!codec_);
  codec_ = config_->createCodec(read_callbacks_->connection(), data, *this, overload_manager_);

  switch (codec_->protocol()) {
  case Protocol::Http10:
  case Protocol::Http11:
    stats_.named_.downstream_cx_http1_total_.inc();
    stats_.named_.downstream_cx_http1_active_.inc();
    break;
  case Protocol::Http2:
    stats_.named_.downstream_cx_http2_total_.inc();
    stats_.named_.downstream_cx_http2_active_.inc();
    break;
  case Protocol::Http3:
    stats_.named_.downstream_cx_http3_total_.inc();
    stats_.named_.downstream_cx_http3_active_.inc();
    break;
  }
}
504

            
505
69564
// Feeds newly read bytes to the codec, creating the codec first if needed. Codec errors close the
// connection. Always returns StopIteration. The second (unnamed) parameter is ignored.
Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
  requests_during_dispatch_count_ = 0;

  if (!codec_) {
    // Close connections if Envoy is under pressure, typically memory, before creating codec.
    const bool shed_load =
        hcm_ondata_creating_codec_ != nullptr && hcm_ondata_creating_codec_->shouldShedLoad();
    if (shed_load) {
      stats_.named_.downstream_rq_overload_close_.inc();
      handleCodecOverloadError("onData codec creation overload");
      return Network::FilterStatus::StopIteration;
    }
    // Http3 codec should have been instantiated by now.
    createCodec(data);
  }

  while (true) {
    const Status dispatch_status = codec_->dispatch(data);

    if (isBufferFloodError(dispatch_status) ||
        isInboundFramesWithEmptyPayloadError(dispatch_status)) {
      handleCodecError(dispatch_status.message());
      return Network::FilterStatus::StopIteration;
    }
    if (isCodecProtocolError(dispatch_status)) {
      stats_.named_.downstream_cx_protocol_error_.inc();
      handleCodecError(dispatch_status.message());
      return Network::FilterStatus::StopIteration;
    }
    if (isEnvoyOverloadError(dispatch_status)) {
      stats_.named_.downstream_rq_overload_close_.inc();
      handleCodecOverloadError(dispatch_status.message());
      return Network::FilterStatus::StopIteration;
    }
    ASSERT(dispatch_status.ok());

    // Processing incoming data may release outbound data so check for closure here as well.
    checkForDeferredClose(false);

    // The HTTP/1 codec pauses dispatch after a single complete message. Dispatch again only when
    // the connection is still open, input remains and there are no streams. If a single complete
    // non-WebSocket stream has not been responded to yet, socket reads are paused to apply back
    // pressure.
    const bool redispatch =
        codec_->protocol() < Protocol::Http2 &&
        read_callbacks_->connection().state() == Network::Connection::State::Open &&
        data.length() > 0 && streams_.empty();
    if (!redispatch) {
      break;
    }
  }

  // Record the protocol on the connection's stream info if it has not been set yet.
  if (!read_callbacks_->connection().streamInfo().protocol()) {
    read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
  }

  return Network::FilterStatus::StopIteration;
}
559

            
560
22185
// Called for a new connection. When the connection's stream info has no protocol yet, filter
// iteration continues. Otherwise the connection is treated as QUIC: the codec is created right
// away and iteration stops.
Network::FilterStatus ConnectionManagerImpl::onNewConnection() {
  const bool protocol_unknown = !read_callbacks_->connection().streamInfo().protocol();
  if (protocol_unknown) {
    // For Non-QUIC traffic, continue passing data to filters.
    return Network::FilterStatus::Continue;
  }

  // Only QUIC connection's stream_info_ specifies protocol.
  Buffer::OwnedImpl empty_buffer;
  createCodec(empty_buffer);
  ASSERT(codec_->protocol() == Protocol::Http3);

  // Stop iterating through network filters for QUIC. Currently QUIC connections bypass the
  // onData() interface because QUICHE already handles de-multiplexing.
  return Network::FilterStatus::StopIteration;
}
573

            
574
void ConnectionManagerImpl::resetAllStreams(
575
667
    absl::optional<StreamInfo::CoreResponseFlag> response_flag, absl::string_view details) {
576
45397
  while (!streams_.empty()) {
577
    // Mimic a downstream reset in this case. We must also remove callbacks here. Though we are
578
    // about to close the connection and will disable further reads, it is possible that flushing
579
    // data out can cause stream callbacks to fire (e.g., low watermark callbacks).
580
    //
581
    // TODO(mattklein123): I tried to actually reset through the codec here, but ran into issues
582
    // with nghttp2 state and being unhappy about sending reset frames after the connection had
583
    // been terminated via GOAWAY. It might be possible to do something better here inside the h2
584
    // codec but there are no easy answers and this seems simpler.
585
44730
    auto& stream = *streams_.front();
586
44730
    stream.response_encoder_->getStream().removeCallbacks(stream);
587
44730
    if (!stream.response_encoder_->getStream().responseDetails().empty()) {
588
323
      stream.filter_manager_.streamInfo().setResponseCodeDetails(
589
323
          stream.response_encoder_->getStream().responseDetails());
590
44695
    } else if (!details.empty()) {
591
44407
      stream.filter_manager_.streamInfo().setResponseCodeDetails(details);
592
44407
    }
593
44730
    if (response_flag.has_value()) {
594
41113
      stream.filter_manager_.streamInfo().setResponseFlag(response_flag.value());
595
41113
    }
596
44730
    stream.onResetStream(StreamResetReason::ConnectionTermination, absl::string_view());
597
44730
  }
598
667
}
599

            
600
44039
// Connection event callback. Local and remote close events are charged to their destroy
// counters and then funneled into doConnectionClose(); every other event is ignored.
void ConnectionManagerImpl::onEvent(Network::ConnectionEvent event) {
  const bool closed_locally = event == Network::ConnectionEvent::LocalClose;
  const bool closed_remotely = event == Network::ConnectionEvent::RemoteClose;
  if (!closed_locally && !closed_remotely) {
    return;
  }

  std::string details;
  if (closed_locally) {
    stats_.named_.downstream_cx_destroy_local_.inc();
    const absl::string_view reason = read_callbacks_->connection().localCloseReason();
    ENVOY_BUG(!reason.empty(), "Local Close Reason was not set!");
    // The local-disconnect detail is a format string that embeds the (whitespace-free) reason.
    details =
        fmt::format(fmt::runtime(StreamInfo::ResponseCodeDetails::get().DownstreamLocalDisconnect),
                    StringUtil::replaceAllEmptySpace(reason));
  } else {
    remote_close_ = true;
    stats_.named_.downstream_cx_destroy_remote_.inc();
    details = StreamInfo::ResponseCodeDetails::get().DownstreamRemoteDisconnect;
  }

  // TODO(mattklein123): It is technically possible that something outside of the filter causes
  // a local connection close, so we still guard against that here. A better solution would be to
  // have some type of "pre-close" callback that we could hook for cleanup that would get called
  // regardless of where local close is invoked from.
  // NOTE: that this will cause doConnectionClose() to get called twice in the common local close
  // cases, but the method protects against that.
  // NOTE: In the case where a local close comes from outside the filter, this will cause any
  // stream closures to increment remote close stats. We should do better here in the future,
  // via the pre-close callback mentioned above.
  doConnectionClose(absl::nullopt, StreamInfo::CoreResponseFlag::DownstreamConnectionTermination,
                    details);
}
634

            
635
void ConnectionManagerImpl::doConnectionClose(
636
    absl::optional<Network::ConnectionCloseType> close_type,
637
26613
    absl::optional<StreamInfo::CoreResponseFlag> response_flag, absl::string_view details) {
638
26613
  if (connection_idle_timer_) {
639
21756
    connection_idle_timer_->disableTimer();
640
21756
    connection_idle_timer_.reset();
641
21756
  }
642

            
643
26613
  if (connection_duration_timer_) {
644
85
    connection_duration_timer_->disableTimer();
645
85
    connection_duration_timer_.reset();
646
85
  }
647

            
648
26613
  if (drain_timer_) {
649
187
    drain_timer_->disableTimer();
650
187
    drain_timer_.reset();
651
187
  }
652

            
653
26613
  if (!streams_.empty()) {
654
667
    const Network::ConnectionEvent event = close_type.has_value()
655
667
                                               ? Network::ConnectionEvent::LocalClose
656
667
                                               : Network::ConnectionEvent::RemoteClose;
657
667
    if (event == Network::ConnectionEvent::LocalClose) {
658
377
      stats_.named_.downstream_cx_destroy_local_active_rq_.inc();
659
377
    }
660
667
    if (event == Network::ConnectionEvent::RemoteClose) {
661
290
      stats_.named_.downstream_cx_destroy_remote_active_rq_.inc();
662
290
    }
663

            
664
667
    stats_.named_.downstream_cx_destroy_active_rq_.inc();
665
667
    user_agent_.onConnectionDestroy(event, true);
666
    // Note that resetAllStreams() does not actually write anything to the wire. It just resets
667
    // all upstream streams and their filter stacks. Thus, there are no issues around recursive
668
    // entry.
669
667
    resetAllStreams(response_flag, details);
670
667
  }
671

            
672
26613
  if (close_type.has_value()) {
673
4260
    read_callbacks_->connection().close(close_type.value(), details);
674
4260
  }
675
26613
}
676

            
677
96765
bool ConnectionManagerImpl::isPrematureRstStream(const ActiveStream& stream) const {
678
  // Check if the request was prematurely reset, by comparing its lifetime to the configured
679
  // threshold.
680
96765
  ASSERT(!stream.state_.is_internally_destroyed_);
681
96765
  absl::optional<std::chrono::nanoseconds> duration =
682
96765
      stream.filter_manager_.streamInfo().currentDuration();
683

            
684
  // Check if request lifetime is longer than the premature reset threshold.
685
96765
  if (duration) {
686
96765
    const uint64_t lifetime = std::chrono::duration_cast<std::chrono::seconds>(*duration).count();
687
96765
    const uint64_t min_lifetime = runtime_.snapshot().getInteger(
688
96765
        ConnectionManagerImpl::PrematureResetMinStreamLifetimeSecondsKey, 1);
689
96765
    if (lifetime > min_lifetime) {
690
33820
      return false;
691
33820
    }
692
96765
  }
693

            
694
  // If request has completed before configured threshold, also check if the Envoy proxied the
695
  // response from the upstream. Requests without the response status were reset.
696
  // TODO(RyanTheOptimist): Possibly support half_closed_local instead.
697
62945
  return !stream.filter_manager_.streamInfo().responseCode();
698
96765
}
699

            
700
// Sends a GOAWAY if too many streams have been reset prematurely on this
701
// connection.
702
94065
void ConnectionManagerImpl::maybeDrainDueToPrematureResets() {
703
  // If the connection has been drained due to premature resets, do not check this again.
704
  // Without this flag, recursion may occur, as shown in the following stack trace:
705
  //
706
  //   maybeDrainDueToPrematureResets()
707
  //   doConnectionClose()
708
  //   resetAllStreams()
709
  //   onResetStream()
710
  //   doDeferredStreamDestroy()
711
  //   maybeDrainDueToPrematureResets()
712
  //   ...
713
  //
714
  // The recursion will continue until all streams are destroyed. If there are many streams
715
  // that may result in a stack overflow. This flag is used to avoid above recursion.
716
94065
  if (drained_due_to_premature_resets_) {
717
3600
    return;
718
3600
  }
719

            
720
90465
  if (closed_non_internally_destroyed_requests_ == 0) {
721
266
    return;
722
266
  }
723

            
724
90199
  const uint64_t limit =
725
90199
      runtime_.snapshot().getInteger(ConnectionManagerImpl::PrematureResetTotalStreamCountKey, 500);
726

            
727
90199
  if (closed_non_internally_destroyed_requests_ < limit) {
728
    // Even though the total number of streams have not reached `limit`, check if the number of bad
729
    // streams is high enough that even if every subsequent stream is good, the connection
730
    // would be closed once the limit is reached, and if so close the connection now.
731
37286
    if (number_premature_stream_resets_ * 2 < limit) {
732
36280
      return;
733
36280
    }
734
82492
  } else {
735
52913
    if (number_premature_stream_resets_ * 2 < closed_non_internally_destroyed_requests_) {
736
20965
      return;
737
20965
    }
738
52913
  }
739

            
740
32954
  if (read_callbacks_->connection().state() == Network::Connection::State::Open) {
741
8
    stats_.named_.downstream_rq_too_many_premature_resets_.inc();
742

            
743
    // Mark the the connection has been drained due to too many premature resets.
744
8
    drained_due_to_premature_resets_ = true;
745

            
746
8
    doConnectionClose(Network::ConnectionCloseType::Abort, absl::nullopt,
747
8
                      "too_many_premature_resets");
748
8
  }
749
32954
}
750

            
751
1
// Callback for a GOAWAY received from the peer. Intentionally a no-op.
void ConnectionManagerImpl::onGoAway(GoAwayErrorCode /*error_code*/) {
  // Remote go away frames are currently ignored. In the future we can decide to no longer push
  // resources if applicable.
}
755

            
756
38
void ConnectionManagerImpl::onIdleTimeout() {
757
38
  ENVOY_CONN_LOG(debug, "idle timeout", read_callbacks_->connection());
758
38
  stats_.named_.downstream_cx_idle_timeout_.inc();
759
38
  if (!codec_) {
760
    // No need to delay close after flushing since an idle timeout has already fired. Attempt to
761
    // write out buffered data one last time and issue a local close if successful.
762
1
    doConnectionClose(Network::ConnectionCloseType::FlushWrite, absl::nullopt,
763
1
                      StreamInfo::LocalCloseReasons::get().IdleTimeoutOnConnection);
764
38
  } else if (drain_state_ == DrainState::NotDraining) {
765
37
    startDrainSequence();
766
37
  }
767
38
}
768

            
769
85
void ConnectionManagerImpl::onConnectionDurationTimeout() {
770
85
  ENVOY_CONN_LOG(debug, "max connection duration reached", read_callbacks_->connection());
771
85
  stats_.named_.downstream_cx_max_duration_reached_.inc();
772
85
  if (!codec_) {
773
    // Attempt to write out buffered data one last time and issue a local close if successful.
774
8
    doConnectionClose(Network::ConnectionCloseType::FlushWrite,
775
8
                      StreamInfo::CoreResponseFlag::DurationTimeout,
776
8
                      StreamInfo::ResponseCodeDetails::get().DurationTimeout);
777
78
  } else if (drain_state_ == DrainState::NotDraining) {
778
77
    if (config_->http1SafeMaxConnectionDuration() && codec_->protocol() < Protocol::Http2) {
779
9
      ENVOY_CONN_LOG(debug,
780
9
                     "HTTP1-safe max connection duration is configured -- skipping drain sequence.",
781
9
                     read_callbacks_->connection());
782
9
      stats_.named_.downstream_cx_http1_soft_drain_.inc();
783
9
      soft_drain_http1_ = true;
784
70
    } else {
785
68
      startDrainSequence();
786
68
    }
787
77
  }
788
85
}
789

            
790
124
void ConnectionManagerImpl::onDrainTimeout() {
791
124
  ASSERT(drain_state_ != DrainState::NotDraining);
792
124
  codec_->goAway();
793
124
  drain_state_ = DrainState::Closing;
794
124
  checkForDeferredClose(false);
795
124
}
796

            
797
18
void ConnectionManagerImpl::sendGoAwayAndClose(bool graceful) {
798
18
  ENVOY_CONN_LOG(trace, "connection manager sendGoAwayAndClose was triggerred from filters.",
799
18
                 read_callbacks_->connection());
800
18
  if (go_away_sent_) {
801
    return;
802
  }
803

            
804
  // Use graceful drain sequence if graceful shutdown is requested.
805
  // startDrainSequence() works for both HTTP/1 and HTTP/2:
806
  // - HTTP/1: shutdownNotice() + goAway() provides graceful close
807
  // - HTTP/2: shutdownNotice() sends GOAWAY with high stream ID, then goAway() sends final GOAWAY
808
18
  if (graceful) {
809
1
    if (drain_state_ == DrainState::NotDraining) {
810
1
      startDrainSequence();
811
1
    }
812
    // Consider the "go away" process started once draining begins.
813
    // The actual GOAWAY frame will be sent in onDrainTimeout(), but we want to
814
    // prevent multiple calls to sendGoAwayAndClose() from starting multiple drain sequences.
815
1
    go_away_sent_ = true;
816
18
  } else {
817
    // Immediate close - send GOAWAY and close immediately
818
17
    codec_->shutdownNotice();
819
17
    codec_->goAway();
820
17
    go_away_sent_ = true;
821
17
    doConnectionClose(Network::ConnectionCloseType::FlushWriteAndDelay, absl::nullopt,
822
17
                      "forced_goaway");
823
17
  }
824
18
}
825

            
826
// Increments the tracing counter that corresponds to the given tracing reason.
//
// @param tracing_reason why the request is (or is not) being traced.
// @param tracing_stats the counters to charge; any reason other than ClientForced, Sampling or
//        ServiceForced is charged to not_traceable_.
void ConnectionManagerImpl::chargeTracingStats(const Tracing::Reason& tracing_reason,
                                               ConnectionManagerTracingStats& tracing_stats) {
  if (tracing_reason == Tracing::Reason::ClientForced) {
    tracing_stats.client_enabled_.inc();
  } else if (tracing_reason == Tracing::Reason::Sampling) {
    tracing_stats.random_sampling_.inc();
  } else if (tracing_reason == Tracing::Reason::ServiceForced) {
    tracing_stats.service_forced_.inc();
  } else {
    tracing_stats.not_traceable_.inc();
  }
}
843

            
844
// Returns the stream's request ID as a string view, obtained from the configured request ID
// extension applied to the request headers, or nullopt when no request headers are set yet.
absl::optional<absl::string_view>
ConnectionManagerImpl::HttpStreamIdProviderImpl::toStringView() const {
  if (parent_.request_headers_ == nullptr) {
    return absl::nullopt;
  }

  const auto& request_id_extension = parent_.connection_manager_.config_->requestIDExtension();
  ASSERT(request_id_extension != nullptr);
  return request_id_extension->get(*parent_.request_headers_);
}
852

            
853
1
// Returns the stream's request ID as an integer, obtained from the configured request ID
// extension applied to the request headers, or nullopt when no request headers are set yet.
absl::optional<uint64_t> ConnectionManagerImpl::HttpStreamIdProviderImpl::toInteger() const {
  if (parent_.request_headers_ == nullptr) {
    return absl::nullopt;
  }

  const auto& request_id_extension = parent_.connection_manager_.config_->requestIDExtension();
  ASSERT(request_id_extension != nullptr);
  return request_id_extension->getInteger(*parent_.request_headers_);
}
861

            
862
namespace {
863
constexpr absl::string_view kRouteFactoryName = "envoy.route_config_update_requester.default";
864
} // namespace
865

            
866
// Builds the per-request state of a new downstream stream: constructs the filter manager,
// resolves the route config update requester, charges the request counters and arms every
// per-stream timer that the connection manager config enables.
//
// @param connection_manager the connection manager this stream belongs to.
// @param buffer_limit buffer limit forwarded to the filter manager.
// @param account buffer memory account handed over (moved) to the filter manager.
ConnectionManagerImpl::ActiveStream::ActiveStream(ConnectionManagerImpl& connection_manager,
                                                  uint32_t buffer_limit,
                                                  Buffer::BufferMemoryAccountSharedPtr account)
    // The initializers read from the `connection_manager` parameter rather than from the
    // `connection_manager_` member, so they do not depend on where that member is declared.
    : connection_manager_(connection_manager),
      connection_manager_tracing_config_(connection_manager.config_->tracingConfig() == nullptr
                                             ? absl::nullopt
                                             : makeOptRef<const TracingConnectionManagerConfig>(
                                                   *connection_manager.config_->tracingConfig())),
      stream_id_(connection_manager.random_generator_.random()),
      // NOTE(review): stream_id_ is read here, so it must be declared before filter_manager_.
      filter_manager_(*this, *connection_manager.dispatcher_,
                      connection_manager.read_callbacks_->connection(), stream_id_,
                      std::move(account), connection_manager.config_->proxy100Continue(),
                      buffer_limit, connection_manager.config_->filterFactory(),
                      connection_manager.config_->localReply(),
                      connection_manager.codec_->protocol(), connection_manager.timeSource(),
                      connection_manager.read_callbacks_->connection().streamInfo().filterState(),
                      connection_manager.overload_manager_),
      request_response_timespan_(std::make_unique<Stats::HistogramCompletableTimespanImpl>(
          connection_manager.stats_.named_.downstream_rq_time_, connection_manager.timeSource())),
      has_explicit_global_flush_timeout_(
          connection_manager.config_->streamFlushTimeout().has_value()),
      header_validator_(
          connection_manager.config_->makeHeaderValidator(connection_manager.codec_->protocol())),
      trace_refresh_after_route_refresh_(Runtime::runtimeFeatureEnabled(
          "envoy.reloadable_features.trace_refresh_after_route_refresh")) {
  auto& config = *connection_manager_.config_;

  // A routable config must provide exactly one of the two route sources.
  ASSERT(!config.isRoutable() ||
             ((config.routeConfigProvider() == nullptr &&
               config.scopedRouteConfigProvider() != nullptr &&
               config.scopeKeyBuilder().has_value()) ||
              (config.routeConfigProvider() != nullptr &&
               config.scopedRouteConfigProvider() == nullptr &&
               !config.scopeKeyBuilder().has_value())),
         "Either routeConfigProvider or (scopedRouteConfigProvider and scopeKeyBuilder) should be "
         "set in "
         "ConnectionManagerImpl.");

  filter_manager_.streamInfo().setStreamIdProvider(
      std::make_shared<HttpStreamIdProviderImpl>(*this));

  filter_manager_.streamInfo().setShouldSchemeMatchUpstream(config.shouldSchemeMatchUpstream());

  // TODO(chaoqin-li1123): can this be moved to the on demand filter?
  auto factory = Envoy::Config::Utility::getFactoryByName<RouteConfigUpdateRequesterFactory>(
      kRouteFactoryName);
  if (config.isRoutable() && config.routeConfigProvider() != nullptr && factory) {
    route_config_update_requester_ =
        factory->createRouteConfigUpdateRequester(config.routeConfigProvider());
  } else if (config.isRoutable() && config.scopedRouteConfigProvider() != nullptr &&
             config.scopeKeyBuilder().has_value() && factory) {
    route_config_update_requester_ = factory->createRouteConfigUpdateRequester(
        config.scopedRouteConfigProvider(), config.scopeKeyBuilder());
  }
  ScopeTrackerScopeState scope(this,
                               connection_manager_.read_callbacks_->connection().dispatcher());

  // Request counters: total, active, and the per-protocol total.
  auto& named_stats = connection_manager_.stats_.named_;
  named_stats.downstream_rq_total_.inc();
  named_stats.downstream_rq_active_.inc();
  if (connection_manager_.codec_->protocol() == Protocol::Http2) {
    named_stats.downstream_rq_http2_total_.inc();
  } else if (connection_manager_.codec_->protocol() == Protocol::Http3) {
    named_stats.downstream_rq_http3_total_.inc();
  } else {
    named_stats.downstream_rq_http1_total_.inc();
  }

  // Each timer below is only created when its configured duration is non-zero / present.
  if (config.streamIdleTimeout().count()) {
    idle_timeout_ms_ = config.streamIdleTimeout();
    stream_idle_timer_ = connection_manager_.dispatcher_->createScaledTimer(
        Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout,
        [this]() -> void { onIdleTimeout(); });
    resetIdleTimer();
  }

  if (config.requestTimeout().count()) {
    std::chrono::milliseconds request_timeout = config.requestTimeout();
    request_timer_ =
        connection_manager_.dispatcher_->createTimer([this]() -> void { onRequestTimeout(); });
    request_timer_->enableTimer(request_timeout, this);
  }

  if (config.requestHeadersTimeout().count()) {
    std::chrono::milliseconds request_headers_timeout = config.requestHeadersTimeout();
    request_header_timer_ = connection_manager_.dispatcher_->createTimer(
        [this]() -> void { onRequestHeaderTimeout(); });
    request_header_timer_->enableTimer(request_headers_timeout, this);
  }

  const auto max_stream_duration = config.maxStreamDuration();
  if (max_stream_duration.has_value() && max_stream_duration.value().count()) {
    max_stream_duration_timer_ = connection_manager_.dispatcher_->createTimer(
        [this]() -> void { onStreamMaxDurationReached(); });
    // Reuse the value fetched above instead of querying the config a second time.
    max_stream_duration_timer_->enableTimer(max_stream_duration.value(), this);
  }

  if (config.accessLogFlushInterval().has_value()) {
    access_log_flush_timer_ = connection_manager_.dispatcher_->createTimer([this]() -> void {
      // If the request is complete, we've already done the stream-end access-log, and shouldn't
      // do the periodic log.
      if (!streamInfo().requestComplete().has_value()) {
        log(AccessLog::AccessLogType::DownstreamPeriodic);
        refreshAccessLogFlushTimer();
      }
      const SystemTime now = connection_manager_.timeSource().systemTime();
      // Downstream bytes meter is guaranteed to be non-null because ActiveStream and the timer
      // event are created on the same thread that sets the meter in
      // ConnectionManagerImpl::newStream.
      filter_manager_.streamInfo().getDownstreamBytesMeter()->takeDownstreamPeriodicLoggingSnapshot(
          now);
      if (auto& upstream_bytes_meter = filter_manager_.streamInfo().getUpstreamBytesMeter();
          upstream_bytes_meter != nullptr) {
        upstream_bytes_meter->takeDownstreamPeriodicLoggingSnapshot(now);
      }
    });
    refreshAccessLogFlushTimer();
  }
}
988

            
989
92585
// Emits an access log entry of the given type for this stream: first through the filter
// manager, then through every access logger configured on the connection manager.
//
// @param type the access log type stamped on the formatter context.
void ConnectionManagerImpl::ActiveStream::log(AccessLog::AccessLogType type) {
  const Formatter::Context log_context{
      request_headers_.get(), response_headers_.get(), response_trailers_.get(), {}, type,
      active_span_.get()};

  filter_manager_.log(log_context);

  auto& stream_info = filter_manager_.streamInfo();
  for (const auto& logger : connection_manager_.config_->accessLogs()) {
    logger->log(log_context, stream_info);
  }
}
94065
void ConnectionManagerImpl::ActiveStream::completeRequest() {
94065
  filter_manager_.streamInfo().onRequestComplete();
94065
  connection_manager_.stats_.named_.downstream_rq_active_.dec();
94065
  if (filter_manager_.streamInfo().healthCheck()) {
264
    connection_manager_.config_->tracingStats().health_check_.inc();
264
  }
94065
  if (active_span_) {
52
    Tracing::HttpTracerUtility::finalizeDownstreamSpan(
52
        *active_span_, request_headers_.get(), response_headers_.get(), response_trailers_.get(),
52
        filter_manager_.streamInfo(), *this);
52
  }
94065
  if (state_.successful_upgrade_) {
601
    connection_manager_.stats_.named_.downstream_cx_upgrades_active_.dec();
601
  }
94065
}
564933
void ConnectionManagerImpl::ActiveStream::resetIdleTimer() {
564933
  if (stream_idle_timer_ != nullptr) {
    // TODO(htuch): If this shows up in performance profiles, optimize by only
    // updating a timestamp here and doing periodic checks for idle timeouts
    // instead, or reducing the accuracy of timers.
557464
    stream_idle_timer_->enableTimer(idle_timeout_ms_);
557464
  }
564933
}
184
void ConnectionManagerImpl::ActiveStream::onIdleTimeout() {
184
  connection_manager_.stats_.named_.downstream_rq_idle_timeout_.inc();
184
  filter_manager_.streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::StreamIdleTimeout);
184
  sendLocalReply(
184
      Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
184
      "stream timeout", nullptr, absl::nullopt,
184
      StreamInfo::ResponseCodeDetails::get().StreamIdleTimeout);
184
}
27
void ConnectionManagerImpl::ActiveStream::onRequestTimeout() {
27
  connection_manager_.stats_.named_.downstream_rq_timeout_.inc();
27
  sendLocalReply(
27
      Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
27
      "request timeout", nullptr, absl::nullopt,
27
      StreamInfo::ResponseCodeDetails::get().RequestOverallTimeout);
27
}
1
void ConnectionManagerImpl::ActiveStream::onRequestHeaderTimeout() {
1
  connection_manager_.stats_.named_.downstream_rq_header_timeout_.inc();
1
  sendLocalReply(
1
      Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
1
      "request header timeout", nullptr, absl::nullopt,
1
      StreamInfo::ResponseCodeDetails::get().RequestHeaderTimeout);
1
}
32
void ConnectionManagerImpl::ActiveStream::onStreamMaxDurationReached() {
32
  ENVOY_STREAM_LOG(debug, "Stream max duration time reached", *this);
32
  connection_manager_.stats_.named_.downstream_rq_max_duration_reached_.inc();
32
  sendLocalReply(
32
      Http::Utility::maybeRequestTimeoutCode(filter_manager_.hasLastDownstreamByteReceived()),
32
      "downstream duration timeout", nullptr, Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded,
32
      StreamInfo::ResponseCodeDetails::get().MaxDurationTimeout);
32
}
47577
void ConnectionManagerImpl::ActiveStream::chargeStats(const ResponseHeaderMap& headers) {
47577
  if (trace_refresh_after_route_refresh_ && connection_manager_tracing_config_.has_value()) {
266
    const Tracing::Decision tracing_decision =
266
        Tracing::TracerUtility::shouldTraceRequest(filter_manager_.streamInfo());
266
    ConnectionManagerImpl::chargeTracingStats(tracing_decision.reason,
266
                                              connection_manager_.config_->tracingStats());
266
  }
47577
  uint64_t response_code = Utility::getResponseStatus(headers);
47577
  filter_manager_.streamInfo().setResponseCode(response_code);
47577
  if (filter_manager_.streamInfo().healthCheck()) {
264
    return;
264
  }
  // No response is sent back downstream for internal redirects, so don't charge downstream stats.
47313
  const absl::optional<std::string>& response_code_details =
47313
      filter_manager_.streamInfo().responseCodeDetails();
47313
  if (response_code_details.has_value() &&
47313
      response_code_details == Envoy::StreamInfo::ResponseCodeDetails::get().InternalRedirect) {
151
    return;
151
  }
47162
  connection_manager_.stats_.named_.downstream_rq_completed_.inc();
47162
  connection_manager_.listener_stats_.downstream_rq_completed_.inc();
47162
  if (CodeUtility::is1xx(response_code)) {
253
    connection_manager_.stats_.named_.downstream_rq_1xx_.inc();
253
    connection_manager_.listener_stats_.downstream_rq_1xx_.inc();
46917
  } else if (CodeUtility::is2xx(response_code)) {
37231
    connection_manager_.stats_.named_.downstream_rq_2xx_.inc();
37231
    connection_manager_.listener_stats_.downstream_rq_2xx_.inc();
41922
  } else if (CodeUtility::is3xx(response_code)) {
194
    connection_manager_.stats_.named_.downstream_rq_3xx_.inc();
194
    connection_manager_.listener_stats_.downstream_rq_3xx_.inc();
9584
  } else if (CodeUtility::is4xx(response_code)) {
5726
    connection_manager_.stats_.named_.downstream_rq_4xx_.inc();
5726
    connection_manager_.listener_stats_.downstream_rq_4xx_.inc();
8947
  } else if (CodeUtility::is5xx(response_code)) {
3746
    connection_manager_.stats_.named_.downstream_rq_5xx_.inc();
3746
    connection_manager_.listener_stats_.downstream_rq_5xx_.inc();
3746
  }
47162
}
91795
// Returns a pointer to the downstream connection held by the connection manager's read
// callbacks.
const Network::Connection* ConnectionManagerImpl::ActiveStream::connection() {
  const Network::Connection& downstream_connection =
      connection_manager_.read_callbacks_->connection();
  return &downstream_connection;
}
90323
uint32_t ConnectionManagerImpl::ActiveStream::localPort() {
90323
  auto ip = connection()->connectionInfoProvider().localAddress()->ip();
90323
  if (ip == nullptr) {
15
    return 0;
15
  }
90308
  return ip->port();
90323
}
namespace {
5
bool streamErrorOnlyErrors(absl::string_view error_details) {
  // Pre UHV HCM did not respect stream_error_on_invalid_http_message
  // and only sent 400 for specific errors.
  // TODO(#28555): make these errors respect the stream_error_on_invalid_http_message
5
  return error_details == UhvResponseCodeDetail::get().FragmentInUrlPath ||
5
         error_details == UhvResponseCodeDetail::get().EscapedSlashesInPath ||
5
         error_details == UhvResponseCodeDetail::get().Percent00InPath;
5
}
} // namespace
51
// Applies the route decorator (if any) to the active span and reconciles the decorator
// operation request header with the tracing direction. Requires an active span.
//
// @param headers request headers; the decorator operation header is set for egress (when
//        propagation applies) and consumed and removed otherwise.
void ConnectionManagerImpl::ActiveStream::setRequestDecorator(RequestHeaderMap& headers) {
  ASSERT(active_span_ != nullptr);

  // If a decorator has been defined, apply it to the active span.
  absl::string_view route_operation;
  if (const Router::Decorator* decorator = route_decorator_; decorator != nullptr) {
    route_operation = decorator->getOperation();
    decorator->apply(*active_span_);
    state_.decorated_propagate_ = decorator->propagate();
  }

  const bool is_egress =
      connection_manager_tracing_config_->operation_name_ == Tracing::OperationName::Egress;
  if (is_egress) {
    // For egress (outbound) requests, pass the decorator's operation name (if defined and
    // propagation enabled) as a request header to enable the receiving service to use it in its
    // server span.
    if (!route_operation.empty() && state_.decorated_propagate_) {
      headers.setEnvoyDecoratorOperation(route_operation);
    }
    return;
  }

  // For ingress (inbound) requests, if a decorator operation name has been provided, it
  // should be used to override the active span's operation.
  const absl::string_view operation_override = headers.getEnvoyDecoratorOperationValue();
  if (!operation_override.empty()) {
    active_span_->setOperation(operation_override);
    // Set the decorator operation as overridden to avoid propagating the route decorator
    // operation to the client when the setResponseDecorator() is called.
    state_.decorator_overriden_ = true;
  }
  // Remove header so not propagated to service. This must stay after the override has been
  // consumed, since the view above is read from the header.
  headers.removeEnvoyDecoratorOperation();
}
49
// Reconciles the Envoy decorator-operation response header with the configured tracing direction:
//  - Ingress: when propagation is enabled and the request did not override the operation, the
//    route decorator's operation name (if defined) is written to the response header so that the
//    client service can use it in its client span.
//  - Egress: an operation name found in the response header overrides the active span's
//    operation, and the header is always removed.
// An active span is required.
void ConnectionManagerImpl::ActiveStream::setResponseDecorator(ResponseHeaderMap& headers) {
  ASSERT(active_span_ != nullptr);

  const auto direction = connection_manager_tracing_config_->operation_name_;

  if (direction == Tracing::OperationName::Ingress) {
    const bool may_propagate = state_.decorated_propagate_ && !state_.decorator_overriden_;
    if (!may_propagate || route_decorator_ == nullptr) {
      // Either propagation is off, the request overrode the operation, or there is no decorator
      // to take an operation name from.
      return;
    }
    const absl::string_view route_operation = route_decorator_->getOperation();
    if (!route_operation.empty()) {
      // The decorator operation is defined: expose it as the response header.
      headers.setEnvoyDecoratorOperation(route_operation);
    }
    return;
  }

  if (direction != Tracing::OperationName::Egress) {
    return;
  }

  // Outbound response: a decorator operation supplied in the response header overrides the
  // active span's operation.
  const absl::string_view operation_override = headers.getEnvoyDecoratorOperationValue();
  if (!operation_override.empty()) {
    active_span_->setOperation(operation_override);
  }
  // Strip the header so it is not propagated to the service.
  headers.removeEnvoyDecoratorOperation();
}
91472
bool ConnectionManagerImpl::ActiveStream::validateHeaders() {
91472
  if (header_validator_) {
10
    auto validation_result = header_validator_->validateRequestHeaders(*request_headers_);
10
    bool failure = !validation_result.ok();
10
    bool redirect = false;
10
    bool is_grpc = Grpc::Common::hasGrpcContentType(*request_headers_);
10
    std::string failure_details(validation_result.details());
10
    if (!failure) {
7
      auto transformation_result = header_validator_->transformRequestHeaders(*request_headers_);
7
      failure = !transformation_result.ok();
7
      redirect = transformation_result.action() ==
7
                 Http::ServerHeaderValidator::RequestHeadersTransformationResult::Action::Redirect;
7
      failure_details = std::string(transformation_result.details());
7
      if (redirect && !is_grpc) {
1
        connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
6
      } else if (failure) {
1
        connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
1
      }
7
    }
10
    if (failure) {
5
      std::function<void(ResponseHeaderMap & headers)> modify_headers;
5
      Code response_code = failure_details == Http1ResponseCodeDetail::get().InvalidTransferEncoding
5
                               ? Code::NotImplemented
5
                               : Code::BadRequest;
5
      absl::optional<Grpc::Status::GrpcStatus> grpc_status;
5
      if (redirect && !is_grpc) {
1
        response_code = Code::TemporaryRedirect;
1
        modify_headers = [new_path = request_headers_->Path()->value().getStringView()](
1
                             Http::ResponseHeaderMap& response_headers) -> void {
1
          response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
1
        };
5
      } else if (is_grpc) {
2
        grpc_status = Grpc::Status::WellKnownGrpcStatus::Internal;
2
      }
5
      filter_manager_.streamInfo().setResponseFlag(
5
          StreamInfo::CoreResponseFlag::DownstreamProtocolError);
      // H/2 codec was resetting requests that were rejected due to headers with underscores,
      // instead of sending 400. Preserving this behavior for now.
      // TODO(#24466): Make H/2 behavior consistent with H/1 and H/3.
5
      if (failure_details == UhvResponseCodeDetail::get().InvalidUnderscore &&
5
          connection_manager_.codec_->protocol() == Protocol::Http2) {
        filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
        resetStream();
5
      } else {
5
        sendLocalReply(response_code, "", modify_headers, grpc_status, failure_details);
5
        if (!response_encoder_->streamErrorOnInvalidHttpMessage() &&
5
            !streamErrorOnlyErrors(failure_details)) {
5
          connection_manager_.handleCodecError(failure_details);
5
        }
5
      }
5
      return false;
5
    }
10
  }
91467
  return true;
91472
}
493
// Runs the header validator (when one is configured) over the request trailers: validation
// first, then, if validation passed, transformation.
//
// Returns true when the trailers are acceptable. Otherwise the stream is rejected (local reply
// for protocols below HTTP/2, stream reset for HTTP/2 and above) and false is returned.
bool ConnectionManagerImpl::ActiveStream::validateTrailers(RequestTrailerMap& trailers) {
  if (!header_validator_) {
    return true;
  }

  auto validation_result = header_validator_->validateRequestTrailers(trailers);
  std::string failure_details(validation_result.details());
  if (validation_result.ok()) {
    auto transformation_result = header_validator_->transformRequestTrailers(trailers);
    if (transformation_result.ok()) {
      return true;
    }
    // Validation passed but the transformation did not: report the transformation's details.
    failure_details = std::string(transformation_result.details());
  }

  // gRPC requests additionally get an Internal gRPC status on the local reply.
  absl::optional<Grpc::Status::GrpcStatus> grpc_status;
  if (Grpc::Common::hasGrpcContentType(*request_headers_)) {
    grpc_status = Grpc::Status::WellKnownGrpcStatus::Internal;
  }
  filter_manager_.streamInfo().setResponseFlag(
      StreamInfo::CoreResponseFlag::DownstreamProtocolError);

  // H/2 codec was resetting requests that were rejected due to headers with underscores,
  // instead of sending 400. Preserving this behavior for now.
  // TODO(#24466): Make H/2 behavior consistent with H/1 and H/3.
  const bool reset_for_h2_underscore =
      failure_details == UhvResponseCodeDetail::get().InvalidUnderscore &&
      connection_manager_.codec_->protocol() == Protocol::Http2;
  if (reset_for_h2_underscore) {
    filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
    resetStream();
    return false;
  }

  // TODO(#24735): Harmonize H/2 and H/3 behavior with H/1
  const bool below_http2 = connection_manager_.codec_->protocol() < Protocol::Http2;
  if (below_http2) {
    sendLocalReply(Code::BadRequest, "", nullptr, grpc_status, failure_details);
  } else {
    filter_manager_.streamInfo().setResponseCodeDetails(failure_details);
    resetStream();
  }

  // Escalate to a codec (connection level) error unless the encoder treats invalid messages as
  // stream errors.
  if (!response_encoder_->streamErrorOnInvalidHttpMessage()) {
    connection_manager_.handleCodecError(failure_details);
  }
  return false;
}
334657
void ConnectionManagerImpl::ActiveStream::maybeRecordLastByteReceived(bool end_stream) {
  // If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
334657
  if (end_stream && !filter_manager_.hasLastDownstreamByteReceived()) {
84948
    filter_manager_.streamInfo().downstreamTiming().onLastDownstreamRxByteReceived(
84948
        connection_manager_.dispatcher_->timeSource());
84948
    ENVOY_STREAM_LOG(debug, "request end stream timestamp recorded", *this);
84948
  }
334657
}
// Ordering in this function is complicated, but important.
//
// We want to do minimal work before selecting route and creating a filter
// chain to maximize the number of requests which get custom filter behavior,
// e.g. registering access logging.
//
// This must be balanced by doing sanity checking for invalid requests (one
// can't route select properly without full headers), checking state required to
// serve error responses (connection close, head requests, etc), and
// modifications which may themselves affect route selection.
//
// Summary of the steps below: take ownership of `headers` (stored in request_headers_), record
// timing and protocol on the stream info, validate the headers, snap the route configuration,
// apply overload shedding, perform a series of request sanity checks, normalize path and host,
// mutate the request headers, select the route, create the downstream filter chain, optionally
// start tracing, and finally either pass the headers to the filter manager or mark the stream as
// deferred. Every early `return` follows a failed validateHeaders() or a sendLocalReply() call;
// in those cases filter_manager_.decodeHeaders() is not invoked from this function.
//
// @param headers the decoded request headers; moved into request_headers_.
// @param end_stream whether the headers are the last frame of the request.
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapSharedPtr&& headers,
91472
                                                        bool end_stream) {
91472
  ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream,
91472
                   *headers);
  // We only want to record this when reading the headers the first time, not when recreating
  // a stream.
91472
  if (!filter_manager_.hasLastDownstreamByteReceived()) {
91176
    filter_manager_.streamInfo().downstreamTiming().onLastDownstreamHeaderRxByteReceived(
91176
        connection_manager_.dispatcher_->timeSource());
91176
  }
91472
  ScopeTrackerScopeState scope(this,
91472
                               connection_manager_.read_callbacks_->connection().dispatcher());
  // `headers` is moved from here on; all later accesses go through request_headers_.
91472
  request_headers_ = std::move(headers);
91472
  filter_manager_.requestHeadersInitialized();
  // The headers have arrived, so the request header timer (if one was armed) is no longer needed.
91472
  if (request_header_timer_ != nullptr) {
1
    request_header_timer_->disableTimer();
1
    request_header_timer_.reset();
1
  }
  // Both shouldDrainConnectionUponCompletion() and is_head_request_ affect local replies: set them
  // as early as possible.
91472
  const Protocol protocol = connection_manager_.codec_->protocol();
91472
  if (HeaderUtility::shouldCloseConnection(protocol, *request_headers_)) {
    // Only mark the connection to be closed if the request indicates so. The connection might
    // already be marked so before this step, in which case if shouldCloseConnection() returns
    // false, the stream info value shouldn't be overridden.
22
    filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
22
  }
91472
  filter_manager_.streamInfo().protocol(protocol);
  // We end the decode here to mark that the downstream stream is complete.
91472
  maybeRecordLastByteReceived(end_stream);
  // validateHeaders() has already reset the stream or sent a local reply when it returns false.
91472
  if (!validateHeaders()) {
5
    ENVOY_STREAM_LOG(debug, "request headers validation failed\n{}", *this, *request_headers_);
5
    return;
5
  }
  // We need to snap snapped_route_config_ here as it's used in mutateRequestHeaders later.
91467
  if (connection_manager_.config_->isRoutable()) {
91028
    if (connection_manager_.config_->routeConfigProvider() != nullptr) {
90846
      snapped_route_config_ = connection_manager_.config_->routeConfigProvider()->configCast();
90932
    } else if (connection_manager_.config_->scopedRouteConfigProvider() != nullptr &&
182
               connection_manager_.config_->scopeKeyBuilder().has_value()) {
182
      snapped_scoped_routes_config_ =
182
          connection_manager_.config_->scopedRouteConfigProvider()->config<Router::ScopedConfig>();
182
      snapScopedRouteConfig();
182
    }
91348
  } else {
    // NOTE(review): this branch dereferences routeConfigProvider() without a null check;
    // presumably a non-routable config always supplies a provider - confirm.
439
    snapped_route_config_ = connection_manager_.config_->routeConfigProvider()->configCast();
439
  }
  // Drop new requests when overloaded as soon as we have decoded the headers.
91467
  const bool drop_request_due_to_overload =
91467
      (connection_manager_.accept_new_http_stream_ != nullptr &&
91467
       connection_manager_.accept_new_http_stream_->shouldShedLoad()) ||
91467
      connection_manager_.random_generator_.bernoulli(
91459
          connection_manager_.overload_stop_accepting_requests_ref_.value());
91467
  if (drop_request_due_to_overload) {
    // In this one special case, do not create the filter chain. If there is a risk of memory
    // overload it is more important to avoid unnecessary allocation than to create the filters.
49
    filter_manager_.skipFilterChainCreation();
49
    connection_manager_.stats_.named_.downstream_rq_overload_close_.inc();
49
    sendLocalReply(
49
        Http::Code::ServiceUnavailable, "envoy overloaded",
49
        [this](Http::ResponseHeaderMap& headers) {
49
          if (connection_manager_.config_->appendLocalOverload()) {
16
            headers.addReference(Http::Headers::get().EnvoyLocalOverloaded,
16
                                 Http::Headers::get().EnvoyOverloadedValues.True);
16
          }
49
        },
49
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().Overload);
49
    return;
49
  }
  // Unless configured to proxy 100-continue, answer an `Expect: 100-continue` request directly.
91418
  if (!connection_manager_.config_->proxy100Continue() && request_headers_->Expect() &&
      // The Expect field-value is case-insensitive.
      // https://tools.ietf.org/html/rfc7231#section-5.1.1
91418
      absl::EqualsIgnoreCase((request_headers_->Expect()->value().getStringView()),
40
                             Headers::get().ExpectValues._100Continue)) {
    // Note in the case Envoy is handling 100-Continue complexity, it skips the filter chain
    // and sends the 100-Continue directly to the encoder.
40
    chargeStats(continueHeader());
40
    response_encoder_->encode1xxHeaders(continueHeader());
    // Remove the Expect header so it won't be handled again upstream.
40
    request_headers_->removeExpect();
40
  }
91418
  connection_manager_.user_agent_.initializeFromHeaders(*request_headers_,
91418
                                                        connection_manager_.stats_.prefixStatName(),
91418
                                                        connection_manager_.stats_.scope_);
91418
  if (!request_headers_->Host()) {
    // Require host header. For HTTP/1.1 Host has already been translated to :authority.
1022
    sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
1022
                   StreamInfo::ResponseCodeDetails::get().MissingHost);
1022
    return;
1022
  }
  // Apply header sanity checks.
90396
  absl::optional<std::reference_wrapper<const absl::string_view>> error =
90396
      HeaderUtility::requestHeadersValid(*request_headers_);
90396
  if (error != absl::nullopt) {
6
    sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt, error.value().get());
6
    if (!response_encoder_->streamErrorOnInvalidHttpMessage()) {
6
      connection_manager_.handleCodecError(error.value().get());
6
    }
6
    return;
6
  }
  // Check for the existence of the :path header for non-CONNECT requests, or present-but-empty
  // :path header for CONNECT requests. We expect the codec to have broken the path into pieces if
  // applicable. NOTE: Currently the HTTP/1.1 codec only does this when the allow_absolute_url flag
  // is enabled on the HCM.
90390
  if ((!HeaderUtility::isConnect(*request_headers_) || request_headers_->Path()) &&
90390
      request_headers_->getPathValue().empty()) {
2
    sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
2
                   StreamInfo::ResponseCodeDetails::get().MissingPath);
2
    return;
2
  }
  // Rewrite the host of CONNECT-UDP requests.
90388
  if (HeaderUtility::isConnectUdpRequest(*request_headers_) &&
90388
      !HeaderUtility::rewriteAuthorityForConnectUdp(*request_headers_)) {
16
    sendLocalReply(Code::NotFound, "The path is incorrect for CONNECT-UDP", nullptr, absl::nullopt,
16
                   StreamInfo::ResponseCodeDetails::get().InvalidPath);
16
    return;
16
  }
  // Currently we only support relative paths at the application layer.
90372
  if (!request_headers_->getPathValue().empty() && request_headers_->getPathValue()[0] != '/') {
2
    connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc();
2
    sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
2
                   StreamInfo::ResponseCodeDetails::get().AbsolutePath);
2
    return;
2
  }
90370
#ifndef ENVOY_ENABLE_UHV
  // In UHV mode path normalization is done in the UHV
  // Path sanitization should happen before any path access other than the above sanity check.
90370
  const auto action =
90370
      ConnectionManagerUtility::maybeNormalizePath(*request_headers_, *connection_manager_.config_);
  // gRPC requests are rejected if Envoy is configured to redirect post-normalization. This is
  // because gRPC clients do not support redirect.
90370
  if (action == ConnectionManagerUtility::NormalizePathAction::Reject ||
90370
      (action == ConnectionManagerUtility::NormalizePathAction::Redirect &&
90328
       Grpc::Common::hasGrpcContentType(*request_headers_))) {
43
    connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
43
    sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
43
                   StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
43
    return;
90327
  } else if (action == ConnectionManagerUtility::NormalizePathAction::Redirect) {
    // Non-gRPC redirect: reply with a temporary redirect whose Location is the normalized path.
4
    connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
4
    sendLocalReply(
4
        Code::TemporaryRedirect, "",
4
        [new_path = request_headers_->Path()->value().getStringView()](
4
            Http::ResponseHeaderMap& response_headers) -> void {
4
          response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
4
        },
4
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
4
    return;
4
  }
90323
  ASSERT(action == ConnectionManagerUtility::NormalizePathAction::Continue);
90323
#endif
  // For CONNECT requests, keep the port returned by maybeNormalizeHost() in the filter state.
90323
  auto optional_port = ConnectionManagerUtility::maybeNormalizeHost(
90323
      *request_headers_, *connection_manager_.config_, localPort());
90323
  if (optional_port.has_value() &&
90323
      requestWasConnect(request_headers_, connection_manager_.codec_->protocol())) {
11
    filter_manager_.streamInfo().filterState()->setData(
11
        Router::OriginalConnectPort::key(),
11
        std::make_unique<Router::OriginalConnectPort>(optional_port.value()),
11
        StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Request);
11
  }
90323
  if (!state_.is_internally_created_) { // Only sanitize headers on first pass.
    // Modify the downstream remote address depending on configuration and headers.
90027
    const auto mutate_result = ConnectionManagerUtility::mutateRequestHeaders(
90027
        *request_headers_, connection_manager_.read_callbacks_->connection(),
90027
        *connection_manager_.config_, *snapped_route_config_, connection_manager_.local_info_,
90027
        filter_manager_.streamInfo());
    // IP detection failed, reject the request.
90027
    if (mutate_result.reject_request.has_value()) {
1
      const auto& reject_request_params = mutate_result.reject_request.value();
1
      connection_manager_.stats_.named_.downstream_rq_rejected_via_ip_detection_.inc();
1
      sendLocalReply(reject_request_params.response_code, reject_request_params.body, nullptr,
1
                     absl::nullopt,
1
                     StreamInfo::ResponseCodeDetails::get().OriginalIPDetectionFailed);
1
      return;
1
    }
90026
    filter_manager_.setDownstreamRemoteAddress(mutate_result.final_remote_address);
90026
  }
90322
  ASSERT(filter_manager_.streamInfo().downstreamAddressProvider().remoteAddress() != nullptr);
  // No route has been cached yet on this pass; select one now.
90322
  ASSERT(!cached_route_);
90322
  refreshCachedRoute();
90322
  if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
90026
    filter_manager_.streamInfo().setTraceReason(
90026
        ConnectionManagerUtility::mutateTracingRequestHeader(
90026
            *request_headers_, connection_manager_.runtime_, *connection_manager_.config_,
90026
            cached_route_.value().get()));
90026
  }
90322
  filter_manager_.streamInfo().setRequestHeaders(*request_headers_);
90322
  const FilterManager::CreateChainResult create_chain_result =
90322
      filter_manager_.createDownstreamFilterChain();
  // Account for an accepted upgrade in the stats and in the stream state.
90322
  if (create_chain_result.upgradeAccepted()) {
601
    connection_manager_.stats_.named_.downstream_cx_upgrades_total_.inc();
601
    connection_manager_.stats_.named_.downstream_cx_upgrades_active_.inc();
601
    state_.successful_upgrade_ = true;
601
  }
90322
  if (connection_manager_.config_->flushAccessLogOnNewRequest()) {
12
    log(AccessLog::AccessLogType::DownstreamStart);
12
  }
  // TODO if there are no filters when starting a filter iteration, the connection manager
  // should return 404. The current returns no response if there is no router filter.
90322
  if (hasCachedRoute()) {
    // Do not allow upgrades if the route does not support it.
87620
    if (create_chain_result.upgradeRejected()) {
      // While downstream servers should not send upgrade payload without the upgrade being
      // accepted, err on the side of caution and refuse to process any further requests on this
      // connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
      // contains a smuggled HTTP request.
2
      filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
2
      connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
2
      sendLocalReply(Code::Forbidden, "", nullptr, absl::nullopt,
2
                     StreamInfo::ResponseCodeDetails::get().UpgradeFailed);
2
      return;
2
    }
    // Allow non websocket requests to go through websocket enabled routes.
87620
  }
  // Check if tracing is enabled.
90320
  if (connection_manager_tracing_config_.has_value()) {
298
    traceRequest();
298
  }
90320
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
  // Either start filter iteration right away, or flag the stream as deferred and remember
  // `end_stream` so that it is available when the deferred request is processed.
90320
  if (!connection_manager_.shouldDeferRequestProxyingToNextIoCycle()) {
49892
    filter_manager_.decodeHeaders(*request_headers_, end_stream);
87841
  } else {
40428
    state_.deferred_to_next_io_iteration_ = true;
40428
    state_.deferred_end_stream_ = end_stream;
40428
  }
  // Reset it here for both global and overridden cases.
90320
  resetIdleTimer();
90320
}
298
void ConnectionManagerImpl::ActiveStream::traceRequest() {
298
  ASSERT(connection_manager_tracing_config_.has_value());
298
  const Tracing::Decision tracing_decision =
298
      Tracing::TracerUtility::shouldTraceRequest(filter_manager_.streamInfo());
298
  if (!trace_refresh_after_route_refresh_) {
1
    ConnectionManagerImpl::chargeTracingStats(tracing_decision.reason,
1
                                              connection_manager_.config_->tracingStats());
1
  }
298
  Tracing::HttpTraceContext trace_context(*request_headers_);
298
  active_span_ = connection_manager_.tracer().startSpan(
298
      *this, trace_context, filter_manager_.streamInfo(), tracing_decision);
298
  if (!active_span_) {
246
    return;
246
  }
52
  if (hasCachedRoute()) {
52
    route_decorator_ = cached_route_.value()->decorator();
52
    route_tracing_ = cached_route_.value()->tracingConfig();
52
  }
52
  if (!operationNameFormatter(*connection_manager_tracing_config_, route_tracing_)) {
    // Only set decorator when there is no operation name formatter configured at either
    // the HCM level or the route level.
49
    setRequestDecorator(*request_headers_);
49
  }
52
}
242696
// Handles a chunk of decoded request body. Updates timing and the received-bytes counter, then
// either forwards the data to the filter manager or, when the request has been deferred to the
// next I/O iteration, moves it into deferred_data_ and remembers `end_stream`.
void ConnectionManagerImpl::ActiveStream::decodeData(Buffer::Instance& data, bool end_stream) {
  ScopeTrackerScopeState scope(this,
                               connection_manager_.read_callbacks_->connection().dispatcher());
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
  maybeRecordLastByteReceived(end_stream);
  filter_manager_.streamInfo().addBytesReceived(data.length());

  if (state_.deferred_to_next_io_iteration_) {
    // Buffer the body until the deferred request is processed; the buffer is created lazily.
    if (deferred_data_ == nullptr) {
      deferred_data_ = std::make_unique<Buffer::OwnedImpl>();
    }
    deferred_data_->move(data);
    state_.deferred_end_stream_ = end_stream;
    return;
  }

  filter_manager_.decodeData(data, end_stream);
}
493
// Handles decoded request trailers. The trailers are validated first; on failure nothing more is
// done here. Valid trailers are either handed to the filter manager immediately or parked in
// deferred_request_trailers_ when the request has been deferred to the next I/O iteration.
void ConnectionManagerImpl::ActiveStream::decodeTrailers(RequestTrailerMapPtr&& trailers) {
  ENVOY_STREAM_LOG(debug, "request trailers complete:\n{}", *this, *trailers);
  ScopeTrackerScopeState scope(this,
                               connection_manager_.read_callbacks_->connection().dispatcher());
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
  resetIdleTimer();

  ASSERT(!request_trailers_);
  const bool trailers_valid = validateTrailers(*trailers);
  if (!trailers_valid) {
    ENVOY_STREAM_LOG(debug, "request trailers validation failed:\n{}", *this, *trailers);
    return;
  }

  // Trailers always terminate the request stream.
  maybeRecordLastByteReceived(true);

  if (state_.deferred_to_next_io_iteration_) {
    // Save trailers in a different variable since `request_trailers_` is available to the filter
    // manager via `requestTrailers()` callback and makes filter manager see trailers prematurely
    // when deferred request is processed.
    deferred_request_trailers_ = std::move(trailers);
    return;
  }

  request_trailers_ = std::move(trailers);
  filter_manager_.decodeTrailers(*request_trailers_);
}
864
// Handles a decoded request metadata map: it is queued in deferred_metadata_ when the request
// has been deferred to the next I/O iteration, and passed to the filter manager otherwise.
void ConnectionManagerImpl::ActiveStream::decodeMetadata(MetadataMapPtr&& metadata_map) {
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
  resetIdleTimer();

  if (state_.deferred_to_next_io_iteration_) {
    deferred_metadata_.push(std::move(metadata_map));
    return;
  }

  // After going through filters, the ownership of metadata_map will be passed to terminal filter.
  // The terminal filter may encode metadata_map to the next hop immediately or store metadata_map
  // and encode later when connection pool is ready.
  filter_manager_.decodeMetadata(*metadata_map);
}
192605
void ConnectionManagerImpl::ActiveStream::disarmRequestTimeout() {
192605
  if (request_timer_) {
172
    request_timer_->disableTimer();
172
  }
192605
}
204
void ConnectionManagerImpl::startDrainSequence() {
204
  ASSERT(drain_state_ == DrainState::NotDraining);
204
  drain_state_ = DrainState::Draining;
204
  codec_->shutdownNotice();
204
  drain_timer_ = dispatcher_->createTimer([this]() -> void { onDrainTimeout(); });
204
  drain_timer_->enableTimer(config_->drainTimeout());
204
}
406
void ConnectionManagerImpl::ActiveStream::snapScopedRouteConfig() {
  // NOTE: if a RDS subscription hasn't got a RouteConfiguration back, a Router::NullConfigImpl is
  // returned, in that case we let it pass.
406
  auto scope_key =
406
      connection_manager_.config_->scopeKeyBuilder()->computeScopeKey(*request_headers_);
406
  snapped_route_config_ = snapped_scoped_routes_config_->getRouteConfig(scope_key);
406
  if (snapped_route_config_ == nullptr) {
158
    ENVOY_STREAM_LOG(trace, "can't find SRDS scope.", *this);
    // TODO(stevenzzzz): Consider to pass an error message to router filter, so that it can
    // send back 404 with some more details.
158
    snapped_route_config_ = std::make_shared<Router::NullConfigImpl>();
158
  }
406
}
90378
// Refreshes the cached route without a route callback by delegating to the overload that
// accepts one.
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() {
  refreshCachedRoute(nullptr);
}
93452
void ConnectionManagerImpl::ActiveStream::refreshDurationTimeout() {
93452
  if (!hasCachedRoute() || !request_headers_) {
5205
    return;
5205
  }
88247
  const Router::RouteEntry* route = cached_route_.value()->routeEntry();
88247
  if (route == nullptr) {
276
    return;
276
  }
87971
  auto grpc_timeout = Grpc::Common::getGrpcTimeout(*request_headers_);
87971
  std::chrono::milliseconds timeout;
87971
  bool disable_timer = false;
87971
  if (!grpc_timeout || !route->grpcTimeoutHeaderMax()) {
    // Either there is no grpc-timeout header or special timeouts for it are not
    // configured. Use stream duration.
87957
    if (route->maxStreamDuration()) {
2
      timeout = route->maxStreamDuration().value();
2
      if (timeout == std::chrono::milliseconds(0)) {
        // Explicitly configured 0 means no timeout.
        disable_timer = true;
      }
87955
    } else {
      // Fall back to HCM config. If no HCM duration limit exists, disable
      // timers set by any prior route configuration.
87955
      const auto max_stream_duration = connection_manager_.config_->maxStreamDuration();
87955
      if (max_stream_duration.has_value() && max_stream_duration.value().count()) {
36
        timeout = max_stream_duration.value();
87919
      } else {
87919
        disable_timer = true;
87919
      }
87955
    }
87957
  } else {
    // Start with the timeout equal to the gRPC timeout header.
14
    timeout = grpc_timeout.value();
    // If there's a valid cap, apply it.
14
    if (timeout > route->grpcTimeoutHeaderMax().value() &&
14
        route->grpcTimeoutHeaderMax().value() != std::chrono::milliseconds(0)) {
5
      timeout = route->grpcTimeoutHeaderMax().value();
5
    }
    // Apply the configured offset.
14
    if (timeout != std::chrono::milliseconds(0) && route->grpcTimeoutHeaderOffset()) {
2
      const auto offset = route->grpcTimeoutHeaderOffset().value();
2
      if (offset < timeout) {
1
        timeout -= offset;
1
      } else {
1
        timeout = std::chrono::milliseconds(0);
1
      }
2
    }
14
  }
  // Disable any existing timer if configured to do so.
87971
  if (disable_timer) {
87919
    if (max_stream_duration_timer_) {
2
      max_stream_duration_timer_->disableTimer();
2
      if (route->usingNewTimeouts() && Grpc::Common::isGrpcRequestHeaders(*request_headers_)) {
        request_headers_->removeGrpcTimeout();
      }
2
    }
87919
    return;
87919
  }
  // Set the header timeout before doing used-time adjustments.
  // This may result in the upstream not getting the latest results, but also
  // avoids every request getting a custom timeout based on envoy think time.
52
  if (route->usingNewTimeouts() && Grpc::Common::isGrpcRequestHeaders(*request_headers_)) {
6
    Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(timeout), *request_headers_);
6
  }
  // See how long this stream has been alive, and adjust the timeout
  // accordingly.
52
  std::chrono::milliseconds time_used = std::chrono::duration_cast<std::chrono::milliseconds>(
52
      connection_manager_.timeSource().monotonicTime() -
52
      filter_manager_.streamInfo().startTimeMonotonic());
52
  if (timeout > time_used) {
49
    timeout -= time_used;
49
  } else {
3
    timeout = std::chrono::milliseconds(0);
3
  }
  // Finally create (if necessary) and enable the timer.
52
  if (!max_stream_duration_timer_) {
7
    max_stream_duration_timer_ = connection_manager_.dispatcher_->createTimer(
7
        [this]() -> void { onStreamMaxDurationReached(); });
7
  }
52
  max_stream_duration_timer_->enableTimer(timeout);
52
}
93436
// Re-runs route selection and stores the result via setVirtualHostRoute(). `cb` is handed to the
// route configuration's route() call. When request headers are not available, or no route
// configuration has been snapped, a default-constructed route result is stored.
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::RouteCallback& cb) {
  // If the cached route is blocked then any attempt to clear it or refresh it
  // will be ignored.
  if (routeCacheBlocked()) {
    return;
  }

  Router::VirtualHostRoute selected_route;
  if (request_headers_ != nullptr) {
    auto& hcm_config = *connection_manager_.config_;
    const bool uses_scoped_routes = hcm_config.isRoutable() &&
                                    hcm_config.scopedRouteConfigProvider() != nullptr &&
                                    hcm_config.scopeKeyBuilder().has_value();
    if (uses_scoped_routes) {
      // NOTE: re-select scope as well in case the scope key header has been changed by a filter.
      snapScopedRouteConfig();
    }

    if (snapped_route_config_ != nullptr) {
      selected_route = snapped_route_config_->route(cb, *request_headers_,
                                                    filter_manager_.streamInfo(), stream_id_);
    }
  }

  setVirtualHostRoute(std::move(selected_route));
}
93452
void ConnectionManagerImpl::ActiveStream::refreshTracing() {
93452
  if (!trace_refresh_after_route_refresh_) {
2
    return;
2
  }
93450
  if (!connection_manager_tracing_config_.has_value() || active_span_ == nullptr ||
93450
      request_headers_ == nullptr) {
93448
    return;
93448
  }
2
  ASSERT(cached_route_.has_value());
  // NOTE: if the trace reason have been encoded into the request id then the trace reason may
  // not be updated. That means we may cannot to force a traced request to be untraced by the
  // refreshing.
2
  const auto trace_reason = ConnectionManagerUtility::mutateTracingRequestHeader(
2
      *request_headers_, connection_manager_.runtime_, *connection_manager_.config_,
2
      cached_route_.value().get());
2
  filter_manager_.streamInfo().setTraceReason(trace_reason);
2
  const Tracing::Decision tracing_decision =
2
      Tracing::TracerUtility::shouldTraceRequest(filter_manager_.streamInfo());
2
  if (active_span_->useLocalDecision()) {
1
    active_span_->setSampled(tracing_decision.traced);
1
  }
2
  if (hasCachedRoute()) {
2
    route_decorator_ = cached_route_.value()->decorator();
2
    route_tracing_ = cached_route_.value()->tracingConfig();
2
  }
2
  if (!operationNameFormatter(*connection_manager_tracing_config_, route_tracing_)) {
    // Only set decorator when there is no operation name formatter configured at either
    // the HCM level or the route level.
2
    setRequestDecorator(*request_headers_);
2
  }
2
}
// TODO(chaoqin-li1123): Make on demand vhds and on demand srds works at the same time.
void ConnectionManagerImpl::ActiveStream::requestRouteConfigUpdate(
76
    Http::RouteConfigUpdatedCallbackSharedPtr route_config_updated_cb) {
76
  ENVOY_BUG(route_config_update_requester_.has_value(),
76
            "RouteConfigUpdate requested but RDS not compiled into the binary. Try linking "
76
            "//source/common/http:rds_lib");
76
  if (route_config_update_requester_.has_value()) {
76
    (*route_config_update_requester_)
76
        ->requestRouteConfigUpdate(*this, route_config_updated_cb, routeConfig(),
76
                                   *connection_manager_.dispatcher_, *request_headers_);
76
  }
76
}
76
absl::optional<Router::ConfigConstSharedPtr> ConnectionManagerImpl::ActiveStream::routeConfig() {
  // Returns the config held by the route config provider, or an empty optional when the
  // connection manager config has no route config provider.
  const auto provider = connection_manager_.config_->routeConfigProvider();
  if (provider == nullptr) {
    return {};
  }
  return {provider->configCast()};
}
10124
void ConnectionManagerImpl::ActiveStream::onLocalReply(Code code) {
  // Marks the connection for draining once this stream completes when a 400 local reply is
  // produced on a pre-HTTP/2 codec whose encoder does not treat invalid messages as stream
  // errors. A BadRequest code indicates there has been a messaging error.
  if (code != Http::Code::BadRequest) {
    return;
  }
  if (connection_manager_.codec_->protocol() >= Protocol::Http2) {
    return;
  }
  if (response_encoder_->streamErrorOnInvalidHttpMessage()) {
    return;
  }
  filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
}
129
void ConnectionManagerImpl::ActiveStream::encode1xxHeaders(ResponseHeaderMap& response_headers) {
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());

  // Run the common response header mutation with an empty via value (this strips the T-E
  // headers etc.). Other header additions and the drain-close logic are left to the final
  // response headers.
  auto& stream_info = filter_manager_.streamInfo();
  ConnectionManagerUtility::mutateResponseHeaders(
      response_headers, request_headers_.get(), *connection_manager_.config_, EMPTY_STRING,
      stream_info, connection_manager_.proxy_name_,
      connection_manager_.clear_hop_by_hop_response_headers_);

  // Both the 1xx and the follow-up response code are counted in stats.
  chargeStats(response_headers);

  ENVOY_STREAM_LOG(debug, "encoding 1xx continue headers via codec:\n{}", *this, response_headers);

  // Hand the headers to the codec.
  response_encoder_->encode1xxHeaders(response_headers);
}
void ConnectionManagerImpl::ActiveStream::encodeHeaders(ResponseHeaderMap& headers,
47257
                                                        bool end_stream) {
47257
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
  // Base headers.
  // We want to preserve the original date header, but we add a date header if it is absent
47257
  if (!headers.Date()) {
46900
    connection_manager_.config_->dateProvider().setDateHeader(headers);
46900
  }
  // Following setReference() is safe because serverName() is constant for the life of the
  // listener.
47257
  const auto transformation = connection_manager_.config_->serverHeaderTransformation();
47257
  if (transformation == ConnectionManagerConfig::HttpConnectionManagerProto::OVERWRITE ||
47257
      (transformation == ConnectionManagerConfig::HttpConnectionManagerProto::APPEND_IF_ABSENT &&
47254
       headers.Server() == nullptr)) {
47254
    headers.setReferenceServer(connection_manager_.config_->serverName());
47254
  }
47257
  ConnectionManagerUtility::mutateResponseHeaders(
47257
      headers, request_headers_.get(), *connection_manager_.config_,
47257
      connection_manager_.config_->via(), filter_manager_.streamInfo(),
47257
      connection_manager_.proxy_name_, connection_manager_.clear_hop_by_hop_response_headers_);
47257
  bool drain_connection_due_to_overload = false;
47257
  if (connection_manager_.drain_state_ == DrainState::NotDraining &&
47257
      connection_manager_.random_generator_.bernoulli(
47191
          connection_manager_.overload_disable_keepalive_ref_.value())) {
6
    ENVOY_STREAM_LOG(debug, "disabling keepalive due to envoy overload", *this);
6
    drain_connection_due_to_overload = true;
6
    connection_manager_.stats_.named_.downstream_cx_overload_disable_keepalive_.inc();
6
  }
  // If we're an inbound listener, then we should drain even if the drain direction is inbound only.
  // If not, then we only drain if the drain direction is all.
47257
  Network::DrainDirection drain_scope =
47257
      connection_manager_.direction_ == envoy::config::core::v3::TrafficDirection::INBOUND
47257
          ? Network::DrainDirection::InboundOnly
47257
          : Network::DrainDirection::All;
  // See if we want to drain/close the connection. Send the go away frame prior to encoding the
  // header block. Only drain if the drain direction is not inbound only or the connection is
  // inbound.
47257
  if (connection_manager_.drain_state_ == DrainState::NotDraining &&
47257
      (connection_manager_.drain_close_.drainClose(drain_scope) ||
47191
       drain_connection_due_to_overload)) {
    // This doesn't really do anything for HTTP/1.1 other then give the connection another boost
    // of time to race with incoming requests. For HTTP/2 connections, send a GOAWAY frame to
    // prevent any new streams.
69
    connection_manager_.startDrainSequence();
69
    connection_manager_.stats_.named_.downstream_cx_drain_close_.inc();
69
    ENVOY_STREAM_LOG(debug, "drain closing connection", *this);
69
  }
47257
  if (connection_manager_.codec_->protocol() == Protocol::Http10) {
    // As HTTP/1.0 and below can not do chunked encoding, if there is no content
    // length the response will be framed by connection close.
18
    if (!headers.ContentLength()) {
10
      filter_manager_.streamInfo().setShouldDrainConnectionUponCompletion(true);
10
    }
    // If the request came with a keep-alive and no other factor resulted in a
    // connection close header, send an explicit keep-alive header.
18
    if (!filter_manager_.streamInfo().shouldDrainConnectionUponCompletion()) {
8
      headers.setConnection(Headers::get().ConnectionValues.KeepAlive);
8
    }
18
  }
47257
  if (connection_manager_.drain_state_ == DrainState::NotDraining &&
47257
      filter_manager_.streamInfo().shouldDrainConnectionUponCompletion()) {
1414
    ENVOY_STREAM_LOG(debug, "closing connection due to connection close header", *this);
    // For HTTP/2 and HTTP/3, send GOAWAY and allow current stream to complete.
    // For HTTP/1.1, go directly to Closing (Connection: close header will be added below).
1414
    if (connection_manager_.codec_->protocol() >= Protocol::Http2) {
8
      connection_manager_.startDrainSequence();
1406
    } else {
1406
      connection_manager_.drain_state_ = DrainState::Closing;
1406
    }
1414
  }
  // If we are destroying a stream before remote is complete and the connection does not support
  // multiplexing, we should disconnect since we don't want to wait around for the request to
  // finish.
47257
  if (!filter_manager_.hasLastDownstreamByteReceived()) {
2951
    if (connection_manager_.codec_->protocol() < Protocol::Http2) {
1921
      connection_manager_.drain_state_ = DrainState::Closing;
1921
    }
2951
    connection_manager_.stats_.named_.downstream_rq_response_before_rq_complete_.inc();
2951
  }
47257
  if (Utility::isUpgrade(headers) ||
47257
      HeaderUtility::isConnectResponse(request_headers_.get(), *responseHeaders())) {
341
    state_.is_tunneling_ = true;
341
  }
  // Block route cache if the response headers is received and processed. Because after this
  // point, the cached route should never be updated or refreshed.
47257
  blockRouteCache();
47257
  if (connection_manager_.drain_state_ != DrainState::NotDraining &&
47257
      connection_manager_.codec_->protocol() < Protocol::Http2) {
    // If the connection manager is draining send "Connection: Close" on HTTP/1.1 connections.
    // Do not do this for H2 (which drains via GOAWAY) or Upgrade or CONNECT (as the
    // payload is no longer HTTP/1.1)
2051
    if (!state_.is_tunneling_) {
1982
      headers.setReferenceConnection(Headers::get().ConnectionValues.Close);
1982
    }
2051
  }
47257
  if (active_span_ != nullptr) {
52
    ASSERT(connection_manager_tracing_config_.has_value());
52
    if (!operationNameFormatter(*connection_manager_tracing_config_, route_tracing_)) {
      // Only apply decorator if there is no operation name formatter configured at either
      // the HCM level or the route level.
49
      setResponseDecorator(headers);
49
    }
52
  }
47257
  chargeStats(headers);
47257
  if (state_.is_tunneling_ &&
47257
      connection_manager_.config_->flushAccessLogOnTunnelSuccessfullyEstablished()) {
5
    log(AccessLog::AccessLogType::DownstreamTunnelSuccessfullyEstablished);
5
  }
47257
  ENVOY_STREAM_LOG(debug, "encoding headers via codec (end_stream={}):\n{}", *this, end_stream,
47257
                   headers);
47257
  filter_manager_.streamInfo().downstreamTiming().onFirstDownstreamTxByteSent(
47257
      connection_manager_.time_source_);
47257
  if (header_validator_) {
9
    auto result = header_validator_->transformResponseHeaders(headers);
9
    if (!result.status.ok()) {
      // It is possible that the header map is invalid if an encoder filter makes invalid
      // modifications
      // TODO(yanavlasov): add handling for this case.
9
    } else if (result.new_headers) {
      response_encoder_->encodeHeaders(*result.new_headers, end_stream);
      return;
    }
9
  }
  // Now actually encode via the codec.
47257
  response_encoder_->encodeHeaders(headers, end_stream);
47257
}
70894
void ConnectionManagerImpl::ActiveStream::encodeData(Buffer::Instance& data, bool end_stream) {
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());

  // Capture the size up front: it is both logged and added to the bytes-sent counter before
  // the buffer is handed to the codec.
  const auto payload_size = data.length();
  ENVOY_STREAM_LOG(trace, "encoding data via codec (size={} end_stream={})", *this, payload_size,
                   end_stream);

  filter_manager_.streamInfo().addBytesSent(payload_size);
  response_encoder_->encodeData(data, end_stream);
}
448
void ConnectionManagerImpl::ActiveStream::encodeTrailers(ResponseTrailerMap& trailers) {
  // Logs the response trailers and passes them to the codec's response encoder.
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());

  ENVOY_STREAM_LOG(debug, "encoding trailers via codec:\n{}", *this, trailers);

  response_encoder_->encodeTrailers(trailers);
}
1590
void ConnectionManagerImpl::ActiveStream::encodeMetadata(MetadataMapPtr&& metadata) {
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());

  // The encoder is given a vector of metadata maps, so wrap the single map that was passed in.
  MetadataMapVector single_map_batch;
  single_map_batch.emplace_back(std::move(metadata));

  ENVOY_STREAM_LOG(debug, "encoding metadata via codec:\n{}", *this, single_map_batch);

  response_encoder_->encodeMetadata(single_map_batch);
}
220406
void ConnectionManagerImpl::ActiveStream::onDecoderFilterBelowWriteBufferLowWatermark() {
220406
  ENVOY_STREAM_LOG(debug, "Read-enabling downstream stream due to filter callbacks.", *this);
  // If the state is destroyed, the codec's stream is already torn down. On
  // teardown the codec will unwind any remaining read disable calls.
220406
  if (!filter_manager_.destroyed()) {
220404
    response_encoder_->getStream().readDisable(false);
220404
  }
220406
  connection_manager_.stats_.named_.downstream_flow_control_resumed_reading_total_.inc();
220406
}
220408
void ConnectionManagerImpl::ActiveStream::onDecoderFilterAboveWriteBufferHighWatermark() {
220408
  ENVOY_STREAM_LOG(debug, "Read-disabling downstream stream due to filter callbacks.", *this);
220408
  response_encoder_->getStream().readDisable(true);
220408
  connection_manager_.stats_.named_.downstream_flow_control_paused_reading_total_.inc();
220408
}
void ConnectionManagerImpl::ActiveStream::onResetStream(StreamResetReason reset_reason,
49630
                                                        absl::string_view) {
  // NOTE: This function gets called in all of the following cases:
  //       1) We TX an app level reset
  //       2) The codec TX a codec level reset
  //       3) The codec RX a reset
  //       4) The overload manager reset the stream
  //       If we need to differentiate we need to do it inside the codec. Can start with this.
49630
  const absl::string_view encoder_details = response_encoder_->getStream().responseDetails();
49630
  ENVOY_STREAM_LOG(debug, "stream reset: reset reason: {}, response details: {}", *this,
49630
                   Http::Utility::resetReasonToString(reset_reason),
49630
                   encoder_details.empty() ? absl::string_view{"-"} : encoder_details);
49630
  connection_manager_.stats_.named_.downstream_rq_rx_reset_.inc();
49630
  state_.on_reset_stream_called_ = true;
  // If the codec sets its responseDetails() for a reason other than peer reset, set a
  // DownstreamProtocolError. Either way, propagate details.
49630
  if (reset_reason == StreamResetReason::ProtocolError ||
49630
      (!encoder_details.empty() && reset_reason == StreamResetReason::LocalReset)) {
2282
    filter_manager_.streamInfo().setResponseFlag(
2282
        StreamInfo::CoreResponseFlag::DownstreamProtocolError);
2282
  }
49630
  if (!encoder_details.empty()) {
3629
    if (reset_reason == StreamResetReason::ConnectError ||
3629
        reset_reason == StreamResetReason::RemoteReset ||
3629
        reset_reason == StreamResetReason::RemoteRefusedStreamReset) {
1055
      filter_manager_.streamInfo().setResponseFlag(
1055
          StreamInfo::CoreResponseFlag::DownstreamRemoteReset);
1055
    }
3629
    filter_manager_.streamInfo().setResponseCodeDetails(encoder_details);
3629
  }
  // Check if we're in the overload manager reset case.
  // encoder_details should be empty in this case as we don't have a codec error.
49630
  if (encoder_details.empty() && reset_reason == StreamResetReason::OverloadManager) {
12
    filter_manager_.streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::OverloadManager);
12
    filter_manager_.streamInfo().setResponseCodeDetails(
12
        StreamInfo::ResponseCodeDetails::get().Overload);
12
  }
49630
  filter_manager_.onDownstreamReset();
49630
  connection_manager_.doDeferredStreamDestroy(*this);
49630
}
19341
void ConnectionManagerImpl::ActiveStream::onAboveWriteBufferHighWatermark() {
19341
  ENVOY_STREAM_LOG(debug, "Disabling upstream stream due to downstream stream watermark.", *this);
19341
  filter_manager_.callHighWatermarkCallbacks();
19341
}
19189
void ConnectionManagerImpl::ActiveStream::onBelowWriteBufferLowWatermark() {
19189
  ENVOY_STREAM_LOG(debug, "Enabling upstream stream due to downstream stream watermark.", *this);
19189
  filter_manager_.callLowWatermarkCallbacks();
19189
}
49927
bool ConnectionManagerImpl::ActiveStream::shouldSkipDeferredCloseDelay() const {
  // If HTTP/1.0 has no content length, it is framed by close and won't consider
  // the request complete until the FIN is read. Don't delay close in this case.
49927
  const bool http_10_sans_cl = (connection_manager_.codec_->protocol() == Protocol::Http10) &&
49927
                               (!response_headers_ || !response_headers_->ContentLength());
  // We also don't delay-close in the case of HTTP/1.1 where the request is
  // fully read, as there's no race condition to avoid.
49927
  const bool connection_close = filter_manager_.streamInfo().shouldDrainConnectionUponCompletion();
49927
  const bool request_complete = filter_manager_.hasLastDownstreamByteReceived();
  // Don't do delay close for HTTP/1.0 or if the request is complete.
49927
  return connection_close && (request_complete || http_10_sans_cl);
49927
}
46122
void ConnectionManagerImpl::ActiveStream::onCodecEncodeComplete() {
46122
  ASSERT(!state_.codec_encode_complete_);
46122
  ENVOY_STREAM_LOG(debug, "Codec completed encoding stream.", *this);
46122
  state_.codec_encode_complete_ = true;
  // Update timing
46122
  filter_manager_.streamInfo().downstreamTiming().onLastDownstreamTxByteSent(
46122
      connection_manager_.time_source_);
46122
  request_response_timespan_->complete();
  // Only reap stream once.
46122
  if (state_.is_zombie_stream_) {
2801
    const bool skip_delay = shouldSkipDeferredCloseDelay();
2801
    connection_manager_.doDeferredStreamDestroy(*this);
    // After destroying a zombie stream, check if the connection should be
    // closed. doEndStream() call that created the zombie may have set
    // drain_state_ to Closing, but checkForDeferredClose() couldn't close the
    // connection at that time because streams_ wasn't empty yet.
2801
    if (connection_manager_.close_connection_on_zombie_stream_complete_) {
2800
      connection_manager_.checkForDeferredClose(skip_delay);
2800
    }
2801
  }
46122
}
446
void ConnectionManagerImpl::ActiveStream::onCodecLowLevelReset() {
446
  ASSERT(!state_.codec_encode_complete_);
446
  state_.on_reset_stream_called_ = true;
446
  ENVOY_STREAM_LOG(debug, "Codec low level reset", *this);
  // TODO(kbaichoo): Update streamInfo to account for the reset.
  // Only reap stream once.
446
  if (state_.is_zombie_stream_) {
180
    const bool skip_delay = shouldSkipDeferredCloseDelay();
180
    connection_manager_.doDeferredStreamDestroy(*this);
    // After destroying a zombie stream, check if the connection should be
    // closed. doEndStream() call that created the zombie may have set
    // drain_state_ to Closing, but checkForDeferredClose() couldn't close the
    // connection at that time because streams_ wasn't empty yet.
180
    if (connection_manager_.close_connection_on_zombie_stream_complete_) {
180
      connection_manager_.checkForDeferredClose(skip_delay);
180
    }
180
  }
446
}
107
Tracing::OperationName ConnectionManagerImpl::ActiveStream::operationName() const {
  // Returns the operation name from the connection manager tracing config, which must be set.
  ASSERT(connection_manager_tracing_config_.has_value());
  const auto& tracing_config = *connection_manager_tracing_config_;
  return tracing_config.operation_name_;
}
void ConnectionManagerImpl::ActiveStream::modifySpan(Tracing::Span& span,
69
                                                     bool upstream_span) const {
69
  ASSERT(connection_manager_tracing_config_.has_value());
69
  const Tracing::HttpTraceContext trace_context(*request_headers_);
69
  const Formatter::Context formatter_context{
69
      request_headers_.get(), response_headers_.get(), response_trailers_.get(), {}, {},
69
      active_span_.get()};
69
  const Tracing::CustomTagContext ctx{trace_context, filter_manager_.streamInfo(),
69
                                      formatter_context};
  // Cache the optional custom tags from the route first.
69
  OptRef<const Tracing::CustomTagMap> route_custom_tags;
69
  if (route_tracing_ != nullptr) {
7
    route_custom_tags.emplace(route_tracing_->getCustomTags());
12
    for (const auto& tag : *route_custom_tags) {
8
      tag.second->applySpan(span, ctx);
8
    }
7
  }
77
  for (const auto& tag : connection_manager_tracing_config_->custom_tags_) {
24
    if (!route_custom_tags.has_value() || !route_custom_tags->contains(tag.first)) {
      // If the tag is defined in both the connection manager and the route,
      // use the route's tag.
20
      tag.second->applySpan(span, ctx);
20
    }
24
  }
  // For same stream, there is only one downstream span. It's the active span.
  // So we can determine whether the span is downstream span by comparing the
  // span pointer.
69
  const Formatter::Formatter* operation =
69
      upstream_span
69
          ? upstreamOperationNameFormatter(*connection_manager_tracing_config_, route_tracing_)
69
          : operationNameFormatter(*connection_manager_tracing_config_, route_tracing_);
69
  if (operation != nullptr) {
6
    const auto op = operation->format(formatter_context, filter_manager_.streamInfo());
6
    if (!op.empty()) {
6
      span.setOperation(op);
6
    }
6
  }
69
}
66
bool ConnectionManagerImpl::ActiveStream::verbose() const {
66
  ASSERT(connection_manager_tracing_config_.has_value());
66
  return connection_manager_tracing_config_->verbose_;
66
}
52
uint32_t ConnectionManagerImpl::ActiveStream::maxPathTagLength() const {
52
  ASSERT(connection_manager_tracing_config_.has_value());
52
  return connection_manager_tracing_config_->max_path_tag_length_;
52
}
68
bool ConnectionManagerImpl::ActiveStream::spawnUpstreamSpan() const {
68
  ASSERT(connection_manager_tracing_config_.has_value());
68
  return connection_manager_tracing_config_->spawn_upstream_span_;
68
}
35
bool ConnectionManagerImpl::ActiveStream::noContextPropagation() const {
35
  ASSERT(connection_manager_tracing_config_.has_value());
35
  return connection_manager_tracing_config_->no_context_propagation_;
35
}
652
const Router::RouteEntry::UpgradeMap* ConnectionManagerImpl::ActiveStream::upgradeMap() {
  // Returns the upgrade map of the cached route's route entry, or nullptr when there is no
  // cached route or the route has no route entry. The cached route must be checked because
  // this function can be called early via sendLocalReply(), before the route is populated.
  if (!hasCachedRoute()) {
    return nullptr;
  }
  const auto* route_entry = cached_route_.value()->routeEntry();
  if (route_entry == nullptr) {
    return nullptr;
  }
  return &route_entry->upgradeMap();
}
131201
Tracing::Span& ConnectionManagerImpl::ActiveStream::activeSpan() {
  // Returns the stream's active span, falling back to the shared null span when the stream has
  // no active span.
  if (active_span_ == nullptr) {
    return Tracing::NullSpan::instance();
  }
  return *active_span_;
}
43676
OptRef<const Tracing::Config> ConnectionManagerImpl::ActiveStream::tracingConfig() const {
  // The stream itself acts as the Tracing::Config, but only when the connection manager has a
  // tracing config; otherwise an empty reference is returned.
  if (!connection_manager_tracing_config_.has_value()) {
    return {};
  }
  return makeOptRef<const Tracing::Config>(*this);
}
164795
const ScopeTrackedObject& ConnectionManagerImpl::ActiveStream::scope() {
  // The stream itself is the tracked object.
  return *this;
}
671
/**
 * Returns the cluster info associated with the cached route, resolving the route first when
 * nothing is cached yet.
 *
 * @return the cached cluster info; nullptr when no cluster info is cached, including the case
 *         where the route refresh could not populate the cache.
 */
Upstream::ClusterInfoConstSharedPtr ConnectionManagerImpl::ActiveStream::clusterInfo() {
  // NOTE: Refreshing route caches clusterInfo as well.
  if (!cached_route_.has_value()) {
    refreshCachedRoute();
  }

  // The refresh ends in setVirtualHostRoute(), which returns without touching the cache when
  // the route cache is blocked, so the optional may still be empty here. Return a null cluster
  // info in that case instead of performing an unchecked value() access.
  return cached_cluster_info_.value_or(nullptr);
}
/**
 * Returns the cached route, resolving it with the given callback when nothing is cached yet.
 *
 * @param cb route callback forwarded to the route refresh.
 * @return the cached route; nullptr when no route is cached, including the case where the
 *         route refresh could not populate the cache.
 */
Router::RouteConstSharedPtr
ConnectionManagerImpl::ActiveStream::route(const Router::RouteCallback& cb) {
  if (!cached_route_.has_value()) {
    refreshCachedRoute(cb);
  }

  // The refresh ends in setVirtualHostRoute(), which returns without touching the cache when
  // the route cache is blocked, so the optional may still be empty here. Return a null route in
  // that case instead of performing an unchecked value() access.
  return cached_route_.value_or(nullptr);
}
17
void ConnectionManagerImpl::ActiveStream::setRoute(Router::RouteConstSharedPtr route) {
  // Wraps the route (and its virtual host, when the route is non-null) into a VirtualHostRoute
  // and installs it through setVirtualHostRoute(). A null route yields an empty pair.
  Router::VirtualHostRoute route_with_vhost;
  if (route != nullptr) {
    // Read the virtual host before the route pointer is moved away.
    route_with_vhost.vhost = route->virtualHost();
    route_with_vhost.route = std::move(route);
  }
  setVirtualHostRoute(std::move(route_with_vhost));
}
/**
 * Sets the cached route to the RouteConstSharedPtr argument passed in. Handles setting the
 * cached_route_/cached_cluster_info_ ActiveStream attributes, the FilterManager streamInfo, tracing
 * tags, and timeouts.
 *
 * Declared as a StreamFilterCallbacks member function for filters to call directly, but also
 * functions as a helper to refreshCachedRoute(const Router::RouteCallback& cb).
 */
void ConnectionManagerImpl::ActiveStream::setVirtualHostRoute(
93453
    Router::VirtualHostRoute vhost_route) {
  // If the cached route is blocked then any attempt to clear it or refresh it
  // will be ignored.
93453
  if (routeCacheBlocked()) {
1
    return;
1
  }
93452
  Router::RouteConstSharedPtr route = std::move(vhost_route.route);
  // Update the cached route.
93452
  setCachedRoute({route});
  // Update the cached cluster info based on the new route.
93452
  if (nullptr == route || nullptr == route->routeEntry()) {
5481
    cached_cluster_info_ = nullptr;
91106
  } else {
87971
    auto* cluster = connection_manager_.cluster_manager_.getThreadLocalCluster(
87971
        route->routeEntry()->clusterName());
87971
    cached_cluster_info_ = (nullptr == cluster) ? nullptr : cluster->info();
87971
  }
  // Update route, vhost and cluster info in the filter manager's stream info.
  // Now can move route here safely.
93452
  filter_manager_.streamInfo().route_ = std::move(route);
93452
  filter_manager_.streamInfo().vhost_ = std::move(vhost_route.vhost);
93452
  filter_manager_.streamInfo().setUpstreamClusterInfo(cached_cluster_info_.value());
93452
  refreshTracing();
93452
  refreshDurationTimeout();
93452
  refreshIdleAndFlushTimeouts();
93452
  refreshBufferLimit();
93452
}
93452
void ConnectionManagerImpl::ActiveStream::refreshIdleAndFlushTimeouts() {
93452
  if (!hasCachedRoute()) {
5205
    return;
5205
  }
88247
  const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry();
88247
  if (route_entry == nullptr) {
276
    return;
276
  }
87971
  if (route_entry->idleTimeout().has_value()) {
16193
    idle_timeout_ms_ = route_entry->idleTimeout().value();
16193
    if (idle_timeout_ms_.count()) {
      // If we have a route-level idle timeout but no global stream idle timeout, create a timer.
16192
      if (stream_idle_timer_ == nullptr) {
5
        stream_idle_timer_ = connection_manager_.dispatcher_->createScaledTimer(
5
            Event::ScaledTimerType::HttpDownstreamIdleStreamTimeout,
5
            [this]() -> void { onIdleTimeout(); });
5
      }
16192
    } else if (stream_idle_timer_ != nullptr) {
      // If we had a global stream idle timeout but the route-level idle timeout is set to zero
      // (to override), we disable the idle timer.
1
      stream_idle_timer_->disableTimer();
1
      stream_idle_timer_ = nullptr;
1
    }
16193
  }
87971
  if (route_entry->flushTimeout().has_value()) {
4
    response_encoder_->getStream().setFlushTimeout(route_entry->flushTimeout().value());
87967
  } else if (!has_explicit_global_flush_timeout_ && route_entry->idleTimeout().has_value()) {
    // If there is no route-level flush timeout, and the global flush timeout was also inherited
    // from the idle timeout, also inherit the route-level idle timeout. This is for backwards
    // compatibility.
8
    response_encoder_->getStream().setFlushTimeout(idle_timeout_ms_);
8
  }
87971
}
21
void ConnectionManagerImpl::ActiveStream::refreshAccessLogFlushTimer() {
21
  if (connection_manager_.config_->accessLogFlushInterval().has_value()) {
21
    access_log_flush_timer_->enableTimer(
21
        connection_manager_.config_->accessLogFlushInterval().value(), this);
21
  }
21
}
93452
void ConnectionManagerImpl::ActiveStream::refreshBufferLimit() {
93452
  if (!hasCachedRoute()) {
5205
    return;
5205
  }
88247
  const Router::RouteEntry* route_entry = cached_route_.value()->routeEntry();
88247
  if (route_entry == nullptr) {
276
    return;
276
  }
87971
  const uint64_t buffer_limit = route_entry->requestBodyBufferLimit();
87971
  if (buffer_limit == std::numeric_limits<uint64_t>::max()) {
    // Max uint64_t means no valid limit configured.
87920
    return;
87920
  }
  // Only increase the buffer limit automatically. This is to ensure same
  // behavior as previous logic in router filter.
51
  if (buffer_limit > filter_manager_.bufferLimit()) {
5
    ENVOY_STREAM_LOG(debug, "Setting new filter manager buffer limit: {}", *this, buffer_limit);
5
    filter_manager_.setBufferLimit(buffer_limit);
5
  }
51
}
832
void ConnectionManagerImpl::ActiveStream::clearRouteCache() {
  // If the cached route is blocked then any attempt to clear it or refresh it
  // will be ignored.
832
  if (routeCacheBlocked()) {
1
    return;
1
  }
831
  setCachedRoute({});
831
  cached_cluster_info_ = absl::optional<Upstream::ClusterInfoConstSharedPtr>();
831
}
4
void ConnectionManagerImpl::ActiveStream::refreshRouteCluster() {
  // If there is no cached route, or route cache is frozen, or the request headers are not
  // available, then do not refresh the route cluster.
4
  if (!hasCachedRoute() || routeCacheBlocked() || request_headers_ == nullptr) {
2
    return;
2
  }
2
  if (const auto* entry = (*cached_route_)->routeEntry(); entry != nullptr) {
    // Refresh the cluster if possible.
2
    entry->refreshRouteCluster(*request_headers_, filter_manager_.streamInfo());
    // Refresh the cached cluster info is necessary.
2
    if (!cached_cluster_info_.has_value() || cached_cluster_info_.value() == nullptr ||
2
        (*cached_cluster_info_)->name() != entry->clusterName()) {
2
      auto* cluster =
2
          connection_manager_.cluster_manager_.getThreadLocalCluster(entry->clusterName());
2
      cached_cluster_info_ = (nullptr == cluster) ? nullptr : cluster->info();
2
      filter_manager_.streamInfo().setUpstreamClusterInfo(cached_cluster_info_.value());
2
    }
2
  }
2
}
void ConnectionManagerImpl::ActiveStream::setCachedRoute(
94283
    absl::optional<Router::RouteConstSharedPtr>&& route) {
94283
  if (hasCachedRoute()) {
    // The configuration of the route may be referenced by some filters.
    // Cache the route to avoid it being destroyed before the stream is destroyed.
587
    cleared_cached_routes_.emplace_back(std::move(cached_route_.value()));
587
  }
94283
  cached_route_ = std::move(route);
94283
}
47257
void ConnectionManagerImpl::ActiveStream::blockRouteCache() {
47257
  route_cache_blocked_ = true;
  // Clear the snapped route configuration because it is unnecessary to keep it.
47257
  snapped_route_config_.reset();
47257
  snapped_scoped_routes_config_.reset();
47257
}
85
void ConnectionManagerImpl::ActiveStream::onRequestDataTooLarge() {
85
  connection_manager_.stats_.named_.downstream_rq_too_large_.inc();
85
}
void ConnectionManagerImpl::ActiveStream::recreateStream(
296
    StreamInfo::FilterStateSharedPtr filter_state) {
296
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
296
  ResponseEncoder* response_encoder = response_encoder_;
296
  response_encoder_ = nullptr;
296
  Buffer::InstancePtr request_data = std::make_unique<Buffer::OwnedImpl>();
296
  const auto& buffered_request_data = filter_manager_.bufferedRequestData();
296
  const bool proxy_body = buffered_request_data != nullptr && buffered_request_data->length() > 0;
296
  if (proxy_body) {
30
    request_data->move(*buffered_request_data);
30
  }
296
  response_encoder->getStream().removeCallbacks(*this);
  // This functionally deletes the stream (via deferred delete) so do not
  // reference anything beyond this point.
  // Make sure to not check for deferred close as we'll be immediately creating a new stream.
296
  state_.is_internally_destroyed_ = true;
296
  connection_manager_.doEndStream(*this, /*check_for_deferred_close*/ false);
296
  RequestDecoder& new_stream = connection_manager_.newStream(*response_encoder, true);
  // Set the new RequestDecoder on the ResponseEncoder. Even though all of the decoder callbacks
  // have already been called at this point, the encoder still needs the new decoder for deferred
  // logging in some cases.
  // This doesn't currently work for HTTP/1 as the H/1 ResponseEncoder doesn't hold the active
  // stream's pointer to the RequestDecoder.
296
  response_encoder->setRequestDecoder(new_stream);
  // We don't need to copy over the old parent FilterState from the old StreamInfo if it did not
  // store any objects with a LifeSpan at or above DownstreamRequest. This is to avoid unnecessary
  // heap allocation.
  // TODO(snowp): In the case where connection level filter state has been set on the connection
  // FilterState that we inherit, we'll end up copying this every time even though we could get
  // away with just resetting it to the HCM filter_state_.
296
  if (filter_state->hasDataAtOrAboveLifeSpan(StreamInfo::FilterState::LifeSpan::Request)) {
296
    (*connection_manager_.streams_.begin())->filter_manager_.streamInfo().filter_state_ =
296
        std::make_shared<StreamInfo::FilterStateImpl>(
296
            filter_state->parent(), StreamInfo::FilterState::LifeSpan::FilterChain);
296
  }
  // Make sure that relevant information makes it from the original stream info
  // to the new one. Generally this should consist of all downstream related
  // data, and not include upstream related data.
296
  (*connection_manager_.streams_.begin())
296
      ->filter_manager_.streamInfo()
296
      .setFromForRecreateStream(filter_manager_.streamInfo());
296
  new_stream.decodeHeaders(std::move(request_headers_), !proxy_body);
296
  if (proxy_body) {
    // This functionality is currently only used for internal redirects, which the router only
    // allows if the full request has been read (end_stream = true) so we don't need to handle the
    // case of upstream sending an early response mid-request.
30
    new_stream.decodeData(*request_data, true);
30
  }
296
}
1
Http1StreamEncoderOptionsOptRef ConnectionManagerImpl::ActiveStream::http1StreamEncoderOptions() {
  // Pure delegation: the options are whatever the response encoder reports.
  auto& encoder = *response_encoder_;
  return encoder.http1StreamEncoderOptions();
}
76
void ConnectionManagerImpl::ActiveStream::onResponseDataTooLarge() {
76
  connection_manager_.stats_.named_.rs_too_large_.inc();
76
}
312
// Counts the reset in `downstream_rq_tx_reset_` and then asks the connection manager to end this
// stream. Both arguments (reset reason and details string) are intentionally ignored.
void ConnectionManagerImpl::ActiveStream::resetStream(Http::StreamResetReason, absl::string_view) {
  auto& manager = connection_manager_;
  manager.stats_.named_.downstream_rq_tx_reset_.inc();
  manager.doEndStream(*this);
}
5681
bool ConnectionManagerImpl::ActiveStream::onDeferredRequestProcessing() {
  // TODO(yanavlasov): Merge this with the filter manager continueIteration() method
5681
  if (!state_.deferred_to_next_io_iteration_) {
5327
    return false;
5327
  }
354
  ENVOY_EXECUTION_SCOPE(trackedStream(), active_span_.get());
354
  state_.deferred_to_next_io_iteration_ = false;
354
  bool end_stream = state_.deferred_end_stream_ && deferred_data_ == nullptr &&
354
                    deferred_request_trailers_ == nullptr && deferred_metadata_.empty();
354
  filter_manager_.decodeHeaders(*request_headers_, end_stream);
354
  if (end_stream) {
60
    return true;
60
  }
  // Send metadata before data, as data may have an associated end_stream.
352
  while (!deferred_metadata_.empty()) {
58
    MetadataMapPtr& metadata = deferred_metadata_.front();
58
    filter_manager_.decodeMetadata(*metadata);
58
    deferred_metadata_.pop();
58
  }
  // Filter manager will return early from decodeData and decodeTrailers if
  // request has completed.
294
  if (deferred_data_ != nullptr) {
274
    end_stream = state_.deferred_end_stream_ && deferred_request_trailers_ == nullptr;
274
    filter_manager_.decodeData(*deferred_data_, end_stream);
274
  }
294
  if (deferred_request_trailers_ != nullptr) {
144
    request_trailers_ = std::move(deferred_request_trailers_);
144
    filter_manager_.decodeTrailers(*request_trailers_);
144
  }
294
  return true;
354
}
90674
bool ConnectionManagerImpl::shouldDeferRequestProxyingToNextIoCycle() {
  // Do not defer this stream if stream deferral is disabled
90674
  if (deferred_request_processing_callback_ == nullptr) {
49856
    return false;
49856
  }
  // Defer this stream if there are already deferred streams, so they are not
  // processed out of order
40818
  if (deferred_request_processing_callback_->enabled()) {
40406
    return true;
40406
  }
412
  ++requests_during_dispatch_count_;
412
  bool defer = requests_during_dispatch_count_ > max_requests_during_dispatch_;
412
  if (defer) {
328
    deferred_request_processing_callback_->scheduleCallbackNextIteration();
328
  }
412
  return defer;
40818
}
324
void ConnectionManagerImpl::onDeferredRequestProcessing() {
324
  if (streams_.empty()) {
2
    return;
2
  }
322
  requests_during_dispatch_count_ = 1; // 1 stream is always let through
  // Streams are inserted at the head of the list. As such process deferred
  // streams in the reverse order.
322
  auto reverse_iter = std::prev(streams_.end());
322
  bool at_first_element = false;
5681
  do {
5681
    at_first_element = reverse_iter == streams_.begin();
    // Move the iterator to the previous item in case the `onDeferredRequestProcessing` call removes
    // the stream from the list.
5681
    auto previous_element = std::prev(reverse_iter);
5681
    bool was_deferred = (*reverse_iter)->onDeferredRequestProcessing();
5681
    if (was_deferred && shouldDeferRequestProxyingToNextIoCycle()) {
306
      break;
306
    }
5375
    reverse_iter = previous_element;
    // TODO(yanavlasov): see if `rend` can be used.
5375
  } while (!at_first_element);
322
}
} // namespace Http
} // namespace Envoy