1
#include "source/common/router/router.h"
2

            
3
#include <algorithm>
4
#include <chrono>
5
#include <cstdint>
6
#include <functional>
7
#include <memory>
8
#include <string>
9

            
10
#include "envoy/event/dispatcher.h"
11
#include "envoy/event/timer.h"
12
#include "envoy/grpc/status.h"
13
#include "envoy/http/conn_pool.h"
14
#include "envoy/runtime/runtime.h"
15
#include "envoy/upstream/cluster_manager.h"
16
#include "envoy/upstream/health_check_host_monitor.h"
17
#include "envoy/upstream/upstream.h"
18

            
19
#include "source/common/access_log/access_log_impl.h"
20
#include "source/common/common/assert.h"
21
#include "source/common/common/cleanup.h"
22
#include "source/common/common/enum_to_int.h"
23
#include "source/common/common/utility.h"
24
#include "source/common/config/utility.h"
25
#include "source/common/grpc/common.h"
26
#include "source/common/http/codes.h"
27
#include "source/common/http/header_map_impl.h"
28
#include "source/common/http/headers.h"
29
#include "source/common/http/message_impl.h"
30
#include "source/common/http/utility.h"
31
#include "source/common/network/transport_socket_options_impl.h"
32
#include "source/common/network/upstream_server_name.h"
33
#include "source/common/network/upstream_socket_options_filter_state.h"
34
#include "source/common/network/upstream_subject_alt_names.h"
35
#include "source/common/orca/orca_load_metrics.h"
36
#include "source/common/orca/orca_parser.h"
37
#include "source/common/router/debug_config.h"
38
#include "source/common/router/retry_state_impl.h"
39
#include "source/common/runtime/runtime_features.h"
40
#include "source/common/stream_info/uint32_accessor_impl.h"
41

            
42
namespace Envoy {
43
namespace Router {
44
namespace {
45
constexpr absl::string_view NumInternalRedirectsFilterStateName = "num_internal_redirects";
46

            
47
185704
// Returns the number of bytes in the buffer; a null buffer counts as empty.
uint32_t getLength(const Buffer::Instance* instance) {
  if (instance == nullptr) {
    return 0;
  }
  return instance->length();
}
48

            
49
// Decides whether the request should be treated as plaintext HTTP: either the
// :scheme header explicitly says "http", or (when the scheme is not http) the
// downstream connection is present and is not TLS.
bool schemeIsHttp(const Http::RequestHeaderMap& downstream_headers,
                  OptRef<const Network::Connection> connection) {
  const bool scheme_says_http =
      Http::Utility::schemeIsHttp(downstream_headers.getSchemeValue());
  // Short-circuit: only consult the connection when the header is inconclusive.
  return scheme_says_http || (connection.has_value() && !connection->ssl());
}
59

            
60
constexpr uint64_t TimeoutPrecisionFactor = 100;
61

            
62
} // namespace
63

            
64
// Factory for FilterConfig. The private constructor reports failures through
// an out-param status rather than throwing, so this wrapper converts that into
// an absl::StatusOr for callers.
absl::StatusOr<std::unique_ptr<FilterConfig>>
FilterConfig::create(Stats::StatName stat_prefix, Server::Configuration::FactoryContext& context,
                     ShadowWriterPtr&& shadow_writer,
                     const envoy::extensions::filters::http::router::v3::Router& config) {
  absl::Status creation_status = absl::OkStatus();
  std::unique_ptr<FilterConfig> filter_config(
      new FilterConfig(stat_prefix, context, std::move(shadow_writer), config, creation_status));
  // Propagate any error recorded during construction instead of returning a
  // half-initialized config.
  RETURN_IF_NOT_OK(creation_status);
  return filter_config;
}
74

            
75
// Delegates to the wide constructor, unpacking everything it needs from the
// factory context and the proto config, then wires up the optional pieces:
// upstream access logs, the upstream-log flush interval, and the upstream
// HTTP filter chain.
FilterConfig::FilterConfig(Stats::StatName stat_prefix,
                           Server::Configuration::FactoryContext& context,
                           ShadowWriterPtr&& shadow_writer,
                           const envoy::extensions::filters::http::router::v3::Router& config,
                           absl::Status& creation_status)
    : FilterConfig(
          context.serverFactoryContext(), stat_prefix, context.scope(),
          context.serverFactoryContext().clusterManager(), context.serverFactoryContext().runtime(),
          context.serverFactoryContext().api().randomGenerator(), std::move(shadow_writer),
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, dynamic_stats, true), config.start_child_span(),
          config.suppress_envoy_headers(), config.respect_expected_rq_timeout(),
          config.suppress_grpc_request_failure_code_stats(),
          config.has_upstream_log_options()
              ? config.upstream_log_options().flush_upstream_log_on_upstream_stream()
              : false,
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, reject_connect_request_early_data, false),
          config.strict_check_headers(), context.serverFactoryContext().api().timeSource(),
          context.serverFactoryContext().httpContext(),
          context.serverFactoryContext().routerContext()) {
  // Instantiate each configured upstream access logger.
  for (const auto& upstream_log : config.upstream_log()) {
    upstream_logs_.push_back(AccessLog::AccessLogFactory::fromProto(upstream_log, context));
  }

  // Optional periodic flush interval for the upstream logs.
  if (config.has_upstream_log_options() &&
      config.upstream_log_options().has_upstream_log_flush_interval()) {
    upstream_log_flush_interval_ = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
        config.upstream_log_options().upstream_log_flush_interval()));
  }

  if (!config.upstream_http_filters().empty()) {
    // TODO(wbpcode): To validate the terminal filter is upstream codec filter by the proto.
    Server::Configuration::ServerFactoryContext& server_factory_ctx =
        context.serverFactoryContext();
    std::shared_ptr<Http::UpstreamFilterConfigProviderManager> provider_manager =
        Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
            server_factory_ctx);
    const std::string stats_prefix =
        context.scope().symbolTable().toString(context.scope().prefix());
    upstream_ctx_ = std::make_unique<Upstream::UpstreamFactoryContextImpl>(
        server_factory_ctx, context.initManager(), context.scope());
    // Build the upstream filter factory list; any processing failure is
    // surfaced to the caller through creation_status.
    Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext,
                            Server::Configuration::UpstreamHttpFilterConfigFactory>
        helper(*provider_manager, server_factory_ctx,
               context.serverFactoryContext().clusterManager(), *upstream_ctx_, stats_prefix);
    SET_AND_RETURN_IF_NOT_OK(helper.processFilters(config.upstream_http_filters(),
                                                   "router upstream http", "router upstream http",
                                                   upstream_http_filter_factories_),
                             creation_status);
  }
}
124

            
125
// Express percentage as [0, TimeoutPrecisionFactor] because stats do not accept floating point
126
// values, and getting multiple significant figures on the histogram would be nice.
127
uint64_t FilterUtility::percentageOfTimeout(const std::chrono::milliseconds response_time,
128
595
                                            const std::chrono::milliseconds timeout) {
129
  // Timeouts of 0 are considered infinite. Any portion of an infinite timeout used is still
130
  // none of it.
131
595
  if (timeout.count() == 0) {
132
440
    return 0;
133
440
  }
134

            
135
155
  return static_cast<uint64_t>(response_time.count() * TimeoutPrecisionFactor / timeout.count());
136
595
}
137

            
138
// Ensures the :scheme header is set before the request goes upstream.
// Priority order: (1) the upstream connection's own security when
// use_upstream is set, (2) an already-valid :scheme header, (3) a valid
// X-Forwarded-Proto value, (4) the downstream connection's security.
void FilterUtility::setUpstreamScheme(Http::RequestHeaderMap& headers, bool downstream_ssl,
                                      bool upstream_ssl, bool use_upstream) {
  const auto& scheme_values = Http::Headers::get().SchemeValues;

  if (use_upstream) {
    headers.setReferenceScheme(upstream_ssl ? scheme_values.Https : scheme_values.Http);
    return;
  }

  if (Http::Utility::schemeIsValid(headers.getSchemeValue())) {
    return;
  }

  // After all the changes in https://github.com/envoyproxy/envoy/issues/14587
  // this path should only occur if a buggy filter has removed the :scheme
  // header. In that case best-effort set from X-Forwarded-Proto.
  const absl::string_view forwarded_proto = headers.getForwardedProtoValue();
  if (Http::Utility::schemeIsValid(forwarded_proto)) {
    headers.setScheme(forwarded_proto);
    return;
  }

  // Last resort: derive the scheme from the downstream connection security.
  headers.setReferenceScheme(downstream_ssl ? scheme_values.Https : scheme_values.Http);
}
167

            
168
bool FilterUtility::shouldShadow(const ShadowPolicy& policy, Runtime::Loader& runtime,
169
103
                                 uint64_t stable_random) {
170

            
171
  // The policy's default value is set correctly regardless of whether there is a runtime key
172
  // or not, thus this call is sufficient for all cases (100% if no runtime set, otherwise
173
  // using the default value within the runtime fractional percent setting).
174
103
  return runtime.snapshot().featureEnabled(policy.runtimeKey(), policy.defaultValue(),
175
103
                                           stable_random);
176
103
}
177

            
178
// Computes the effective global/per-try/per-try-idle timeouts for the request.
// Resolution order for the global timeout: user-supplied request header, then
// (for gRPC with a configured max) the grpc-timeout header capped to the
// maximum, then the route default. Per-try timeouts come from the retry
// policy, possibly overridden by a request header.
TimeoutData FilterUtility::finalTimeout(const RouteEntry& route,
                                        Http::RequestHeaderMap& request_headers,
                                        bool insert_envoy_expected_request_timeout_ms,
                                        bool grpc_request, bool per_try_timeout_hedging_enabled,
                                        bool respect_expected_rq_timeout) {
  TimeoutData timeout;

  if (!route.usingNewTimeouts()) {
    if (grpc_request && route.maxGrpcTimeout()) {
      const std::chrono::milliseconds max_grpc_timeout = route.maxGrpcTimeout().value();
      const auto header_grpc_timeout = Grpc::Common::getGrpcTimeout(request_headers);
      // No grpc-timeout header means infinity, represented here by 0.
      std::chrono::milliseconds grpc_timeout =
          header_grpc_timeout.has_value() ? header_grpc_timeout.value()
                                          : std::chrono::milliseconds(0);
      if (route.grpcTimeoutOffset()) {
        // We only apply the offset if it won't result in grpc_timeout hitting 0 or below, as
        // setting it to 0 means infinity and a negative timeout makes no sense.
        const auto offset = *route.grpcTimeoutOffset();
        if (offset < grpc_timeout) {
          grpc_timeout -= offset;
        }
      }

      // Cap gRPC timeout to the configured maximum considering that 0 means infinity.
      if (max_grpc_timeout != std::chrono::milliseconds(0) &&
          (grpc_timeout == std::chrono::milliseconds(0) || grpc_timeout > max_grpc_timeout)) {
        grpc_timeout = max_grpc_timeout;
      }
      timeout.global_timeout_ = grpc_timeout;
    } else {
      timeout.global_timeout_ = route.timeout();
    }
  }
  timeout.per_try_timeout_ = route.retryPolicy()->perTryTimeout();
  timeout.per_try_idle_timeout_ = route.retryPolicy()->perTryIdleTimeout();

  if (respect_expected_rq_timeout) {
    // Check if there is a timeout already set by an egress Envoy. If present,
    // use it as the route timeout and leave *x-envoy-expected-rq-timeout-ms*
    // untouched; *x-envoy-upstream-rq-timeout-ms* should have been sanitized
    // by the egress Envoy at this point.
    const Http::HeaderEntry* expected_timeout_header =
        request_headers.EnvoyExpectedRequestTimeoutMs();
    if (expected_timeout_header != nullptr) {
      trySetGlobalTimeout(*expected_timeout_header, timeout);
    } else {
      const Http::HeaderEntry* upstream_timeout_header =
          request_headers.EnvoyUpstreamRequestTimeoutMs();
      if (upstream_timeout_header != nullptr) {
        trySetGlobalTimeout(*upstream_timeout_header, timeout);
        request_headers.removeEnvoyUpstreamRequestTimeoutMs();
      }
    }
  } else {
    const Http::HeaderEntry* upstream_timeout_header =
        request_headers.EnvoyUpstreamRequestTimeoutMs();
    if (upstream_timeout_header != nullptr) {
      trySetGlobalTimeout(*upstream_timeout_header, timeout);
      request_headers.removeEnvoyUpstreamRequestTimeoutMs();
    }
  }

  // See if there is a per try/retry timeout header; a malformed value is
  // ignored but the header is stripped either way.
  const absl::string_view per_try_timeout_value =
      request_headers.getEnvoyUpstreamRequestPerTryTimeoutMsValue();
  if (!per_try_timeout_value.empty()) {
    uint64_t per_try_ms;
    if (absl::SimpleAtoi(per_try_timeout_value, &per_try_ms)) {
      timeout.per_try_timeout_ = std::chrono::milliseconds(per_try_ms);
    }
    request_headers.removeEnvoyUpstreamRequestPerTryTimeoutMs();
  }

  // A per-try timeout >= the (finite) global timeout is useless; drop it.
  if (timeout.per_try_timeout_ >= timeout.global_timeout_ && timeout.global_timeout_.count() != 0) {
    timeout.per_try_timeout_ = std::chrono::milliseconds(0);
  }

  setTimeoutHeaders(0, timeout, route, request_headers, insert_envoy_expected_request_timeout_ms,
                    grpc_request, per_try_timeout_hedging_enabled);

  return timeout;
}
265

            
266
void FilterUtility::setTimeoutHeaders(uint64_t elapsed_time, const TimeoutData& timeout,
267
                                      const RouteEntry& route,
268
                                      Http::RequestHeaderMap& request_headers,
269
                                      bool insert_envoy_expected_request_timeout_ms,
270
47803
                                      bool grpc_request, bool per_try_timeout_hedging_enabled) {
271

            
272
47803
  const uint64_t global_timeout = timeout.global_timeout_.count();
273

            
274
  // See if there is any timeout to write in the expected timeout header.
275
47803
  uint64_t expected_timeout = timeout.per_try_timeout_.count();
276

            
277
  // Use the global timeout if no per try timeout was specified or if we're
278
  // doing hedging when there are per try timeouts. Either of these scenarios
279
  // mean that the upstream server can use the full global timeout.
280
47803
  if (per_try_timeout_hedging_enabled || expected_timeout == 0) {
281
45498
    expected_timeout = global_timeout;
282
45498
  }
283

            
284
  // If the expected timeout is 0 set no timeout, as Envoy treats 0 as infinite timeout.
285
47803
  if (expected_timeout > 0) {
286

            
287
45234
    if (global_timeout > 0) {
288
45222
      if (elapsed_time >= global_timeout) {
289
        // We are out of time, but 0 would be an infinite timeout. So instead we send a 1ms timeout
290
        // and assume the timers armed by onRequestComplete() will fire very soon.
291
2
        expected_timeout = 1;
292
45220
      } else {
293
45220
        expected_timeout = std::min(expected_timeout, global_timeout - elapsed_time);
294
45220
      }
295
45222
    }
296

            
297
45234
    if (insert_envoy_expected_request_timeout_ms) {
298
45206
      request_headers.setEnvoyExpectedRequestTimeoutMs(expected_timeout);
299
45206
    }
300

            
301
    // If we've configured max_grpc_timeout, override the grpc-timeout header with
302
    // the expected timeout. This ensures that the optional per try timeout is reflected
303
    // in grpc-timeout, ensuring that the upstream gRPC server is aware of the actual timeout.
304
45234
    if (grpc_request && !route.usingNewTimeouts() && route.maxGrpcTimeout()) {
305
13
      Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(expected_timeout), request_headers);
306
13
    }
307
45234
  }
308
47803
}
309

            
310
// Parses a header value as an integer millisecond timeout; returns nullopt if
// the value is not a valid unsigned integer.
absl::optional<std::chrono::milliseconds>
FilterUtility::tryParseHeaderTimeout(const Http::HeaderEntry& header_timeout_entry) {
  uint64_t parsed_ms = 0;
  if (!absl::SimpleAtoi(header_timeout_entry.value().getStringView(), &parsed_ms)) {
    return absl::nullopt;
  }
  return std::chrono::milliseconds(parsed_ms);
}
318

            
319
// Overwrites the global timeout with the header's value when the header parses
// as a valid integer; otherwise leaves the timeout untouched.
void FilterUtility::trySetGlobalTimeout(const Http::HeaderEntry& header_timeout_entry,
                                        TimeoutData& timeout) {
  if (const auto parsed = tryParseHeaderTimeout(header_timeout_entry); parsed.has_value()) {
    timeout.global_timeout_ = *parsed;
  }
}
326

            
327
// Resolves the hedging parameters for the request: the route's hedge policy,
// optionally overridden by the x-envoy-hedge-on-per-try-timeout header, which
// is consumed (removed) when present.
FilterUtility::HedgingParams
FilterUtility::finalHedgingParams(const RouteEntry& route,
                                  Http::RequestHeaderMap& request_headers) {
  HedgingParams hedging_params;
  hedging_params.hedge_on_per_try_timeout_ = route.hedgePolicy().hedgeOnPerTryTimeout();

  const Http::HeaderEntry* hedge_header = request_headers.EnvoyHedgeOnPerTryTimeout();
  if (hedge_header != nullptr) {
    // Only the exact strings "true"/"false" override the route policy; any
    // other value leaves the policy default in place.
    if (hedge_header->value() == "true") {
      hedging_params.hedge_on_per_try_timeout_ = true;
    } else if (hedge_header->value() == "false") {
      hedging_params.hedge_on_per_try_timeout_ = false;
    }

    request_headers.removeEnvoyHedgeOnPerTryTimeout();
  }

  return hedging_params;
}
348

            
349
95467
Filter::~Filter() {
  // By destruction time all upstream requests must have been cleaned up and
  // the retry state released; anything else indicates a lifecycle bug.
  ASSERT(upstream_requests_.empty());
  ASSERT(!retry_state_);
}
354

            
355
// Validates one of the strict-checked x-envoy-* request headers. Numeric
// headers must parse as integers; retry headers must contain recognized retry
// policy tokens. Panics on a header this checker has no validator for.
const FilterUtility::StrictHeaderChecker::HeaderCheckResult
FilterUtility::StrictHeaderChecker::checkHeader(Http::RequestHeaderMap& headers,
                                                const Http::LowerCaseString& target_header) {
  const auto& header_names = Http::Headers::get();
  if (target_header == header_names.EnvoyUpstreamRequestTimeoutMs) {
    return isInteger(headers.EnvoyUpstreamRequestTimeoutMs());
  }
  if (target_header == header_names.EnvoyUpstreamRequestPerTryTimeoutMs) {
    return isInteger(headers.EnvoyUpstreamRequestPerTryTimeoutMs());
  }
  if (target_header == header_names.EnvoyMaxRetries) {
    return isInteger(headers.EnvoyMaxRetries());
  }
  if (target_header == header_names.EnvoyRetryOn) {
    return hasValidRetryFields(headers.EnvoyRetryOn(), &Router::RetryStateImpl::parseRetryOn);
  }
  if (target_header == header_names.EnvoyRetryGrpcOn) {
    return hasValidRetryFields(headers.EnvoyRetryGrpcOn(),
                               &Router::RetryStateImpl::parseRetryGrpcOn);
  }
  // Should only validate headers for which we have implemented a validator.
  PANIC("unexpectedly reached");
}
373

            
374
77349
// Stat name of the upstream host's locality zone, or the config's empty stat
// name when no host is available.
Stats::StatName Filter::upstreamZone(Upstream::HostDescriptionOptConstRef upstream_host) {
  if (upstream_host) {
    return upstream_host->localityZoneStatName();
  }
  return config_->empty_stat_name_;
}
377

            
378
void Filter::chargeUpstreamCode(uint64_t response_status_code,
379
                                const Http::ResponseHeaderMap& response_headers,
380
41319
                                Upstream::HostDescriptionOptConstRef upstream_host, bool dropped) {
381
  // Passing the response_status_code explicitly is an optimization to avoid
382
  // multiple calls to slow Http::Utility::getResponseStatus.
383
41319
  ASSERT(response_status_code == Http::Utility::getResponseStatus(response_headers));
384
41319
  if (config_->emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck()) {
385
41319
    const Http::HeaderEntry* upstream_canary_header = response_headers.EnvoyUpstreamCanary();
386
41319
    const bool is_canary = (upstream_canary_header && upstream_canary_header->value() == "true") ||
387
41319
                           (upstream_host ? upstream_host->canary() : false);
388
41319
    const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_);
389

            
390
41319
    Stats::StatName upstream_zone = upstreamZone(upstream_host);
391
41319
    Http::CodeStats::ResponseStatInfo info{
392
41319
        config_->scope_,
393
41319
        cluster_->statsScope(),
394
41319
        config_->empty_stat_name_,
395
41319
        response_status_code,
396
41319
        internal_request,
397
41319
        route_->virtualHost()->statName(),
398
41319
        request_vcluster_ ? request_vcluster_->statName() : config_->empty_stat_name_,
399
41319
        route_stats_context_.has_value() ? route_stats_context_->statName()
400
41319
                                         : config_->empty_stat_name_,
401
41319
        config_->zone_name_,
402
41319
        upstream_zone,
403
41319
        is_canary};
404

            
405
41319
    Http::CodeStats& code_stats = httpContext().codeStats();
406
41319
    code_stats.chargeResponseStat(info, exclude_http_code_stats_);
407

            
408
41319
    if (alt_stat_prefix_ != nullptr) {
409
3
      Http::CodeStats::ResponseStatInfo alt_info{config_->scope_,
410
3
                                                 cluster_->statsScope(),
411
3
                                                 alt_stat_prefix_->statName(),
412
3
                                                 response_status_code,
413
3
                                                 internal_request,
414
3
                                                 config_->empty_stat_name_,
415
3
                                                 config_->empty_stat_name_,
416
3
                                                 config_->empty_stat_name_,
417
3
                                                 config_->zone_name_,
418
3
                                                 upstream_zone,
419
3
                                                 is_canary};
420
3
      code_stats.chargeResponseStat(alt_info, exclude_http_code_stats_);
421
3
    }
422

            
423
41319
    if (dropped) {
424
51
      cluster_->loadReportStats().upstream_rq_dropped_.inc();
425
51
    }
426
41319
    if (upstream_host && Http::CodeUtility::is5xx(response_status_code)) {
427
1728
      upstream_host->stats().rq_error_.inc();
428
1728
    }
429
41319
  }
430
41319
}
431

            
432
void Filter::chargeUpstreamCode(Http::Code code, Upstream::HostDescriptionOptConstRef upstream_host,
433
1543
                                bool dropped) {
434
1543
  const uint64_t response_status_code = enumToInt(code);
435
1543
  const auto fake_response_headers = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(
436
1543
      {{Http::Headers::get().Status, std::to_string(response_status_code)}});
437
1543
  chargeUpstreamCode(response_status_code, *fake_response_headers, upstream_host, dropped);
438
1543
}
439

            
440
50011
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
441
50011
  downstream_headers_ = &headers;
442

            
443
  // Initialize the `modify_headers_` function that will be used to modify the response headers for
444
  // all upstream responses or local responses.
445
50011
  modify_headers_ = [this](Http::ResponseHeaderMap& headers) {
446
41360
    if (route_entry_ == nullptr) {
447
      return;
448
    }
449

            
450
41360
    if (modify_headers_from_upstream_lb_) {
451
1
      modify_headers_from_upstream_lb_(headers);
452
1
    }
453

            
454
41360
    if (attempt_count_ == 0 || !route_entry_->includeAttemptCountInResponse()) {
455
41344
      return;
456
41344
    }
457
    // This header is added without checking for config_->suppress_envoy_headers_ to mirror what
458
    // is done for upstream requests.
459
16
    headers.setEnvoyAttemptCount(attempt_count_);
460
16
  };
461

            
462
  // TODO: Maybe add a filter API for this.
463
50011
  grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
464
50011
  exclude_http_code_stats_ = grpc_request_ && config_->suppress_grpc_request_failure_code_stats_;
465

            
466
  // Only increment rq total stat if we actually decode headers here. This does not count requests
467
  // that get handled by earlier filters.
468
50011
  stats_.rq_total_.inc();
469

            
470
  // Determine if there is a route entry or a direct response for the request.
471
50011
  route_ = callbacks_->route();
472
50011
  if (!route_) {
473
2210
    stats_.no_route_.inc();
474
2210
    ENVOY_STREAM_LOG(debug, "no route match for URL '{}'", *callbacks_, headers.getPathValue());
475

            
476
2210
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoRouteFound);
477
2210
    callbacks_->sendLocalReply(Http::Code::NotFound, "", nullptr, absl::nullopt,
478
2210
                               StreamInfo::ResponseCodeDetails::get().RouteNotFound);
479
2210
    return Http::FilterHeadersStatus::StopIteration;
480
2210
  }
481

            
482
  // Determine if there is a direct response for the request.
483
47801
  const auto* direct_response = route_->directResponseEntry();
484
47801
  if (direct_response != nullptr) {
485
148
    stats_.rq_direct_response_.inc();
486
148
    direct_response->rewritePathHeader(headers, !config_->suppress_envoy_headers_);
487
148
    std::string body;
488
148
    absl::string_view direct_response_body = direct_response->formatBody(
489
148
        (downstream_headers_ == nullptr) ? *Http::StaticEmptyHeaders::get().request_headers
490
148
                                         : *downstream_headers_,
491
148
        *Http::StaticEmptyHeaders::get().response_headers, callbacks_->streamInfo(), body);
492

            
493
148
    callbacks_->sendLocalReply(
494
148
        direct_response->responseCode(), direct_response_body,
495
148
        [this, direct_response](Http::ResponseHeaderMap& response_headers) -> void {
496
148
          std::string new_uri;
497
148
          ASSERT(downstream_headers_ != nullptr);
498
148
          if (downstream_headers_->Path()) {
499
148
            new_uri = direct_response->newUri(*downstream_headers_);
500
148
          }
501
          // See https://tools.ietf.org/html/rfc7231#section-7.1.2.
502
148
          const auto add_location =
503
148
              direct_response->responseCode() == Http::Code::Created ||
504
148
              Http::CodeUtility::is3xx(enumToInt(direct_response->responseCode()));
505
148
          if (!new_uri.empty() && add_location) {
506
19
            response_headers.addReferenceKey(Http::Headers::get().Location, new_uri);
507
19
          }
508
148
          const Formatter::Context formatter_context(downstream_headers_, &response_headers, {}, {},
509
148
                                                     {}, &callbacks_->activeSpan());
510
148
          direct_response->finalizeResponseHeaders(response_headers, formatter_context,
511
148
                                                   callbacks_->streamInfo());
512
          // Apply content_type from body_format if configured.
513
148
          const absl::string_view content_type = direct_response->responseContentType();
514
148
          if (!content_type.empty()) {
515
1
            response_headers.setReferenceKey(Http::Headers::get().ContentType, content_type);
516
1
          }
517
148
        },
518
148
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().DirectResponse);
519
148
    return Http::FilterHeadersStatus::StopIteration;
520
148
  }
521

            
522
  // A route entry matches for the request.
523
47653
  route_entry_ = route_->routeEntry();
524
  // Store buffer limits from the route entry.
525
  // The requestBodyBufferLimit() method handles both legacy per_request_buffer_limit_bytes
526
  // and new request_body_buffer_limit configurations automatically.
527
47653
  request_body_buffer_limit_ = route_entry_->requestBodyBufferLimit();
528
47653
  Upstream::ThreadLocalCluster* cluster =
529
47653
      config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
530
47653
  if (!cluster) {
531
85
    stats_.no_cluster_.inc();
532
85
    ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName());
533

            
534
85
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoClusterFound);
535
85
    callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", modify_headers_,
536
85
                               absl::nullopt,
537
85
                               StreamInfo::ResponseCodeDetails::get().ClusterNotFound);
538
85
    return Http::FilterHeadersStatus::StopIteration;
539
85
  }
540
47568
  cluster_ = cluster->info();
541

            
542
  // Set up stat prefixes, etc.
543
47568
  request_vcluster_ = route_->virtualHost()->virtualCluster(headers);
544
47568
  if (request_vcluster_ != nullptr) {
545
285
    callbacks_->streamInfo().setVirtualClusterName(request_vcluster_->name());
546
285
  }
547
47568
  route_stats_context_ = route_entry_->routeStatsContext();
548
47568
  ENVOY_STREAM_LOG(debug, "cluster '{}' match for URL '{}'", *callbacks_,
549
47568
                   route_entry_->clusterName(), headers.getPathValue());
550

            
551
47568
  if (config_->strict_check_headers_ != nullptr) {
552
62
    for (const auto& header : *config_->strict_check_headers_) {
553
62
      const auto res = FilterUtility::StrictHeaderChecker::checkHeader(headers, header);
554
62
      if (!res.valid_) {
555
30
        callbacks_->streamInfo().setResponseFlag(
556
30
            StreamInfo::CoreResponseFlag::InvalidEnvoyRequestHeaders);
557
30
        const std::string body = fmt::format("invalid header '{}' with value '{}'",
558
30
                                             std::string(res.entry_->key().getStringView()),
559
30
                                             std::string(res.entry_->value().getStringView()));
560
30
        const std::string details =
561
30
            absl::StrCat(StreamInfo::ResponseCodeDetails::get().InvalidEnvoyRequestHeaders, "{",
562
30
                         StringUtil::replaceAllEmptySpace(res.entry_->key().getStringView()), "}");
563
30
        callbacks_->sendLocalReply(Http::Code::BadRequest, body, modify_headers_, absl::nullopt,
564
30
                                   details);
565
30
        return Http::FilterHeadersStatus::StopIteration;
566
30
      }
567
62
    }
568
31
  }
569

            
570
47538
  const Http::HeaderEntry* request_alt_name = headers.EnvoyUpstreamAltStatName();
571
47538
  if (request_alt_name) {
572
3
    alt_stat_prefix_ = std::make_unique<Stats::StatNameDynamicStorage>(
573
3
        request_alt_name->value().getStringView(), config_->scope_.symbolTable());
574
3
    headers.removeEnvoyUpstreamAltStatName();
575
3
  }
576

            
577
  // See if we are supposed to immediately kill some percentage of this cluster's traffic.
578
47538
  if (cluster_->maintenanceMode()) {
579
3
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
580
3
    chargeUpstreamCode(Http::Code::ServiceUnavailable, {}, true);
581
3
    callbacks_->sendLocalReply(
582
3
        Http::Code::ServiceUnavailable, "maintenance mode",
583
3
        [this](Http::ResponseHeaderMap& headers) {
584
3
          if (!config_->suppress_envoy_headers_) {
585
1
            headers.addReference(Http::Headers::get().EnvoyOverloaded,
586
1
                                 Http::Headers::get().EnvoyOverloadedValues.True);
587
1
          }
588
          // Note: append_cluster_info does not respect suppress_envoy_headers.
589
3
          modify_headers_(headers);
590
3
        },
591
3
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode);
592
3
    cluster_->trafficStats()->upstream_rq_maintenance_mode_.inc();
593
3
    return Http::FilterHeadersStatus::StopIteration;
594
3
  }
595

            
596
  // Support DROP_OVERLOAD config from control plane to drop certain percentage of traffic.
597
47535
  if (checkDropOverload(*cluster)) {
598
14
    return Http::FilterHeadersStatus::StopIteration;
599
14
  }
600

            
601
  // If large request buffering is enabled and its size is more than current buffer limit, update
602
  // the buffer limit to a new larger value.
603
47521
  uint64_t effective_buffer_limit = calculateEffectiveBufferLimit();
604
47521
  if (effective_buffer_limit != std::numeric_limits<uint64_t>::max() &&
605
47521
      effective_buffer_limit > callbacks_->bufferLimit()) {
606
4
    ENVOY_STREAM_LOG(debug, "Setting new filter manager buffer limit: {}", *callbacks_,
607
4
                     effective_buffer_limit);
608
4
    callbacks_->setBufferLimit(effective_buffer_limit);
609
4
  }
610

            
611
  // Increment the attempt count from 0 to 1 at the first upstream request.
612
47521
  attempt_count_++;
613
47521
  callbacks_->streamInfo().setAttemptCount(attempt_count_);
614

            
615
  // Set hedging params before finalizeRequestHeaders so timeout calculation is correct.
616
47521
  hedging_params_ = FilterUtility::finalHedgingParams(*route_entry_, headers);
617

            
618
  // Calculate timeout and set x-envoy-expected-rq-timeout-ms before finalizeRequestHeaders.
619
  // This allows request_headers_to_add to reference the timeout header.
620
47521
  timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_->suppress_envoy_headers_,
621
47521
                                         grpc_request_, hedging_params_.hedge_on_per_try_timeout_,
622
47521
                                         config_->respect_expected_rq_timeout_);
623

            
624
  // Set x-envoy-attempt-count before finalizeRequestHeaders so it can be referenced.
625
47521
  include_attempt_count_in_request_ = route_entry_->includeAttemptCountInRequest();
626
47521
  if (include_attempt_count_in_request_) {
627
30
    headers.setEnvoyAttemptCount(attempt_count_);
628
30
  }
629

            
630
  // Finalize request headers (host/path rewriting + request_headers_to_add) before host selection.
631
  // At this point, router-set headers (x-envoy-expected-rq-timeout-ms, x-envoy-attempt-count)
632
  // are already set, so request_headers_to_add can reference them and affect load balancing.
633
47521
  const Formatter::Context formatter_context(&headers, {}, {}, {}, {}, &callbacks_->activeSpan());
634
47521
  route_entry_->finalizeRequestHeaders(headers, formatter_context, callbacks_->streamInfo(),
635
47521
                                       !config_->suppress_envoy_headers_);
636

            
637
  // Fetch a connection pool for the upstream cluster.
638
47521
  const auto& upstream_http_protocol_options =
639
47521
      cluster_->httpProtocolOptions().upstreamHttpProtocolOptions();
640
47521
  if (upstream_http_protocol_options && (upstream_http_protocol_options->auto_sni() ||
641
2304
                                         upstream_http_protocol_options->auto_san_validation())) {
642
    // Default the header to Host/Authority header.
643
2299
    absl::string_view header_value = headers.getHostValue();
644

            
645
    // Check whether `override_auto_sni_header` is specified.
646
2299
    if (const auto& override_header = upstream_http_protocol_options->override_auto_sni_header();
647
2299
        !override_header.empty()) {
648
      // Use the header value from `override_auto_sni_header` to set the SNI value.
649
13
      const auto override_header_value = headers.get(Http::LowerCaseString(override_header));
650
13
      if (override_header_value.size() > 1) {
651
1
        ENVOY_STREAM_LOG(info, "Multiple values for auto sni header '{}' and use the first one",
652
1
                         *callbacks_, override_header);
653
1
      }
654
13
      if (!override_header_value.empty() && !override_header_value[0]->value().empty()) {
655
11
        header_value = override_header_value[0]->value().getStringView();
656
11
      }
657
13
    }
658

            
659
2299
    auto& filter_state = callbacks_->streamInfo().filterState();
660

            
661
    // Parse the authority header value to extract the host and check if it's an IP address.
662
2299
    const auto parsed_authority = Http::Utility::parseAuthority(header_value);
663

            
664
2299
    if (!parsed_authority.is_ip_address_ && upstream_http_protocol_options->auto_sni() &&
665
2299
        !filter_state->hasDataWithName(Network::UpstreamServerName::key())) {
666
2283
      filter_state->setData(Network::UpstreamServerName::key(),
667
2283
                            std::make_unique<Network::UpstreamServerName>(parsed_authority.host_),
668
2283
                            StreamInfo::FilterState::StateType::Mutable);
669
2283
    }
670

            
671
2299
    if (upstream_http_protocol_options->auto_san_validation() &&
672
2299
        !filter_state->hasDataWithName(Network::UpstreamSubjectAltNames::key())) {
673
157
      filter_state->setData(Network::UpstreamSubjectAltNames::key(),
674
157
                            std::make_unique<Network::UpstreamSubjectAltNames>(
675
157
                                std::vector<std::string>{std::string(parsed_authority.host_)}),
676
157
                            StreamInfo::FilterState::StateType::Mutable);
677
157
    }
678
2299
  }
679

            
680
47521
  transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(
681
47521
      *callbacks_->streamInfo().filterState());
682

            
683
47521
  if (auto downstream_connection = downstreamConnection(); downstream_connection != nullptr) {
684
44380
    if (auto typed_state = downstream_connection->streamInfo()
685
44380
                               .filterState()
686
44380
                               .getDataReadOnly<Network::UpstreamSocketOptionsFilterState>(
687
44380
                                   Network::UpstreamSocketOptionsFilterState::key());
688
44380
        typed_state != nullptr) {
689
1
      auto downstream_options = typed_state->value();
690
1
      if (!upstream_options_) {
691
1
        upstream_options_ = std::make_shared<Network::Socket::Options>();
692
1
      }
693
1
      Network::Socket::appendOptions(upstream_options_, downstream_options);
694
1
    }
695
44380
  }
696

            
697
47521
  if (upstream_options_ && callbacks_->getUpstreamSocketOptions()) {
698
    Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions());
699
  }
700

            
701
47521
  callbacks_->streamInfo().downstreamTiming().setValue(
702
47521
      "envoy.router.host_selection_start_ms",
703
47521
      callbacks_->dispatcher().timeSource().monotonicTime());
704

            
705
47521
  auto host_selection_response = cluster->chooseHost(this);
706
47521
  if (!host_selection_response.cancelable ||
707
47521
      !Runtime::runtimeFeatureEnabled("envoy.reloadable_features.async_host_selection")) {
708
33588
    if (host_selection_response.cancelable) {
709
7
      host_selection_response.cancelable->cancel();
710
7
    }
711
    // This branch handles the common case of synchronous host selection, as
712
    // well as handling unsupported asynchronous host selection by treating it
713
    // as host selection failure and calling sendNoHealthyUpstreamResponse.
714
33588
    continueDecodeHeaders(cluster, headers, end_stream, std::move(host_selection_response.host),
715
33588
                          host_selection_response.details);
716
33588
    return Http::FilterHeadersStatus::StopIteration;
717
33588
  }
718

            
719
13933
  ENVOY_STREAM_LOG(debug, "Doing asynchronous host selection\n", *callbacks_);
720
  // Latch the cancel handle and call it in Filter::onDestroy to avoid any use-after-frees for cases
721
  // like stream timeout.
722
13933
  host_selection_cancelable_ = std::move(host_selection_response.cancelable);
723
  // Configure a callback to be called on asynchronous host selection.
724
13933
  on_host_selected_ = ([this, cluster,
725
13933
                        end_stream](Upstream::HostConstSharedPtr&& host,
726
13933
                                    absl::string_view host_selection_details) -> void {
727
    // It should always be safe to call continueDecodeHeaders. In the case the
728
    // stream had a local reply before host selection completed,
729
    // the lookup should be canceled.
730
13477
    const bool should_continue_decoding = continueDecodeHeaders(
731
13477
        cluster, *downstream_headers_, end_stream, std::move(host), host_selection_details);
732
    // continueDecodeHeaders can itself send a local reply, in which case should_continue_decoding
733
    // should be false. If this is not the case, we can continue the filter chain due to successful
734
    // asynchronous host selection.
735
13477
    if (should_continue_decoding) {
736
13437
      ENVOY_STREAM_LOG(debug, "Continuing stream now that host resolution is complete\n",
737
13437
                       *callbacks_);
738
13437
      callbacks_->continueDecoding();
739
13437
    } else {
740
40
      ENVOY_STREAM_LOG(debug, "Aborting stream after host resolution is complete\n", *callbacks_);
741
40
    }
742
13477
  });
743
13933
  return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
744
47521
}
745

            
746
// When asynchronous host selection is complete, call the pre-configured on_host_selected_function.
747
13602
void Filter::onAsyncHostSelection(Upstream::HostConstSharedPtr&& host, std::string&& details) {
748
13602
  ENVOY_STREAM_LOG(debug, "Completing asynchronous host selection [{}]\n", *callbacks_, details);
749
13602
  std::unique_ptr<Upstream::AsyncHostSelectionHandle> local_scope =
750
13602
      std::move(host_selection_cancelable_);
751
13602
  on_host_selected_(std::move(host), details);
752
13602
}
753

            
754
// Second half of header decoding, run once host selection has finished (either
// synchronously from decodeHeaders() or via the asynchronous on_host_selected_
// callback). Creates the upstream connection pool and request, starts any shadow
// (mirror) streams, and fires onRequestComplete() for header-only requests.
// Returns false when a local reply was sent (no usable host, debug do-not-forward,
// or load shedding), so an asynchronous caller knows not to continue decoding.
bool Filter::continueDecodeHeaders(Upstream::ThreadLocalCluster* cluster,
                                   Http::RequestHeaderMap& headers, bool end_stream,
                                   Upstream::HostConstSharedPtr&& selected_host,
                                   absl::string_view host_selection_details) {
  // Record when host selection finished; pairs with host_selection_start_ms set in
  // decodeHeaders().
  callbacks_->streamInfo().downstreamTiming().setValue(
      "envoy.router.host_selection_end_ms", callbacks_->dispatcher().timeSource().monotonicTime());

  // A null pool means no usable host (e.g. selection returned nullptr); reply 503.
  std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster, selected_host);
  if (!generic_conn_pool) {
    sendNoHealthyUpstreamResponse(host_selection_details);
    return false;
  }
  Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host();

  // If we've been instructed not to forward the request upstream, send an empty local response.
  if (auto* debug_config =
          callbacks_->streamInfo().filterState()->getDataReadOnly<DebugConfig>(DebugConfig::key());
      debug_config != nullptr && debug_config->do_not_forward_) {
    callbacks_->sendLocalReply(
        Http::Code::NoContent, "",
        [this](Http::ResponseHeaderMap& headers) {
          headers.addReference(Http::Headers::get().EnvoyNotForwarded, "true");
          modify_headers_(headers);
        },
        absl::nullopt, "");
    return false;
  }

  // Overload-manager driven load shedding: reject with 503 before committing upstream work.
  if (callbacks_->shouldLoadShed()) {
    callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "envoy overloaded", modify_headers_,
                               absl::nullopt, StreamInfo::ResponseCodeDetails::get().Overload);
    stats_.rq_overload_local_reply_.inc();
    return false;
  }

  // Handle additional header processing.
  const Http::HeaderEntry* header_max_stream_duration_entry =
      headers.EnvoyUpstreamStreamDurationMs();
  if (header_max_stream_duration_entry) {
    dynamic_max_stream_duration_ =
        FilterUtility::tryParseHeaderTimeout(*header_max_stream_duration_entry);
    headers.removeEnvoyUpstreamStreamDurationMs();
  }

  // If this header is set with any value, use an alternate response code on timeout.
  if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) {
    timeout_response_code_ = Http::Code::NoContent;
    headers.removeEnvoyUpstreamRequestTimeoutAltResponse();
  }

  // Pick the upstream :scheme based on downstream TLS, upstream transport TLS, and
  // whether the scheme should simply match upstream.
  FilterUtility::setUpstreamScheme(
      headers, callbacks_->streamInfo().downstreamAddressProvider().sslConnection() != nullptr,
      host->transportSocketFactory().sslCtx() != nullptr,
      callbacks_->streamInfo().shouldSchemeMatchUpstream());

  // Ensure an http transport scheme is selected before continuing with decoding.
  ASSERT(headers.Scheme());

  // Build the retry state machine for this request from the effective retry policy.
  retry_state_ = createRetryState(*getEffectiveRetryPolicy(), headers, *cluster_, request_vcluster_,
                                  route_stats_context_, config_->factory_context_,
                                  callbacks_->dispatcher(), route_entry_->priority());

  // Determine which shadow policies to use. It's possible that we don't do any shadowing due to
  // runtime keys. Also the method CONNECT doesn't support shadowing.
  auto method = headers.getMethodValue();
  if (method != Http::Headers::get().MethodValues.Connect) {
    // Use cluster-level shadow policies if they are available (most specific wins).
    // If no cluster-level policies are configured, fall back to route-level policies.
    const auto& cluster_shadow_policies = cluster_->httpProtocolOptions().shadowPolicies();
    const auto& policies_to_evaluate =
        !cluster_shadow_policies.empty() ? cluster_shadow_policies : route_entry_->shadowPolicies();

    for (const auto& shadow_policy : policies_to_evaluate) {
      const auto& policy_ref = *shadow_policy;
      if (FilterUtility::shouldShadow(policy_ref, config_->runtime_, callbacks_->streamId())) {
        active_shadow_policies_.push_back(std::cref(policy_ref));
        // Snapshot the downstream headers now so later mutations don't leak into shadows.
        shadow_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_);
      }
    }
  }

  ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers);

  const bool can_send_early_data =
      route_entry_->earlyDataPolicy().allowsEarlyDataForRequest(*downstream_headers_);

  include_timeout_retry_header_in_request_ = route_->virtualHost()->includeIsTimeoutRetryHeader();

  // Set initial HTTP/3 use based on the presence of HTTP/1.1 proxy config.
  // For retries etc, HTTP/3 usability may transition from true to false, but
  // will never transition from false to true.
  bool can_use_http3 =
      !transport_socket_options_ || !transport_socket_options_->http11ProxyInfo().has_value();
  UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
      *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3,
      allow_multiplexed_upstream_half_close_ /*enable_half_close*/);
  LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
  upstream_requests_.front()->acceptHeadersFromRouter(end_stream);
  // Start the shadow streams.
  for (const auto& shadow_policy_wrapper : active_shadow_policies_) {
    const auto& shadow_policy = shadow_policy_wrapper.get();
    const absl::optional<absl::string_view> shadow_cluster_name =
        getShadowCluster(shadow_policy, *downstream_headers_);
    if (!shadow_cluster_name.has_value()) {
      continue;
    }
    auto shadow_headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_);
    applyShadowPolicyHeaders(shadow_policy, *shadow_headers);
    const auto options =
        Http::AsyncClient::RequestOptions()
            .setTimeout(timeout_.global_timeout_)
            .setParentSpan(callbacks_->activeSpan())
            .setChildSpanName("mirror")
            .setSampled(shadow_policy.traceSampled())
            .setIsShadow(true)
            .setIsShadowSuffixDisabled(shadow_policy.disableShadowHostSuffixAppend())
            .setBufferAccount(callbacks_->account())
            // Calculate effective buffer limit for shadow streams using the same logic as main
            // request. A buffer limit of 1 is set in the case that the effective limit == 0,
            // because a buffer limit of zero on async clients is interpreted as no buffer limit.
            .setBufferLimit([this]() -> uint64_t {
              const uint64_t effective_limit = calculateEffectiveBufferLimit();
              return effective_limit == 0 ? 1 : effective_limit;
            }())
            .setDiscardResponseBody(true)
            .setFilterConfig(config_)
            .setParentContext(Http::AsyncClient::ParentContext{&callbacks_->streamInfo()});
    if (end_stream) {
      // This is a header-only request, and can be dispatched immediately to the shadow
      // without waiting.
      Http::RequestMessagePtr request(new Http::RequestMessageImpl(
          Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_)));
      applyShadowPolicyHeaders(shadow_policy, request->headers());
      config_->shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request),
                                     options);
    } else {
      // Body/trailers are still pending; open a streaming shadow that decodeData()/
      // decodeTrailers() will feed.
      Http::AsyncClient::OngoingRequest* shadow_stream = config_->shadowWriter().streamingShadow(
          std::string(shadow_cluster_name.value()), std::move(shadow_headers), options);
      if (shadow_stream != nullptr) {
        shadow_streams_.insert(shadow_stream);
        shadow_stream->setDestructorCallback(
            [this, shadow_stream]() { shadow_streams_.erase(shadow_stream); });
        shadow_stream->setWatermarkCallbacks(watermark_callbacks_);
      }
    }
  }
  if (end_stream) {
    onRequestComplete();
  }

  // If this was called due to asynchronous host selection, the caller should continueDecoding.
  return true;
}
907

            
908
// Creates the generic connection pool used to reach `host` in `cluster`.
// Returns nullptr when no host was selected. The pool factory comes from the
// cluster's upstream_config extension if present (falling back to the default
// router factory when the configured factory cannot be loaded), and the upstream
// protocol is HTTP unless the route has a connect_config that matches CONNECT,
// CONNECT-UDP, or (if allowed) POST.
std::unique_ptr<GenericConnPool> Filter::createConnPool(Upstream::ThreadLocalCluster& cluster,
                                                        Upstream::HostConstSharedPtr host) {
  if (host == nullptr) {
    return nullptr;
  }
  GenericConnPoolFactory* factory = nullptr;
  ProtobufTypes::MessagePtr message;
  if (cluster_->upstreamConfig().has_value()) {
    factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>(
        cluster_->upstreamConfig().ref());
    // A misconfigured factory is an ENVOY_BUG, not a hard failure: we fall through to
    // the default factory below when `factory` stays null.
    ENVOY_BUG(factory != nullptr,
              fmt::format("invalid factory type '{}', failing over to default upstream",
                          cluster_->upstreamConfig().ref().DebugString()));
    if (!cluster_->upstreamConfig()->typed_config().value().empty()) {
      message = Envoy::Config::Utility::translateToFactoryConfig(
          *cluster_->upstreamConfig(), config_->factory_context_.messageValidationVisitor(),
          *factory);
    }
  }
  if (!factory) {
    factory = &config_->router_context_.genericConnPoolFactory();
  }

  // Ensure the factory always receives a config proto, even an empty default one.
  if (!message) {
    message = factory->createEmptyConfigProto();
  }

  using UpstreamProtocol = Envoy::Router::GenericConnPoolFactory::UpstreamProtocol;
  UpstreamProtocol upstream_protocol = UpstreamProtocol::HTTP;
  if (route_entry_->connectConfig().has_value()) {
    auto method = downstream_headers_->getMethodValue();
    if (Http::HeaderUtility::isConnectUdpRequest(*downstream_headers_)) {
      upstream_protocol = UpstreamProtocol::UDP;
    } else if (method == Http::Headers::get().MethodValues.Connect ||
               (route_entry_->connectConfig()->allow_post() &&
                method == Http::Headers::get().MethodValues.Post)) {
      // Allow POST for proxying raw TCP if it is configured.
      upstream_protocol = UpstreamProtocol::TCP;
    }
  }

  return factory->createGenericConnPool(host, cluster, upstream_protocol, route_entry_->priority(),
                                        callbacks_->streamInfo().protocol(), this, *message);
}
952

            
953
75
// Sends a 503 "no healthy upstream" local reply after host selection yielded no
// usable host. Sets the NoHealthyUpstream response flag and charges upstream stats
// first; uses the canonical NoHealthyUpstream details when the caller supplies none.
void Filter::sendNoHealthyUpstreamResponse(absl::string_view optional_details) {
  callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream);
  chargeUpstreamCode(Http::Code::ServiceUnavailable, {}, false);
  // Prefer caller-provided details; otherwise fall back to the standard constant.
  absl::string_view reply_details = optional_details;
  if (reply_details.empty()) {
    reply_details = StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream;
  }
  callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", modify_headers_,
                             absl::nullopt, reply_details);
}
962

            
963
233319
uint64_t Filter::calculateEffectiveBufferLimit() const {
964
  // Use requestBodyBufferLimit() method which handles both legacy and new
965
  // configurations. If no buffer limit is configured, fall back to connection limit.
966
233319
  if (request_body_buffer_limit_ != std::numeric_limits<uint64_t>::max()) {
967
979
    return request_body_buffer_limit_;
968
979
  }
969

            
970
  // If no route-level buffer limit is set, use the stream buffer limit.
971
232340
  const uint32_t current_stream_limit = callbacks_->bufferLimit();
972
232340
  if (current_stream_limit != 0) {
973
231605
    return static_cast<uint64_t>(current_stream_limit);
974
231605
  }
975

            
976
  // If no limits are set at all, return unlimited.
977
735
  return std::numeric_limits<uint64_t>::max();
978
232340
}
979

            
980
182358
bool Filter::isEarlyConnectData() {
981
182358
  return reject_early_connect_data_enabled_ && downstream_headers_ != nullptr &&
982
182358
         Http::HeaderUtility::isConnect(*downstream_headers_) && !downstream_response_started_;
983
182358
}
984

            
985
185704
// Http::StreamDecoderFilter: forwards request body data to the active upstream
// request and to any streaming shadow streams, buffering it via addDecodedData()
// when a retry or internal redirect may need to replay it. Sends a local reply and
// stops iteration on early CONNECT data, on retry buffer overflow with no active
// upstream request, or when the upstream went away before the body completed.
Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) {
  // Reject body bytes on a CONNECT before a response has started (see isEarlyConnectData()).
  if (data.length() > 0 && isEarlyConnectData()) {
    callbacks_->sendLocalReply(Http::Code::BadRequest, "", nullptr, absl::nullopt,
                               StreamInfo::ResponseCodeDetails::get().EarlyConnectData);
    return Http::FilterDataStatus::StopIterationNoBuffer;
  }
  // upstream_requests_.size() cannot be > 1 because that only happens when a per
  // try timeout occurs with hedge_on_per_try_timeout enabled but the per
  // try timeout timer is not started until onRequestComplete(). It could be zero
  // if the first request attempt has already failed and a retry is waiting for
  // a backoff timer..
  ASSERT(upstream_requests_.size() <= 1);

  const bool retry_enabled = retry_state_ && retry_state_->enabled();
  const bool redirect_enabled = route_entry_ && route_entry_->internalRedirectPolicy().enabled();
  const bool is_redirect_only = redirect_enabled && !retry_enabled;
  const uint64_t effective_buffer_limit = calculateEffectiveBufferLimit();
  bool buffering = retry_enabled || redirect_enabled;
  // Check if we would exceed buffer limits, regardless of current buffering state
  // This ensures error details are set even if retry state was cleared due to upstream reset.
  const bool would_exceed_buffer =
      (getLength(callbacks_->decodingBuffer()) + data.length() > effective_buffer_limit);
  // Handle retry buffer overflow, excluding redirect-only scenarios.
  // For redirect scenarios, buffer overflow should only affect redirect processing, not initial
  // request.
  if (would_exceed_buffer && retry_enabled && !is_redirect_only && !request_buffer_overflowed_) {
    ENVOY_LOG(debug,
              "The request payload has at least {} bytes data which exceeds buffer limit {}. "
              "Giving up on buffering.",
              getLength(callbacks_->decodingBuffer()) + data.length(), effective_buffer_limit);
    cluster_->trafficStats()->retry_or_shadow_abandoned_.inc();
    retry_state_.reset();
    ENVOY_LOG(debug, "retry or shadow overflow: retry_state_ reset, buffering set to false");
    buffering = false;
    active_shadow_policies_.clear();
    // Only send local reply and cleanup if we're in a retry waiting state (no active upstream
    // requests). If there are active upstream requests, let the normal upstream failure handling
    // take precedence.
    if (upstream_requests_.empty()) {
      request_buffer_overflowed_ = true;
      ENVOY_LOG(debug,
                "retry or shadow overflow: No upstream requests, resetting and calling cleanup()");
      resetAll();
      cleanup();
      callbacks_->streamInfo().setResponseCodeDetails(
          StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit);
      callbacks_->sendLocalReply(
          Http::Code::InsufficientStorage, "exceeded request buffer limit while retrying upstream",
          modify_headers_, absl::nullopt,
          StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit);
      return Http::FilterDataStatus::StopIterationNoBuffer;
    } else {
      ENVOY_LOG(debug, "retry or shadow overflow: Upstream requests exist, deferring to normal "
                       "upstream failure handling");
    }
  }
  // Handle redirect-only buffer overflow when retry/shadow is not active.
  // For redirect scenarios, buffer overflow should only affect redirect processing, not initial
  // request.
  if (would_exceed_buffer && is_redirect_only && !request_buffer_overflowed_) {
    ENVOY_LOG(debug,
              "The request payload has at least {} bytes data which exceeds buffer limit {}. "
              "Marking request as buffer overflowed to cancel internal redirects.",
              getLength(callbacks_->decodingBuffer()) + data.length(), effective_buffer_limit);
    // Set the flag to cancel internal redirect processing, but allow the request to proceed
    // normally.
    request_buffer_overflowed_ = true;
  }
  // Mirror the data to every streaming shadow; each shadow gets its own copy.
  for (auto* shadow_stream : shadow_streams_) {
    if (end_stream) {
      // Last chunk: detach callbacks before the shadow stream completes.
      shadow_stream->removeDestructorCallback();
      shadow_stream->removeWatermarkCallbacks();
    }
    Buffer::OwnedImpl copy(data);
    shadow_stream->sendData(copy, end_stream);
  }
  if (end_stream) {
    shadow_streams_.clear();
  }
  if (buffering) {
    if (!upstream_requests_.empty()) {
      // Send a copy upstream; the original `data` is buffered below for potential replay.
      Buffer::OwnedImpl copy(data);
      upstream_requests_.front()->acceptDataFromRouter(copy, end_stream);
    }
    // If we are potentially going to retry or buffer shadow this request we need to buffer.
    // This will not cause the connection manager to 413 because before we hit the
    // buffer limit we give up on retries and buffering. We must buffer using addDecodedData()
    // so that all buffered data is available by the time we do request complete processing and
    // potentially shadow. Additionally, we can't do a copy here because there's a check down
    // this stack for whether `data` is the same buffer as already buffered data.
    callbacks_->addDecodedData(data, true);
  } else {
    if (!upstream_requests_.empty()) {
      upstream_requests_.front()->acceptDataFromRouter(data, end_stream);
    } else {
      // not buffering any data for retry, shadow, and internal redirect, and there will be
      // no more upstream request, abort the request and clean up.
      cleanup();
      callbacks_->sendLocalReply(
          Http::Code::ServiceUnavailable,
          "upstream is closed prematurely during decoding data from downstream", modify_headers_,
          absl::nullopt, StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset);
      return Http::FilterDataStatus::StopIterationNoBuffer;
    }
  }
  if (end_stream) {
    onRequestComplete();
  }
  return Http::FilterDataStatus::StopIterationNoBuffer;
}
459
// Http::StreamDecoderFilter: latches downstream trailers, forwards them to the
// active upstream request (if any) and to each streaming shadow (each shadow gets
// its own copy), then marks the request complete.
Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trailers) {
  ENVOY_STREAM_LOG(debug, "router decoding trailers:\n{}", *callbacks_, trailers);
  // shadow_headers_ is only set when at least one shadow policy is active; snapshot
  // the trailers for shadows in that case.
  if (shadow_headers_) {
    shadow_trailers_ = Http::createHeaderMap<Http::RequestTrailerMapImpl>(trailers);
  }
  // upstream_requests_.size() cannot be > 1 because that only happens when a per
  // try timeout occurs with hedge_on_per_try_timeout enabled but the per
  // try timeout timer is not started until onRequestComplete(). It could be zero
  // if the first request attempt has already failed and a retry is waiting for
  // a backoff timer.
  ASSERT(upstream_requests_.size() <= 1);
  downstream_trailers_ = &trailers;
  if (!upstream_requests_.empty()) {
    upstream_requests_.front()->acceptTrailersFromRouter(trailers);
  }
  // Trailers end the shadow streams: detach callbacks and send a copy of the trailers.
  for (auto* shadow_stream : shadow_streams_) {
    shadow_stream->removeDestructorCallback();
    shadow_stream->removeWatermarkCallbacks();
    shadow_stream->captureAndSendTrailers(
        Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_));
  }
  shadow_streams_.clear();
  onRequestComplete();
  return Http::FilterTrailersStatus::StopIteration;
}
1292
// Http::StreamDecoderFilter: copies incoming request metadata and hands it to the
// active upstream request, if one exists. Metadata is not saved for retries,
// redirects, or shadows, and iteration always continues.
Http::FilterMetadataStatus Filter::decodeMetadata(Http::MetadataMap& metadata_map) {
  auto metadata_copy = std::make_unique<Http::MetadataMap>(metadata_map);
  if (!upstream_requests_.empty()) {
    // TODO(soya3129): Save metadata for retry, redirect and shadowing case.
    upstream_requests_.front()->acceptMetadataFromRouter(std::move(metadata_copy));
  }
  return Http::FilterMetadataStatus::Continue;
}
95455
void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
  // Cache the decoder callbacks and propagate them to the shared watermark
  // bookkeeping so flow-control notifications reach the downstream stream.
  callbacks_ = &callbacks;
  watermark_callbacks_.setDecoderFilterCallbacks(&callbacks);
}
135448
void Filter::cleanup() {
  // All callers of cleanup() should have cleaned out the upstream_requests_
  // list as appropriate.
135448
  ASSERT(upstream_requests_.empty());
135448
  ENVOY_LOG(debug, "Executing cleanup(): resetting retry_state_ and disabling timers");
135448
  retry_state_.reset();
135448
  if (response_timeout_) {
38661
    response_timeout_->disableTimer();
38661
    response_timeout_.reset();
38661
  }
135448
}
absl::optional<absl::string_view> Filter::getShadowCluster(const ShadowPolicy& policy,
                                                           const Http::HeaderMap& headers) const {
  // A statically configured cluster name takes precedence.
  if (!policy.cluster().empty()) {
    return policy.cluster();
  }
  // Otherwise the policy must name a request header that carries the cluster.
  ASSERT(!policy.clusterHeader().get().empty());
  const auto entry = headers.get(policy.clusterHeader());
  if (!entry.empty() && !entry[0]->value().empty()) {
    return entry[0]->value().getStringView();
  }
  // Header missing or empty: no shadow cluster can be resolved.
  ENVOY_STREAM_LOG(debug, "There is no cluster name in header: {}", *callbacks_,
                   policy.clusterHeader());
  return absl::nullopt;
}
void Filter::applyShadowPolicyHeaders(const ShadowPolicy& shadow_policy,
145
                                      Http::RequestHeaderMap& headers) const {
145
  const Envoy::Formatter::Context formatter_context{&headers};
145
  shadow_policy.headerEvaluator().evaluateHeaders(headers, formatter_context,
145
                                                  callbacks_->streamInfo());
145
  if (!shadow_policy.hostRewriteLiteral().empty()) {
6
    headers.setHost(shadow_policy.hostRewriteLiteral());
6
  }
145
}
100
void Filter::setupRouteTimeoutForWebsocketUpgrade() {
  // Set up the route timeout early for websocket upgrades, since the upstream request
  // will be paused waiting for the upgrade response and we need the timeout active.
100
  if (!response_timeout_ && timeout_.global_timeout_.count() > 0) {
95
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
95
    response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); });
95
    response_timeout_->enableTimer(timeout_.global_timeout_);
95
  }
100
}
56
void Filter::disableRouteTimeoutForWebsocketUpgrade() {
  // Disable the route timeout after websocket upgrade completes successfully
  // to prevent timeout from firing after the upgrade is done.
56
  if (response_timeout_) {
56
    response_timeout_->disableTimer();
56
    response_timeout_.reset();
56
  }
56
}
40018
void Filter::onRequestComplete() {
  // This should be called exactly once, when the downstream request has been received in full.
40018
  ASSERT(!downstream_end_stream_);
40018
  downstream_end_stream_ = true;
40018
  Event::Dispatcher& dispatcher = callbacks_->dispatcher();
40018
  downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime();
  // Possible that we got an immediate reset.
40018
  if (!upstream_requests_.empty()) {
    // Even if we got an immediate reset, we could still shadow, but that is a riskier change and
    // seems unnecessary right now.
    // Arm the global route timeout now that the full request has been received,
    // unless it was already armed (e.g. for a websocket upgrade).
39383
    if (timeout_.global_timeout_.count() > 0 && !response_timeout_) {
38622
      response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); });
38622
      response_timeout_->enableTimer(timeout_.global_timeout_);
38622
    }
    // Start per-try timers on requests that deferred them until the downstream
    // request was fully received.
39383
    for (auto& upstream_request : upstream_requests_) {
39383
      if (upstream_request->createPerTryTimeoutOnRequestComplete()) {
14664
        upstream_request->setupPerTryTimeout();
14664
      }
39383
    }
39383
  }
40018
}
95267
void Filter::onDestroy() {
  // Cancel any in-flight host selection
95267
  if (host_selection_cancelable_) {
424
    host_selection_cancelable_->cancel();
424
  }
  // Reset any in-flight upstream requests.
95267
  resetAll();
  // Unregister from shadow stream notifications and cancel active streams.
95267
  for (auto* shadow_stream : shadow_streams_) {
6
    shadow_stream->removeDestructorCallback();
6
    shadow_stream->removeWatermarkCallbacks();
6
    shadow_stream->cancel();
6
  }
  // cleanup() asserts upstream_requests_ is empty; resetAll() above ensured it.
95267
  cleanup();
95267
}
164
void Filter::onResponseTimeout() {
164
  ENVOY_STREAM_LOG(debug, "upstream timeout", *callbacks_);
  // Reset any upstream requests that are still in flight.
328
  while (!upstream_requests_.empty()) {
164
    UpstreamRequestPtr upstream_request =
164
        upstream_requests_.back()->removeFromList(upstream_requests_);
    // We want to record the upstream timeouts and increase the stats counters in all the cases.
    // For example, we also want to record the stats in the case of BiDi streaming APIs where we
    // might have already seen the headers.
164
    cluster_->trafficStats()->upstream_rq_timeout_.inc();
164
    if (request_vcluster_) {
12
      request_vcluster_->stats().upstream_rq_timeout_.inc();
12
    }
164
    if (route_stats_context_.has_value()) {
      route_stats_context_->stats().upstream_rq_timeout_.inc();
    }
    // Per-host timeout stat; the attempt may not have been assigned a host yet.
164
    if (upstream_request->upstreamHost()) {
159
      upstream_request->upstreamHost()->stats().rq_timeout_.inc();
159
    }
164
    if (upstream_request->awaitingHeaders()) {
159
      if (cluster_->timeoutBudgetStats().has_value()) {
        // Cancel firing per-try timeout information, because the per-try timeout did not come into
        // play when the global timeout was hit.
14
        upstream_request->recordTimeoutBudget(false);
14
      }
      // If this upstream request already hit a "soft" timeout, then it
      // already recorded a timeout into outlier detection. Don't do it again.
159
      if (!upstream_request->outlierDetectionTimeoutRecorded()) {
158
        updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, *upstream_request,
158
                               absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
158
      }
159
      chargeUpstreamAbort(timeout_response_code_, false, *upstream_request);
159
    }
164
    upstream_request->resetStream();
164
  }
  // All attempts have been reset; surface the timeout downstream.
164
  onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout,
164
                         StreamInfo::ResponseCodeDetails::get().ResponseTimeout);
164
}
// Called when the per try timeout is hit but we didn't reset the request
// (hedge_on_per_try_timeout enabled).
18
void Filter::onSoftPerTryTimeout(UpstreamRequest& upstream_request) {
18
  ASSERT(!upstream_request.retried());
  // Track this as a timeout for outlier detection purposes even though we didn't
  // cancel the request yet and might get a 2xx later.
18
  updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request,
18
                         absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
  // Remember that the timeout was recorded so onResponseTimeout() does not
  // double-count it for outlier detection.
18
  upstream_request.outlierDetectionTimeoutRecorded(true);
18
  if (!downstream_response_started_ && retry_state_) {
18
    RetryStatus retry_status = retry_state_->shouldHedgeRetryPerTryTimeout(
18
        [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_]() -> void {
          // Without any knowledge about what's going on in the connection pool, retry the request
          // with the safest settings which is no early data but keep using or not using alt-svc as
          // before. In this way, QUIC won't be falsely marked as broken.
18
          doRetry(/*can_send_early_data*/ false, can_use_http3, TimeoutRetry::Yes);
18
        });
18
    if (retry_status == RetryStatus::Yes) {
18
      runRetryOptionsPredicates(upstream_request);
18
      pending_retries_++;
      // Don't increment upstream_host->stats().rq_error_ here, we'll do that
      // later if 1) we hit global timeout or 2) we get bad response headers
      // back.
18
      upstream_request.retried(true);
      // TODO: cluster stat for hedge attempted.
18
    } else if (retry_status == RetryStatus::NoOverflow) {
      callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
    } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
      callbacks_->streamInfo().setResponseFlag(
          StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
    }
18
  }
18
}
13
void Filter::onPerTryIdleTimeout(UpstreamRequest& upstream_request) {
  // Per-try *idle* timeout. Shares all handling with the regular per-try
  // timeout except for the stat that is bumped and the response detail string.
  Stats::Counter& idle_timeout_counter =
      cluster_->trafficStats()->upstream_rq_per_try_idle_timeout_;
  onPerTryTimeoutCommon(upstream_request, idle_timeout_counter,
                        StreamInfo::ResponseCodeDetails::get().UpstreamPerTryIdleTimeout);
}
37
void Filter::onPerTryTimeout(UpstreamRequest& upstream_request) {
  // Per-try timeout: this attempt exceeded its allotted time. Delegate to the
  // shared per-try timeout path with the matching stat and detail string.
  Stats::Counter& per_try_timeout_counter =
      cluster_->trafficStats()->upstream_rq_per_try_timeout_;
  onPerTryTimeoutCommon(upstream_request, per_try_timeout_counter,
                        StreamInfo::ResponseCodeDetails::get().UpstreamPerTryTimeout);
}
void Filter::onPerTryTimeoutCommon(UpstreamRequest& upstream_request, Stats::Counter& error_counter,
50
                                   const std::string& response_code_details) {
  // With hedging enabled the attempt is left running and a hedged retry is
  // launched instead of aborting the stream.
50
  if (hedging_params_.hedge_on_per_try_timeout_) {
18
    onSoftPerTryTimeout(upstream_request);
18
    return;
18
  }
32
  error_counter.inc();
32
  if (upstream_request.upstreamHost()) {
32
    upstream_request.upstreamHost()->stats().rq_timeout_.inc();
32
  }
32
  upstream_request.resetStream();
32
  updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request,
32
                         absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
  // If the retry policy allows it, a retry is scheduled and this attempt was
  // already detached inside maybeRetryReset().
32
  if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::Yes)) {
5
    return;
5
  }
27
  chargeUpstreamAbort(timeout_response_code_, false, upstream_request);
  // Remove this upstream request from the list now that we're done with it.
27
  upstream_request.removeFromList(upstream_requests_);
27
  onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout,
27
                         response_code_details);
27
}
66
void Filter::onStreamMaxDurationReached(UpstreamRequest& upstream_request) {
66
  upstream_request.resetStream();
  // The reset attempt may still be retryable under the route's retry policy;
  // maybeRetryReset() detaches the request itself when it schedules a retry.
66
  if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::No)) {
19
    return;
19
  }
47
  upstream_request.removeFromList(upstream_requests_);
47
  cleanup();
47
  callbacks_->streamInfo().setResponseFlag(
47
      StreamInfo::CoreResponseFlag::UpstreamMaxStreamDurationReached);
  // Grab the const ref to call the const method of StreamInfo.
47
  const auto& stream_info = callbacks_->streamInfo();
  // The local reply's status code depends on whether the downstream request was
  // fully received before the duration limit hit (see maybeRequestTimeoutCode).
47
  const bool downstream_decode_complete =
47
      stream_info.downstreamTiming().has_value() &&
47
      stream_info.downstreamTiming().value().get().lastDownstreamRxByteReceived().has_value();
  // sendLocalReply may instead reset the stream if downstream_response_started_ is true.
47
  callbacks_->sendLocalReply(
47
      Http::Utility::maybeRequestTimeoutCode(downstream_decode_complete),
47
      "upstream max stream duration reached", modify_headers_, absl::nullopt,
47
      StreamInfo::ResponseCodeDetails::get().UpstreamMaxStreamDurationReached);
47
}
void Filter::updateOutlierDetection(Upstream::Outlier::Result result,
                                    UpstreamRequest& upstream_request,
2844
                                    absl::optional<uint64_t> code) {
2844
  if (upstream_request.upstreamHost()) {
2835
    upstream_request.upstreamHost()->outlierDetector().putResult(result, code);
2835
  }
2844
}
2792
void Filter::chargeUpstreamAbort(Http::Code code, bool dropped, UpstreamRequest& upstream_request) {
2792
  if (downstream_response_started_) {
    // The response already started flowing downstream, so only per-host and
    // reset-after-response stats can be charged at this point. A deferred gRPC
    // success implies headers were received, i.e. a host was selected.
1341
    if (upstream_request.grpcRqSuccessDeferred()) {
1133
      upstream_request.upstreamHost()->stats().rq_error_.inc();
1133
      stats_.rq_reset_after_downstream_response_started_.inc();
1133
    }
2373
  } else {
1451
    Upstream::HostDescriptionOptConstRef upstream_host = upstream_request.upstreamHost();
1451
    chargeUpstreamCode(code, upstream_host, dropped);
    // If we had non-5xx but still have been reset by backend or timeout before
    // starting response, we treat this as an error. We only get non-5xx when
    // timeout_response_code_ is used for code above, where this member can
    // assume values such as 204 (NoContent).
1451
    if (upstream_host.has_value() && !Http::CodeUtility::is5xx(enumToInt(code))) {
1
      upstream_host->stats().rq_error_.inc();
1
    }
1451
  }
2792
}
void Filter::onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag response_flags,
191
                                    absl::string_view details) {
191
  Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats();
191
  if (tb_stats.has_value()) {
21
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
21
    std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
21
        dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
21
    tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue(
21
        FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_));
21
  }
191
  const absl::string_view body =
191
      timeout_response_code_ == Http::Code::GatewayTimeout ? "upstream request timeout" : "";
191
  onUpstreamAbort(timeout_response_code_, response_flags, body, false, details);
191
}
void Filter::onUpstreamAbort(Http::Code code, StreamInfo::CoreResponseFlag response_flags,
2795
                             absl::string_view body, bool dropped, absl::string_view details) {
  // If we have not yet sent anything downstream, send a response with an appropriate status code.
  // Otherwise just reset the ongoing response.
2795
  callbacks_->streamInfo().setResponseFlag(response_flags);
  // Check if buffer overflow occurred and override error details accordingly
2795
  if (request_buffer_overflowed_) {
    code = Http::Code::InsufficientStorage;
    body = "exceeded request buffer limit while retrying upstream";
    details = StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit;
  }
  // This will destroy any created retry timers.
2795
  cleanup();
  // sendLocalReply may instead reset the stream if downstream_response_started_ is true.
2795
  callbacks_->sendLocalReply(
2795
      code, body,
2795
      [dropped, this](Http::ResponseHeaderMap& headers) {
        // Advertise the drop via the Envoy overloaded header unless Envoy
        // headers are suppressed by configuration.
1461
        if (dropped && !config_->suppress_envoy_headers_) {
34
          headers.addReference(Http::Headers::get().EnvoyOverloaded,
34
                               Http::Headers::get().EnvoyOverloadedValues.True);
34
        }
1461
        modify_headers_(headers);
1461
      },
2795
      absl::nullopt, details);
2795
}
bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason,
2768
                             UpstreamRequest& upstream_request, TimeoutRetry is_timeout_retry) {
  // We don't retry if we already started the response, don't have a retry policy defined,
  // or if we've already retried this upstream request (currently only possible if a per
  // try timeout occurred and hedge_on_per_try_timeout is enabled).
2768
  if (downstream_response_started_ || !retry_state_ || upstream_request.retried()) {
2599
    return false;
2599
  }
  // Record whether the failed attempt ran over HTTP/3 so the retry policy can
  // decide whether to disable it for the next attempt.
169
  RetryState::Http3Used was_using_http3 = RetryState::Http3Used::Unknown;
169
  if (upstream_request.hadUpstream()) {
106
    was_using_http3 = (upstream_request.streamInfo().protocol().has_value() &&
106
                       upstream_request.streamInfo().protocol().value() == Http::Protocol::Http3)
106
                          ? RetryState::Http3Used::Yes
106
                          : RetryState::Http3Used::No;
106
  }
  // If the current request in this router has sent data to the upstream, we consider the request
  // started.
169
  upstream_request_started_ |= upstream_request.streamInfo()
169
                                   .upstreamInfo()
169
                                   ->upstreamTiming()
169
                                   .first_upstream_tx_byte_sent_.has_value();
169
  const RetryStatus retry_status = retry_state_->shouldRetryReset(
169
      reset_reason, was_using_http3,
169
      [this, can_send_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_,
169
       can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
169
       is_timeout_retry](bool disable_http3) -> void {
        // This retry might be because of ConnectionFailure of 0-RTT handshake. In this case, though
        // the original request is retried with the same can_send_early_data setting, it will not be
        // sent as early data by the underlying connection pool grid.
67
        doRetry(can_send_early_data, disable_http3 ? false : can_use_http3, is_timeout_retry);
67
      },
169
      upstream_request_started_);
169
  if (retry_status == RetryStatus::Yes) {
88
    runRetryOptionsPredicates(upstream_request);
88
    pending_retries_++;
88
    if (upstream_request.upstreamHost()) {
87
      upstream_request.upstreamHost()->stats().rq_error_.inc();
87
    }
    // Detach the failed attempt and defer-delete it; the retry scheduled above
    // owns the next attempt.
88
    auto request_ptr = upstream_request.removeFromList(upstream_requests_);
88
    callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
88
    return true;
141
  } else if (retry_status == RetryStatus::NoOverflow) {
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
81
  } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
16
    callbacks_->streamInfo().setResponseFlag(
16
        StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
16
  }
81
  return false;
169
}
void Filter::onUpstreamReset(Http::StreamResetReason reset_reason,
                             absl::string_view transport_failure_reason,
2670
                             UpstreamRequest& upstream_request) {
2670
  ENVOY_STREAM_LOG(debug, "upstream reset: reset reason: {}, transport failure reason: {}",
2670
                   *callbacks_, Http::Utility::resetReasonToString(reset_reason),
2670
                   transport_failure_reason);
2670
  const bool dropped = reset_reason == Http::StreamResetReason::Overflow;
  // Ignore upstream reset caused by a resource overflow.
  // Currently, circuit breakers can only produce this reset reason.
  // It means that this reason is cluster-wise, not upstream-related.
  // Therefore removing an upstream in the case of an overloaded cluster
  // would make the situation even worse.
  // https://github.com/envoyproxy/envoy/issues/25487
2670
  if (!dropped) {
    // TODO: The reset may also come from upstream over the wire. In this case it should be
    // treated as external origin error and distinguished from local origin error.
    // This matters only when running OutlierDetection with split_external_local_origin_errors
    // config param set to true.
2636
    updateOutlierDetection(Upstream::Outlier::Result::LocalOriginConnectFailed, upstream_request,
2636
                           absl::nullopt);
2636
  }
  // Retry the reset attempt if the retry policy allows it; maybeRetryReset()
  // detaches the request itself when it schedules a retry.
2670
  if (maybeRetryReset(reset_reason, upstream_request, TimeoutRetry::No)) {
64
    return;
64
  }
2606
  const Http::Code error_code = (reset_reason == Http::StreamResetReason::ProtocolError)
2606
                                    ? Http::Code::BadGateway
2606
                                    : Http::Code::ServiceUnavailable;
2606
  chargeUpstreamAbort(error_code, dropped, upstream_request);
2606
  auto request_ptr = upstream_request.removeFromList(upstream_requests_);
2606
  callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
  // If there are other in-flight requests that might see an upstream response,
  // don't return anything downstream.
2606
  if (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0) {
2
    return;
2
  }
2604
  const StreamInfo::CoreResponseFlag response_flags = streamResetReasonToResponseFlag(reset_reason);
2604
  const std::string body =
2604
      absl::StrCat("upstream connect error or disconnect/reset before headers. ",
2604
                   (is_retry_ ? "retried and the latest " : ""),
2604
                   "reset reason: ", Http::Utility::resetReasonToString(reset_reason),
2604
                   !transport_failure_reason.empty() ? ", transport failure reason: " : "",
2604
                   transport_failure_reason);
2604
  const std::string& basic_details =
2604
      downstream_response_started_ ? StreamInfo::ResponseCodeDetails::get().LateUpstreamReset
2604
                                   : StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset;
  // Whitespace in the details string is normalized via replaceAllEmptySpace().
2604
  const std::string details = StringUtil::replaceAllEmptySpace(absl::StrCat(
2604
      basic_details, "{", Http::Utility::resetReasonToString(reset_reason),
2604
      transport_failure_reason.empty() ? "" : absl::StrCat("|", transport_failure_reason), "}"));
2604
  onUpstreamAbort(error_code, response_flags, body, dropped, details);
2604
}
void Filter::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
46730
                                    bool pool_success) {
46730
  if (retry_state_ && host) {
3399
    retry_state_->onHostAttempted(host);
3399
  }
46730
  if (!pool_success) {
293
    return;
293
  }
  // Track the attempted host in upstream info for access logging purposes.
46437
  if (host && callbacks_->streamInfo().upstreamInfo()) {
46437
    callbacks_->streamInfo().upstreamInfo()->addUpstreamHostAttempted(host);
46437
  }
46437
  if (request_vcluster_) {
    // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
    // callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat
    // here.
237
    request_vcluster_->stats().upstream_rq_total_.inc();
237
  }
46437
  if (route_stats_context_.has_value()) {
    // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
    // callback. Hence, the upstream request increases the route level upstream_rq_total_ stat
    // here.
12
    route_stats_context_->stats().upstream_rq_total_.inc();
12
  }
46437
}
StreamInfo::CoreResponseFlag
Filter::streamResetReasonToResponseFlag(Http::StreamResetReason reset_reason) {
  // Translates a stream reset reason into the response flag recorded on stream
  // info (and thus surfaced in access logs). Every enum value must be covered;
  // falling out of the switch indicates a corrupt enum and aborts.
  switch (reset_reason) {
  case Http::StreamResetReason::Overflow:
    return StreamInfo::CoreResponseFlag::UpstreamOverflow;
  case Http::StreamResetReason::ProtocolError:
    return StreamInfo::CoreResponseFlag::UpstreamProtocolError;
  case Http::StreamResetReason::OverloadManager:
    return StreamInfo::CoreResponseFlag::OverloadManager;
  // Failures to establish the upstream connection.
  case Http::StreamResetReason::LocalConnectionFailure:
  case Http::StreamResetReason::RemoteConnectionFailure:
  case Http::StreamResetReason::ConnectionTimeout:
    return StreamInfo::CoreResponseFlag::UpstreamConnectionFailure;
  case Http::StreamResetReason::ConnectionTermination:
    return StreamInfo::CoreResponseFlag::UpstreamConnectionTermination;
  // Resets originated by this proxy.
  case Http::StreamResetReason::LocalReset:
  case Http::StreamResetReason::LocalRefusedStreamReset:
  case Http::StreamResetReason::Http1PrematureUpstreamHalfClose:
    return StreamInfo::CoreResponseFlag::LocalReset;
  // Resets originated by the upstream peer.
  case Http::StreamResetReason::RemoteReset:
  case Http::StreamResetReason::RemoteRefusedStreamReset:
  case Http::StreamResetReason::ConnectError:
    return StreamInfo::CoreResponseFlag::UpstreamRemoteReset;
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
void Filter::handleNon5xxResponseHeaders(absl::optional<Grpc::Status::GrpcStatus> grpc_status,
                                         UpstreamRequest& upstream_request, bool end_stream,
39328
                                         uint64_t grpc_to_http_status) {
  // We need to defer gRPC success until after we have processed grpc-status in
  // the trailers.
39328
  if (grpc_request_) {
2220
    if (end_stream) {
170
      if (grpc_status && !Http::CodeUtility::is5xx(grpc_to_http_status)) {
118
        upstream_request.upstreamHost()->stats().rq_success_.inc();
166
      } else {
52
        upstream_request.upstreamHost()->stats().rq_error_.inc();
52
      }
2093
    } else {
2050
      upstream_request.grpcRqSuccessDeferred(true);
2050
    }
37772
  } else {
37108
    upstream_request.upstreamHost()->stats().rq_success_.inc();
37108
  }
39328
}
void Filter::onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&& headers,
150
                                  UpstreamRequest& upstream_request) {
150
  const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
150
  chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false);
150
  ENVOY_STREAM_LOG(debug, "upstream 1xx ({}).", *callbacks_, response_code);
  // From this point the response has begun: this attempt becomes the final one
  // and any hedged sibling attempts are reset.
150
  downstream_response_started_ = true;
150
  final_upstream_request_ = &upstream_request;
150
  resetOtherUpstreams(upstream_request);
  // Don't send retries after 100-Continue has been sent on. Arguably we could attempt to do a
  // retry, assume the next upstream would also send an 100-Continue and swallow the second one
  // but it's sketchy (as the subsequent upstream might not send a 100-Continue) and not worth
  // the complexity until someone asks for it.
150
  retry_state_.reset();
150
  callbacks_->encode1xxHeaders(std::move(headers));
150
}
102784
void Filter::resetAll() {
109577
  while (!upstream_requests_.empty()) {
6793
    auto request_ptr = upstream_requests_.back()->removeFromList(upstream_requests_);
6793
    request_ptr->resetStream();
6793
    callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
6793
  }
102784
}
39776
void Filter::resetOtherUpstreams(UpstreamRequest& upstream_request) {
  // Pop each upstream request on the list and reset it if it's not the one
  // provided. At the end we'll move it back into the list.
39776
  UpstreamRequestPtr final_upstream_request;
79563
  while (!upstream_requests_.empty()) {
39787
    UpstreamRequestPtr upstream_request_tmp =
39787
        upstream_requests_.back()->removeFromList(upstream_requests_);
39787
    if (upstream_request_tmp.get() != &upstream_request) {
11
      upstream_request_tmp->resetStream();
      // TODO: per-host stat for hedge abandoned.
      // TODO: cluster stat for hedge abandoned.
39776
    } else {
39776
      final_upstream_request = std::move(upstream_request_tmp);
39776
    }
39787
  }
  // The surviving request must have been present in the list.
39776
  ASSERT(final_upstream_request);
  // Now put the final request back on this list.
39776
  LinkedList::moveIntoList(std::move(final_upstream_request), upstream_requests_);
39776
}
void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
39963
                               UpstreamRequest& upstream_request, bool end_stream) {
39963
  ENVOY_STREAM_LOG(debug, "upstream headers complete: end_stream={}", *callbacks_, end_stream);
39963
  ASSERT(!host_selection_cancelable_);
  // When grpc-status appears in response headers, convert grpc-status to HTTP status code
  // for outlier detection. This does not currently change any stats or logging and does not
  // handle the case when an error grpc-status is sent as a trailer.
39963
  absl::optional<Grpc::Status::GrpcStatus> grpc_status;
39963
  uint64_t grpc_to_http_status = 0;
39963
  uint64_t response_code_for_outlier_detection = response_code;
39963
  if (grpc_request_) {
2237
    grpc_status = Grpc::Common::getGrpcStatus(*headers);
2237
    if (grpc_status.has_value()) {
135
      grpc_to_http_status = Grpc::Utility::grpcToHttpStatus(grpc_status.value());
135
      response_code_for_outlier_detection = grpc_to_http_status;
135
    }
38399
  } else {
    // Check cluster's http_protocol_options if different code should be reported to
    // outlier detector.
37726
    absl::optional<bool> matched = cluster_->processHttpForOutlierDetection(*headers);
37726
    if (matched.has_value()) {
      // Outlier detector distinguishes only two values:
      // Anything >= 500 is error.
      // Anything < 500 is success.
28
      if (!matched.value()) {
        // Matcher returned non-match.
        // report success to outlier detector.
2
        response_code_for_outlier_detection = 200;
26
      } else {
        // Matcher returned match (treat the response as error).
        // If the original status code was error (>= 500), then report the
        // original status code to the outlier detector.
26
        if (response_code < 500) {
13
          response_code_for_outlier_detection = 500;
13
        }
26
      }
28
    }
37726
  }
39963
  maybeProcessOrcaLoadReport(*headers, upstream_request);
  // Check for degraded header
39963
  const bool is_degraded = headers->EnvoyDegraded() != nullptr;
  // Ejection has priority over degradation: 5xx errors always trigger ejection logic.
39963
  if (response_code_for_outlier_detection >= 500) {
485
    upstream_request.upstreamHost()->outlierDetector().putResult(
485
        Upstream::Outlier::Result::ExtOriginRequestFailed, response_code_for_outlier_detection);
39577
  } else if (is_degraded) {
    // Only mark as degraded if response is successful (not 5xx).
2
    upstream_request.upstreamHost()->outlierDetector().putResult(
2
        Upstream::Outlier::Result::ExtOriginRequestDegraded, response_code_for_outlier_detection);
39476
  } else {
39476
    upstream_request.upstreamHost()->outlierDetector().putResult(
39476
        Upstream::Outlier::Result::ExtOriginRequestSuccess, response_code_for_outlier_detection);
39476
  }
39963
  if (headers->EnvoyImmediateHealthCheckFail() != nullptr) {
1
    upstream_request.upstreamHost()->healthChecker().setUnhealthy(
1
        Upstream::HealthCheckHostMonitor::UnhealthyType::ImmediateHealthCheckFail);
1
  }
39963
  bool could_not_retry = false;
  // Check if this upstream request was already retried, for instance after
  // hitting a per try timeout. Don't retry it if we already have.
39963
  if (retry_state_) {
3019
    if (upstream_request.retried()) {
      // We already retried this request (presumably for a per try timeout) so
      // we definitely won't retry it again. Check if we would have retried it
      // if we could.
11
      bool retry_as_early_data; // Not going to be used as we are not retrying.
11
      could_not_retry = retry_state_->wouldRetryFromHeaders(*headers, *downstream_headers_,
11
                                                            retry_as_early_data) !=
11
                        RetryState::RetryDecision::NoRetry;
3008
    } else {
3008
      const RetryStatus retry_status = retry_state_->shouldRetryHeaders(
3008
          *headers, *downstream_headers_,
3008
          [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
3008
           had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_](
3008
              bool disable_early_data) -> void {
161
            doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No);
161
          });
3008
      if (retry_status == RetryStatus::Yes) {
174
        runRetryOptionsPredicates(upstream_request);
174
        pending_retries_++;
174
        upstream_request.upstreamHost()->stats().rq_error_.inc();
174
        Http::CodeStats& code_stats = httpContext().codeStats();
174
        code_stats.chargeBasicResponseStat(cluster_->statsScope(), stats_.stat_names_.retry_,
174
                                           static_cast<Http::Code>(response_code),
174
                                           exclude_http_code_stats_);
174
        if (!end_stream || !upstream_request.encodeComplete()) {
130
          upstream_request.resetStream();
130
        }
174
        auto request_ptr = upstream_request.removeFromList(upstream_requests_);
174
        callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
174
        return;
2839
      } else if (retry_status == RetryStatus::NoOverflow) {
6
        callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
6
        could_not_retry = true;
2828
      } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
3
        callbacks_->streamInfo().setResponseFlag(
3
            StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
3
        could_not_retry = true;
3
      }
3008
    }
3019
  }
39789
  if (route_entry_->internalRedirectPolicy().enabled() &&
39789
      route_entry_->internalRedirectPolicy().shouldRedirectForResponseCode(
216
          static_cast<Http::Code>(response_code)) &&
39789
      setupRedirect(*headers)) {
159
    return;
    // If the redirect could not be handled, fail open and let it pass to the
    // next downstream.
159
  }
  // Check if we got a "bad" response, but there are still upstream requests in
  // flight awaiting headers or scheduled retries. If so, exit to give them a
  // chance to return before returning a response downstream.
39630
  if (could_not_retry && (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0)) {
4
    upstream_request.upstreamHost()->stats().rq_error_.inc();
    // Reset the stream because there are other in-flight requests that we'll
    // wait around for and we're not interested in consuming any body/trailers.
4
    auto request_ptr = upstream_request.removeFromList(upstream_requests_);
4
    request_ptr->resetStream();
4
    callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
4
    return;
4
  }
  // Make sure any retry timers are destroyed since we may not call cleanup() if end_stream is
  // false.
39626
  if (retry_state_) {
2833
    retry_state_.reset();
2833
  }
  // Only send upstream service time if we received the complete request and this is not a
  // premature response.
39626
  if (DateUtil::timePointValid(downstream_request_complete_time_)) {
36268
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
36268
    MonotonicTime response_received_time = dispatcher.timeSource().monotonicTime();
36268
    std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>(
36268
        response_received_time - downstream_request_complete_time_);
36268
    if (!config_->suppress_envoy_headers_) {
36242
      headers->setEnvoyUpstreamServiceTime(ms.count());
36242
    }
36268
  }
39626
  upstream_request.upstreamCanary(
39626
      (headers->EnvoyUpstreamCanary() && headers->EnvoyUpstreamCanary()->value() == "true") ||
39626
      upstream_request.upstreamHost()->canary());
39626
  chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false);
39626
  if (!Http::CodeUtility::is5xx(response_code)) {
39328
    handleNon5xxResponseHeaders(grpc_status, upstream_request, end_stream, grpc_to_http_status);
39328
  }
  // Append routing cookies
39626
  for (const auto& header_value : downstream_set_cookies_) {
454
    headers->addReferenceKey(Http::Headers::get().SetCookie, header_value);
454
  }
39626
  callbacks_->streamInfo().setResponseCodeDetails(
39626
      StreamInfo::ResponseCodeDetails::get().ViaUpstream);
39626
  callbacks_->streamInfo().setResponseCode(response_code);
39626
  downstream_response_started_ = true;
39626
  final_upstream_request_ = &upstream_request;
  // Make sure that for request hedging, we end up with the correct final upstream info.
39626
  callbacks_->streamInfo().setUpstreamInfo(final_upstream_request_->streamInfo().upstreamInfo());
39626
  resetOtherUpstreams(upstream_request);
  // Modify response headers after we have set the final upstream info because we may need to
  // modify the headers based on the upstream host.
39626
  const Formatter::Context formatter_context(downstream_headers_, headers.get(), {}, {}, {},
39626
                                             &callbacks_->activeSpan());
39626
  route_entry_->finalizeResponseHeaders(*headers, formatter_context, callbacks_->streamInfo());
39626
  modify_headers_(*headers);
39626
  if (end_stream) {
24146
    onUpstreamComplete(upstream_request);
24146
  }
39626
  callbacks_->encodeHeaders(std::move(headers), end_stream,
39626
                            StreamInfo::ResponseCodeDetails::get().ViaUpstream);
39626
}
// Forwards a chunk of upstream response body to the downstream connection.
// Invoked only after onUpstreamHeaders() accepted the response (i.e. the
// response was not consumed by a retry or internal redirect).
void Filter::onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request,
                            bool end_stream) {
  // When route retry policy is configured and an upstream filter is returning StopIteration
  // in its encodeHeaders() method, upstream_requests_.size() is equal to 0 in this case,
  // and we should just return.
  if (upstream_requests_.size() == 0) {
    return;
  }
  // Other than above case, this should be true because when we saw headers we
  // either reset the stream (hence wouldn't have made it to onUpstreamData) or
  // reset all other in-flight streams.
  ASSERT(upstream_requests_.size() == 1);
  if (end_stream) {
    // gRPC request termination without trailers is an error.
    if (upstream_request.grpcRqSuccessDeferred()) {
      upstream_request.upstreamHost()->stats().rq_error_.inc();
    }
    // Finalize stats and schedule deferred deletion of the upstream request
    // before handing the final bytes downstream. Deletion is deferred, so
    // using `data`/`upstream_request` below remains safe.
    onUpstreamComplete(upstream_request);
  }
  callbacks_->encodeData(data, end_stream);
}
// Handles upstream response trailers: settles deferred gRPC success/error
// accounting from the grpc-status trailer, processes any trailer-borne ORCA
// load report, finalizes the upstream request, and forwards the trailers
// downstream.
void Filter::onUpstreamTrailers(Http::ResponseTrailerMapPtr&& trailers,
                                UpstreamRequest& upstream_request) {
  // When route retry policy is configured and an upstream filter is returning StopIteration
  // in its encodeHeaders() method, upstream_requests_.size() is equal to 0 in this case,
  // and we should just return.
  if (upstream_requests_.size() == 0) {
    return;
  }
  // Other than above case, this should be true because when we saw headers we
  // either reset the stream (hence wouldn't have made it to onUpstreamTrailers) or
  // reset all other in-flight streams.
  ASSERT(upstream_requests_.size() == 1);
  if (upstream_request.grpcRqSuccessDeferred()) {
    // The response headers alone could not classify this gRPC request; the
    // grpc-status trailer decides success vs. error for upstream host stats.
    absl::optional<Grpc::Status::GrpcStatus> grpc_status = Grpc::Common::getGrpcStatus(*trailers);
    if (grpc_status &&
        !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) {
      upstream_request.upstreamHost()->stats().rq_success_.inc();
    } else {
      // A missing grpc-status, or one that maps to a 5xx, counts as an error.
      upstream_request.upstreamHost()->stats().rq_error_.inc();
    }
  }
  // ORCA load reports may arrive in trailers; this is a no-op if a report was
  // already processed from the response headers.
  maybeProcessOrcaLoadReport(*trailers, upstream_request);
  // Complete (stats + deferred deletion of the upstream request) before
  // handing the trailers to the downstream codec.
  onUpstreamComplete(upstream_request);
  callbacks_->encodeTrailers(std::move(trailers));
}
1110
void Filter::onUpstreamMetadata(Http::MetadataMapPtr&& metadata_map) {
1110
  callbacks_->encodeMetadata(std::move(metadata_map));
1110
}
// Finalizes a finished upstream request: records timeout-budget and response
// timing stats, then defer-deletes the request and runs cleanup(). When
// multiplexed upstream half-close is allowed and the downstream request is
// still in flight, returns early and keeps the stream alive instead.
void Filter::onUpstreamComplete(UpstreamRequest& upstream_request) {
  if (!downstream_end_stream_) {
    if (allow_multiplexed_upstream_half_close_) {
      // Continue request if downstream is not done yet.
      return;
    }
    // Downstream never finished sending and half-close is not allowed, so the
    // upstream stream cannot be kept open.
    upstream_request.resetStream();
  }
  Event::Dispatcher& dispatcher = callbacks_->dispatcher();
  // NOTE(review): downstream_request_complete_time_ is only validated further
  // below; the timeout-budget stat here presumably tolerates an unset
  // completion time — confirm against the histogram consumers.
  std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
      dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
  Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats();
  if (tb_stats.has_value()) {
    // Record how much of the configured global timeout this request consumed.
    tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue(
        FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_));
  }
  // Detailed response-timing stats: only for real (non-health-check) traffic,
  // only when dynamic stats are enabled, and only when the downstream request
  // actually completed so that response_time is meaningful.
  if (config_->emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck() &&
      DateUtil::timePointValid(downstream_request_complete_time_)) {
    upstream_request.upstreamHost()->outlierDetector().putResponseTime(response_time);
    const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_);
    Http::CodeStats& code_stats = httpContext().codeStats();
    Http::CodeStats::ResponseTimingInfo info{
        config_->scope_,
        cluster_->statsScope(),
        config_->empty_stat_name_,
        response_time,
        upstream_request.upstreamCanary(),
        internal_request,
        route_->virtualHost()->statName(),
        request_vcluster_ ? request_vcluster_->statName() : config_->empty_stat_name_,
        route_stats_context_.has_value() ? route_stats_context_->statName()
                                         : config_->empty_stat_name_,
        config_->zone_name_,
        upstreamZone(upstream_request.upstreamHost())};
    code_stats.chargeResponseTiming(info);
    // Charge the same timing a second time under the alternate stat prefix,
    // if one is configured for this request.
    if (alt_stat_prefix_ != nullptr) {
      Http::CodeStats::ResponseTimingInfo info{config_->scope_,
                                               cluster_->statsScope(),
                                               alt_stat_prefix_->statName(),
                                               response_time,
                                               upstream_request.upstreamCanary(),
                                               internal_request,
                                               config_->empty_stat_name_,
                                               config_->empty_stat_name_,
                                               config_->empty_stat_name_,
                                               config_->zone_name_,
                                               upstreamZone(upstream_request.upstreamHost())};
      code_stats.chargeResponseTiming(info);
    }
  }
  // Defer deletion as this is generally called under the stack of the upstream
  // request, and immediate deletion is dangerous.
  callbacks_->dispatcher().deferredDelete(upstream_request.removeFromList(upstream_requests_));
  cleanup();
}
// Attempts to turn a redirect response into an internal (in-process) redirect.
// Returns true if the downstream stream was recreated against the redirect
// target; false means the caller fails open and passes the redirect response
// through to the client.
bool Filter::setupRedirect(const Http::ResponseHeaderMap& headers) {
  ENVOY_STREAM_LOG(debug, "attempting internal redirect", *callbacks_);
  const Http::HeaderEntry* location = headers.Location();
  const uint64_t status_code = Http::Utility::getResponseStatus(headers);
  // Redirects are not supported for streaming requests yet.
  // Short-circuit order below matters: convertRequestHeadersForInternalRedirect
  // mutates downstream_headers_ (and restores them itself on failure), and
  // recreateStream is the final, side-effecting step.
  if (downstream_end_stream_ && (!request_buffer_overflowed_ || !callbacks_->decodingBuffer()) &&
      location != nullptr &&
      convertRequestHeadersForInternalRedirect(*downstream_headers_, headers, *location,
                                               status_code) &&
      callbacks_->recreateStream(&headers)) {
    ENVOY_STREAM_LOG(debug, "Internal redirect succeeded", *callbacks_);
    cluster_->trafficStats()->upstream_internal_redirect_succeeded_total_.inc();
    return true;
  }
  // convertRequestHeadersForInternalRedirect logs failure reasons but log
  // details for other failure modes here.
  if (!downstream_end_stream_) {
    ENVOY_STREAM_LOG(debug, "Internal redirect failed: request incomplete", *callbacks_);
  } else if (request_buffer_overflowed_) {
    ENVOY_STREAM_LOG(debug, "Internal redirect failed: request body overflow", *callbacks_);
  } else if (location == nullptr) {
    ENVOY_STREAM_LOG(debug, "Internal redirect failed: missing location header", *callbacks_);
  }
  cluster_->trafficStats()->upstream_internal_redirect_failed_total_.inc();
  return false;
}
// Validates an internal-redirect target and rewrites the downstream request
// headers (scheme/host/path, copied response headers, x-envoy-original-url) to
// point at it. Returns false — with the original headers restored — if any
// policy or validity check fails. On success the per-request redirect counter
// in filter state is incremented.
bool Filter::convertRequestHeadersForInternalRedirect(
    Http::RequestHeaderMap& downstream_headers, const Http::ResponseHeaderMap& upstream_headers,
    const Http::HeaderEntry& internal_redirect, uint64_t status_code) {
  if (!downstream_headers.Path()) {
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: no path in downstream_headers", *callbacks_);
    return false;
  }
  absl::string_view redirect_url = internal_redirect.value().getStringView();
  // Make sure the redirect response contains a URL to redirect to.
  if (redirect_url.empty()) {
    stats_.passthrough_internal_redirect_bad_location_.inc();
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: empty location", *callbacks_);
    return false;
  }
  Http::Utility::Url absolute_url;
  if (!absolute_url.initialize(redirect_url, false)) {
    stats_.passthrough_internal_redirect_bad_location_.inc();
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: invalid location {}", *callbacks_,
                     redirect_url);
    return false;
  }
  const auto& policy = route_entry_->internalRedirectPolicy();
  // Don't change the scheme from the original request unless the policy
  // explicitly allows cross-scheme redirects.
  const bool scheme_is_http = schemeIsHttp(downstream_headers, callbacks_->connection());
  const bool target_is_http = Http::Utility::schemeIsHttp(absolute_url.scheme());
  if (!policy.isCrossSchemeRedirectAllowed() && scheme_is_http != target_is_http) {
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: incorrect scheme for {}", *callbacks_,
                     redirect_url);
    stats_.passthrough_internal_redirect_unsafe_scheme_.inc();
    return false;
  }
  const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
  // Make sure that performing the redirect won't result in exceeding the configured number of
  // redirects allowed for this route. The counter lives in request-scoped
  // filter state so it survives stream recreation; create it on first use.
  StreamInfo::UInt32Accessor* num_internal_redirect{};
  if (num_internal_redirect = filter_state->getDataMutable<StreamInfo::UInt32Accessor>(
          NumInternalRedirectsFilterStateName);
      num_internal_redirect == nullptr) {
    auto state = std::make_shared<StreamInfo::UInt32AccessorImpl>(0);
    num_internal_redirect = state.get();
    filter_state->setData(NumInternalRedirectsFilterStateName, std::move(state),
                          StreamInfo::FilterState::StateType::Mutable,
                          StreamInfo::FilterState::LifeSpan::Request);
  }
  if (num_internal_redirect->value() >= policy.maxInternalRedirects()) {
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: redirect limits exceeded.", *callbacks_);
    stats_.passthrough_internal_redirect_too_many_redirects_.inc();
    return false;
  }
  // Copy the old values, so they can be restored if the redirect fails.
  const bool scheme_is_set = (downstream_headers.Scheme() != nullptr);
  std::unique_ptr<Http::RequestHeaderMapImpl> saved_headers = Http::RequestHeaderMapImpl::create();
  Http::RequestHeaderMapImpl::copyFrom(*saved_headers, downstream_headers);
  // Mirror the policy's configured response headers onto the request: copy
  // values present in the response, and clear request values that the
  // response no longer carries.
  for (const Http::LowerCaseString& header :
       route_entry_->internalRedirectPolicy().responseHeadersToCopy()) {
    Http::HeaderMap::GetResult result = upstream_headers.get(header);
    Http::HeaderMap::GetResult downstream_result = downstream_headers.get(header);
    if (result.empty()) {
      // Clear headers if present, else do nothing:
      if (downstream_result.empty()) {
        continue;
      }
      downstream_headers.remove(header);
    } else {
      // The header exists in the response, copy into the downstream headers
      if (!downstream_result.empty()) {
        downstream_headers.remove(header);
      }
      for (size_t idx = 0; idx < result.size(); idx++) {
        downstream_headers.addCopy(header, result[idx]->value().getStringView());
      }
    }
  }
  // RAII guard: if any later check fails (early return), put the request
  // headers back exactly as they were. cancel()ed on success below.
  Cleanup restore_original_headers(
      [&downstream_headers, scheme_is_set, scheme_is_http, &saved_headers]() {
        downstream_headers.clear();
        if (scheme_is_set) {
          downstream_headers.setScheme(scheme_is_http ? Http::Headers::get().SchemeValues.Http
                                                      : Http::Headers::get().SchemeValues.Https);
        }
        Http::RequestHeaderMapImpl::copyFrom(downstream_headers, *saved_headers);
      });
  // Replace the original host, scheme and path.
  downstream_headers.setScheme(absolute_url.scheme());
  downstream_headers.setHost(absolute_url.hostAndPort());
  auto path_and_query = absolute_url.pathAndQueryParams();
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http_reject_path_with_fragment")) {
    // Envoy treats internal redirect as a new request and will reject it if URI path
    // contains #fragment. However the Location header is allowed to have #fragment in URI path. To
    // prevent Envoy from rejecting internal redirect, strip the #fragment from Location URI if it
    // is present.
    auto fragment_pos = path_and_query.find('#');
    path_and_query = path_and_query.substr(0, fragment_pos);
  }
  downstream_headers.setPath(path_and_query);
  // Only clear the route cache if there are downstream callbacks. There aren't, for example,
  // for async connections.
  if (callbacks_->downstreamCallbacks()) {
    callbacks_->downstreamCallbacks()->clearRouteCache();
  }
  // Re-resolve routing against the rewritten headers.
  const auto route = callbacks_->route();
  // Don't allow a redirect to a non existing route.
  if (!route) {
    stats_.passthrough_internal_redirect_no_route_.inc();
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: no route found", *callbacks_);
    return false;
  }
  const auto& route_name = route->routeName();
  // Every configured predicate must accept the target route.
  for (const auto& predicate : policy.predicates()) {
    if (!predicate->acceptTargetRoute(*filter_state, route_name, !scheme_is_http,
                                      !target_is_http)) {
      stats_.passthrough_internal_redirect_predicate_.inc();
      ENVOY_STREAM_LOG(trace,
                       "Internal redirect failed: rejecting redirect targeting {}, by {} predicate",
                       *callbacks_, route_name, predicate->name());
      return false;
    }
  }
  // See https://tools.ietf.org/html/rfc7231#section-6.4.4: a 303 response
  // converts non-GET/HEAD requests into a body-less GET.
  if (status_code == enumToInt(Http::Code::SeeOther) &&
      downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Get &&
      downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Head) {
    downstream_headers.setMethod(Http::Headers::get().MethodValues.Get);
    downstream_headers.remove(Http::Headers::get().ContentLength);
    callbacks_->modifyDecodingBuffer([](Buffer::Instance& data) { data.drain(data.length()); });
  }
  // All checks passed: commit the redirect.
  num_internal_redirect->increment();
  restore_original_headers.cancel();
  // Preserve the original request URL for the second pass.
  downstream_headers.setEnvoyOriginalUrl(
      absl::StrCat(scheme_is_http ? Http::Headers::get().SchemeValues.Http
                                  : Http::Headers::get().SchemeValues.Https,
                   "://", saved_headers->getHostValue(), saved_headers->getPathValue()));
  return true;
}
280
void Filter::runRetryOptionsPredicates(UpstreamRequest& retriable_request) {
280
  for (const auto& options_predicate : getEffectiveRetryPolicy()->retryOptionsPredicates()) {
5
    const Upstream::RetryOptionsPredicate::UpdateOptionsParameters parameters{
5
        retriable_request.streamInfo(), upstreamSocketOptions()};
5
    auto ret = options_predicate->updateOptions(parameters);
5
    if (ret.new_upstream_socket_options_.has_value()) {
3
      upstream_options_ = ret.new_upstream_socket_options_.value();
3
    }
5
  }
280
}
246
void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry) {
246
  ENVOY_STREAM_LOG(debug, "performing retry", *callbacks_);
246
  is_retry_ = true;
246
  attempt_count_++;
246
  callbacks_->streamInfo().setAttemptCount(attempt_count_);
246
  ASSERT(pending_retries_ > 0);
246
  pending_retries_--;
246
  if (host_selection_cancelable_) {
    // If there was a timeout during host selection, cancel this selection
    // attempt before potentially creating a new one.
    host_selection_cancelable_->cancel();
    host_selection_cancelable_.reset();
  }
  // Clusters can technically get removed by CDS during a retry. Make sure it still exists.
246
  const auto cluster = config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
246
  std::unique_ptr<GenericConnPool> generic_conn_pool;
246
  if (cluster == nullptr) {
    sendNoHealthyUpstreamResponse({});
    cleanup();
    return;
  }
246
  callbacks_->streamInfo().downstreamTiming().setValue(
246
      "envoy.router.host_selection_start_ms",
246
      callbacks_->dispatcher().timeSource().monotonicTime());
246
  auto host_selection_response = cluster->chooseHost(this);
246
  if (!host_selection_response.cancelable ||
246
      !Runtime::runtimeFeatureEnabled("envoy.reloadable_features.async_host_selection")) {
121
    if (host_selection_response.cancelable) {
      host_selection_response.cancelable->cancel();
    }
    // This branch handles the common case of synchronous host selection, as
    // well as handling unsupported asynchronous host selection (by treating it
    // as host selection failure).
121
    continueDoRetry(can_send_early_data, can_use_http3, is_timeout_retry,
121
                    std::move(host_selection_response.host), *cluster,
121
                    host_selection_response.details);
121
  }
246
  ENVOY_STREAM_LOG(debug, "Handling asynchronous host selection for retry\n", *callbacks_);
  // Again latch the cancel handle, and set up the callback to be called when host
  // selection is complete.
246
  host_selection_cancelable_ = std::move(host_selection_response.cancelable);
246
  on_host_selected_ =
246
      ([this, can_send_early_data, can_use_http3, is_timeout_retry, cluster](
246
           Upstream::HostConstSharedPtr&& host, absl::string_view host_selection_details) -> void {
125
        continueDoRetry(can_send_early_data, can_use_http3, is_timeout_retry, std::move(host),
125
                        *cluster, host_selection_details);
125
      });
246
}
// Second half of a retry, run once a host has been selected (synchronously
// from doRetry() or asynchronously from on_host_selected_): builds the conn
// pool and a fresh UpstreamRequest, refreshes per-attempt request headers and
// timeout headers, then replays the buffered request (headers, buffered body
// copy, trailers) to the new upstream.
void Filter::continueDoRetry(bool can_send_early_data, bool can_use_http3,
                             TimeoutRetry is_timeout_retry, Upstream::HostConstSharedPtr&& host,
                             Upstream::ThreadLocalCluster& cluster,
                             absl::string_view host_selection_details) {
  callbacks_->streamInfo().downstreamTiming().setValue(
      "envoy.router.host_selection_end_ms", callbacks_->dispatcher().timeSource().monotonicTime());
  std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(cluster, host);
  if (!generic_conn_pool) {
    // No usable host/pool: fail the retry downstream and tear down.
    sendNoHealthyUpstreamResponse(host_selection_details);
    cleanup();
    return;
  }
  UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
      *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3,
      allow_multiplexed_upstream_half_close_ /*enable_half_close*/);
  if (include_attempt_count_in_request_) {
    downstream_headers_->setEnvoyAttemptCount(attempt_count_);
  }
  if (include_timeout_retry_header_in_request_) {
    downstream_headers_->setEnvoyIsTimeoutRetry(is_timeout_retry == TimeoutRetry::Yes ? "true"
                                                                                      : "false");
  }
  // The request timeouts only account for time elapsed since the downstream request completed
  // which might not have happened yet, in which case zero time has elapsed.
  std::chrono::milliseconds elapsed_time = std::chrono::milliseconds::zero();
  if (DateUtil::timePointValid(downstream_request_complete_time_)) {
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
    elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(
        dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
  }
  FilterUtility::setTimeoutHeaders(elapsed_time.count(), timeout_, *route_entry_,
                                   *downstream_headers_, !config_->suppress_envoy_headers_,
                                   grpc_request_, hedging_params_.hedge_on_per_try_timeout_);
  // Keep a raw pointer so we can detect whether the request we just inserted
  // is still at the front of the list after acceptHeadersFromRouter().
  UpstreamRequest* upstream_request_tmp = upstream_request.get();
  LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
  upstream_requests_.front()->acceptHeadersFromRouter(
      !callbacks_->decodingBuffer() && !downstream_trailers_ && downstream_end_stream_);
  // It's possible we got immediately reset which means the upstream request we just
  // added to the front of the list might have been removed, so we need to check to make
  // sure we don't send data on the wrong request.
  if (!upstream_requests_.empty() && (upstream_requests_.front().get() == upstream_request_tmp)) {
    if (callbacks_->decodingBuffer()) {
      // If we are doing a retry we need to make a copy.
      Buffer::OwnedImpl copy(*callbacks_->decodingBuffer());
      upstream_requests_.front()->acceptDataFromRouter(copy, !downstream_trailers_ &&
                                                                 downstream_end_stream_);
    }
    if (downstream_trailers_) {
      upstream_requests_.front()->acceptTrailersFromRouter(*downstream_trailers_);
    }
  }
}
2620
uint32_t Filter::numRequestsAwaitingHeaders() {
2620
  return std::count_if(upstream_requests_.begin(), upstream_requests_.end(),
2620
                       [](const auto& req) -> bool { return req->awaitingHeaders(); });
2620
}
// Applies the cluster's DROP_OVERLOAD policy. Returns true (request dropped,
// 503 local reply already sent) when the drop ratio is 100% or a random draw
// falls under the configured ratio; returns false when the request should
// proceed. The two drop paths use distinct response flags, marker headers and
// response-code details so they are distinguishable in access logs/stats.
bool Filter::checkDropOverload(Upstream::ThreadLocalCluster& cluster) {
  if (cluster.dropOverload().value()) {
    ENVOY_STREAM_LOG(debug, "Router filter: cluster DROP_OVERLOAD configuration: {}", *callbacks_,
                     cluster.dropOverload().value());
    if (cluster.dropOverload().value() == 1.0) {
      // 100% drop ratio: no randomness, every request is rejected.
      ENVOY_STREAM_LOG(
          debug, "The configured DROP_OVERLOAD ratio is 100%, drop everything unconditionally.",
          *callbacks_);
      callbacks_->streamInfo().setResponseFlag(
          StreamInfo::CoreResponseFlag::UnconditionalDropOverload);
      chargeUpstreamCode(Http::Code::ServiceUnavailable, {}, true);
      callbacks_->sendLocalReply(
          Http::Code::ServiceUnavailable, "unconditional drop overload",
          [this](Http::ResponseHeaderMap& headers) {
            // Mark the reply unless Envoy headers are suppressed.
            if (!config_->suppress_envoy_headers_) {
              headers.addReference(Http::Headers::get().EnvoyUnconditionalDropOverload,
                                   Http::Headers::get().EnvoyUnconditionalDropOverloadValues.True);
            }
            modify_headers_(headers);
          },
          absl::nullopt, StreamInfo::ResponseCodeDetails::get().UnconditionalDropOverload);
      cluster.info()->loadReportStats().upstream_rq_drop_overload_.inc();
      return true;
    }
    // Partial drop ratio: drop with probability equal to the configured value.
    if (config_->random_.bernoulli(cluster.dropOverload())) {
      ENVOY_STREAM_LOG(debug, "The request is dropped by DROP_OVERLOAD", *callbacks_);
      callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::DropOverLoad);
      chargeUpstreamCode(Http::Code::ServiceUnavailable, {}, true);
      callbacks_->sendLocalReply(
          Http::Code::ServiceUnavailable, "drop overload",
          [this](Http::ResponseHeaderMap& headers) {
            if (!config_->suppress_envoy_headers_) {
              headers.addReference(Http::Headers::get().EnvoyDropOverload,
                                   Http::Headers::get().EnvoyDropOverloadValues.True);
            }
            modify_headers_(headers);
          },
          absl::nullopt, StreamInfo::ResponseCodeDetails::get().DropOverload);
      cluster.info()->loadReportStats().upstream_rq_drop_overload_.inc();
      return true;
    }
  }
  return false;
}
// Parses an ORCA load report out of response headers or trailers (at most once
// per stream) and fans it out to the interested consumers: the cluster's LRS
// load-metric stats and/or the host's LB policy data.
void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or_trailers,
                                        UpstreamRequest& upstream_request) {
  // Process the load report only once, so if response has report in headers,
  // then don't process it in trailers.
  if (orca_load_report_received_) {
    return;
  }
  // Check whether we need to send the load report to the LRS or invoke the ORCA
  // callbacks.
  Upstream::HostDescriptionOptConstRef upstream_host = upstream_request.upstreamHost();
  // The upstream host should always be available because the upstream request has got
  // the response headers/trailers from the upstream host.
  ASSERT(upstream_host.has_value(), "upstream host is not available for upstream request");
  OptRef<Upstream::HostLbPolicyData> host_lb_policy_data = upstream_host->lbPolicyData();
  if (!cluster_->lrsReportMetricNames().has_value() && !host_lb_policy_data.has_value()) {
    // If the cluster doesn't have LRS metric names configured then there is no need to
    // extract the stats for LRS.
    // If the host doesn't have LB policy data then that means the LB policy doesn't care
    // about the ORCA load report.
    // Return early here to avoid parsing the ORCA load report because no one is interested
    // in it.
    return;
  }
  absl::StatusOr<xds::data::orca::v3::OrcaLoadReport> orca_load_report =
      Envoy::Orca::parseOrcaLoadReportHeaders(headers_or_trailers);
  if (!orca_load_report.ok()) {
    // No report present (or malformed); leave the once-only latch unset so a
    // trailer-borne report can still be processed later.
    ENVOY_STREAM_LOG(trace, "Headers don't have orca load report: {}", *callbacks_,
                     orca_load_report.status().message());
    return;
  }
  // Latch only after a report has been successfully parsed.
  orca_load_report_received_ = true;
  if (cluster_->lrsReportMetricNames().has_value()) {
    ENVOY_STREAM_LOG(trace, "Adding ORCA load report {} to load metrics", *callbacks_,
                     orca_load_report->DebugString());
    Envoy::Orca::addOrcaLoadReportToLoadMetricStats(
        *cluster_->lrsReportMetricNames(), *orca_load_report, upstream_host->loadMetricStats());
  }
  if (host_lb_policy_data.has_value()) {
    ENVOY_STREAM_LOG(trace, "orca_load_report for {} report = {}", *callbacks_,
                     upstream_host->address()->asString(), orca_load_report->DebugString());
    const absl::Status status =
        host_lb_policy_data->onOrcaLoadReport(*orca_load_report, callbacks_->streamInfo());
    if (!status.ok()) {
      // Rate-limited: an LB policy rejecting reports could otherwise spam logs.
      ENVOY_LOG_PERIODIC(error, std::chrono::seconds(10),
                         "LB policy onOrcaLoadReport failed: {} for load report {}",
                         status.message(), orca_load_report->DebugString());
    }
  }
}
47266
const Router::RetryPolicy* Filter::getEffectiveRetryPolicy() const {
  // Use cluster-level retry policy if available. The most specific policy wins.
  // If no cluster-level policy is configured, fall back to route-level policy.
47266
  const Router::RetryPolicy* retry_policy = cluster_->httpProtocolOptions().retryPolicy();
47266
  if (retry_policy == nullptr) {
47262
    retry_policy = route_entry_->retryPolicy().get();
47262
  }
47266
  return retry_policy;
47266
}
// Production factory for the per-request retry state machine.
// As a side effect, when upstream HTTP/3 auto-configuration is the sole
// reason retries are enabled, the request-body buffer limit is set to zero so
// Envoy does not buffer (or retry) uncommon safe-requests-with-body solely
// for HTTP/3's sake.
RetryStatePtr
ProdFilter::createRetryState(const RetryPolicy& policy, Http::RequestHeaderMap& request_headers,
                             const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster,
                             RouteStatsContextOptRef route_stats_context,
                             Server::Configuration::CommonFactoryContext& context,
                             Event::Dispatcher& dispatcher, Upstream::ResourcePriority priority) {
  auto retry_state = RetryStateImpl::create(policy, request_headers, cluster, vcluster,
                                            route_stats_context, context, dispatcher, priority);
  const bool http3_is_only_retry_reason =
      retry_state != nullptr && retry_state->isAutomaticallyConfiguredForHttp3();
  if (http3_is_only_retry_reason) {
    // Since doing retry will make Envoy buffer the request body, if upstream using HTTP/3 is the
    // only reason for doing retry, set the buffer limit to 0 so that we don't retry or
    // buffer safe requests with body which is not common.
    setRequestBodyBufferLimit(0);
  }
  return retry_state;
}
} // namespace Router
} // namespace Envoy