1
#include "source/common/router/router.h"
2

            
3
#include <algorithm>
4
#include <chrono>
5
#include <cstdint>
6
#include <functional>
7
#include <memory>
8
#include <string>
9

            
10
#include "envoy/event/dispatcher.h"
11
#include "envoy/event/timer.h"
12
#include "envoy/grpc/status.h"
13
#include "envoy/http/conn_pool.h"
14
#include "envoy/runtime/runtime.h"
15
#include "envoy/upstream/cluster_manager.h"
16
#include "envoy/upstream/health_check_host_monitor.h"
17
#include "envoy/upstream/upstream.h"
18

            
19
#include "source/common/access_log/access_log_impl.h"
20
#include "source/common/common/assert.h"
21
#include "source/common/common/cleanup.h"
22
#include "source/common/common/enum_to_int.h"
23
#include "source/common/common/utility.h"
24
#include "source/common/config/utility.h"
25
#include "source/common/grpc/common.h"
26
#include "source/common/http/codes.h"
27
#include "source/common/http/header_map_impl.h"
28
#include "source/common/http/headers.h"
29
#include "source/common/http/message_impl.h"
30
#include "source/common/http/utility.h"
31
#include "source/common/network/transport_socket_options_impl.h"
32
#include "source/common/network/upstream_server_name.h"
33
#include "source/common/network/upstream_socket_options_filter_state.h"
34
#include "source/common/network/upstream_subject_alt_names.h"
35
#include "source/common/orca/orca_load_metrics.h"
36
#include "source/common/orca/orca_parser.h"
37
#include "source/common/router/debug_config.h"
38
#include "source/common/router/retry_state_impl.h"
39
#include "source/common/runtime/runtime_features.h"
40
#include "source/common/stream_info/uint32_accessor_impl.h"
41

            
42
namespace Envoy {
43
namespace Router {
44
namespace {
45
constexpr absl::string_view NumInternalRedirectsFilterStateName = "num_internal_redirects";
46

            
47
186112
uint32_t getLength(const Buffer::Instance* instance) { return instance ? instance->length() : 0; }
48

            
49
bool schemeIsHttp(const Http::RequestHeaderMap& downstream_headers,
50
188
                  OptRef<const Network::Connection> connection) {
51
188
  if (Http::Utility::schemeIsHttp(downstream_headers.getSchemeValue())) {
52
186
    return true;
53
186
  }
54
2
  if (connection.has_value() && !connection->ssl()) {
55
    return true;
56
  }
57
2
  return false;
58
2
}
59

            
60
constexpr uint64_t TimeoutPrecisionFactor = 100;
61

            
62
} // namespace
63

            
64
absl::StatusOr<std::unique_ptr<FilterConfig>>
65
FilterConfig::create(Stats::StatName stat_prefix, Server::Configuration::FactoryContext& context,
66
                     ShadowWriterPtr&& shadow_writer,
67
9867
                     const envoy::extensions::filters::http::router::v3::Router& config) {
68
9867
  absl::Status creation_status = absl::OkStatus();
69
9867
  auto ret = std::unique_ptr<FilterConfig>(
70
9867
      new FilterConfig(stat_prefix, context, std::move(shadow_writer), config, creation_status));
71
9867
  RETURN_IF_NOT_OK(creation_status);
72
9867
  return ret;
73
9867
}
74

            
75
// Unpacks the proto options and delegates to the main FilterConfig
// constructor, then wires up upstream access logging and the optional
// upstream HTTP filter chain. Failures are reported via `creation_status`
// (this constructor must not throw).
FilterConfig::FilterConfig(Stats::StatName stat_prefix,
                           Server::Configuration::FactoryContext& context,
                           ShadowWriterPtr&& shadow_writer,
                           const envoy::extensions::filters::http::router::v3::Router& config,
                           absl::Status& creation_status)
    : FilterConfig(
          context.serverFactoryContext(), stat_prefix, context.scope(),
          context.serverFactoryContext().clusterManager(), context.serverFactoryContext().runtime(),
          context.serverFactoryContext().api().randomGenerator(), std::move(shadow_writer),
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, dynamic_stats, true), config.start_child_span(),
          config.suppress_envoy_headers(), config.respect_expected_rq_timeout(),
          config.suppress_grpc_request_failure_code_stats(),
          config.has_upstream_log_options()
              ? config.upstream_log_options().flush_upstream_log_on_upstream_stream()
              : false,
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, reject_connect_request_early_data, false),
          config.strict_check_headers(), context.serverFactoryContext().api().timeSource(),
          context.serverFactoryContext().httpContext(),
          context.serverFactoryContext().routerContext()) {
  // Instantiate an access logger for each configured upstream log.
  for (const auto& upstream_log : config.upstream_log()) {
    upstream_logs_.push_back(AccessLog::AccessLogFactory::fromProto(upstream_log, context));
  }

  // Optional periodic flush interval for the upstream access logs.
  if (config.has_upstream_log_options() &&
      config.upstream_log_options().has_upstream_log_flush_interval()) {
    upstream_log_flush_interval_ = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
        config.upstream_log_options().upstream_log_flush_interval()));
  }

  // Build the factories for the configured upstream HTTP filter chain, if any.
  if (!config.upstream_http_filters().empty()) {
    // TODO(wbpcode): To validate the terminal filter is upstream codec filter by the proto.
    Server::Configuration::ServerFactoryContext& server_factory_ctx =
        context.serverFactoryContext();
    std::shared_ptr<Http::UpstreamFilterConfigProviderManager> filter_config_provider_manager =
        Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
            server_factory_ctx);
    std::string prefix = context.scope().symbolTable().toString(context.scope().prefix());
    upstream_ctx_ = std::make_unique<Upstream::UpstreamFactoryContextImpl>(
        server_factory_ctx, context.initManager(), context.scope());
    Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext,
                            Server::Configuration::UpstreamHttpFilterConfigFactory>
        helper(*filter_config_provider_manager, server_factory_ctx,
               context.serverFactoryContext().clusterManager(), *upstream_ctx_, prefix);
    // Propagate any filter-chain construction error to the caller via creation_status.
    SET_AND_RETURN_IF_NOT_OK(helper.processFilters(config.upstream_http_filters(),
                                                   "router upstream http", "router upstream http",
                                                   upstream_http_filter_factories_),
                             creation_status);
  }
}
124

            
125
// Express percentage as [0, TimeoutPrecisionFactor] because stats do not accept floating point
126
// values, and getting multiple significant figures on the histogram would be nice.
127
uint64_t FilterUtility::percentageOfTimeout(const std::chrono::milliseconds response_time,
128
595
                                            const std::chrono::milliseconds timeout) {
129
  // Timeouts of 0 are considered infinite. Any portion of an infinite timeout used is still
130
  // none of it.
131
595
  if (timeout.count() == 0) {
132
440
    return 0;
133
440
  }
134

            
135
155
  return static_cast<uint64_t>(response_time.count() * TimeoutPrecisionFactor / timeout.count());
136
595
}
137

            
138
void FilterUtility::setUpstreamScheme(Http::RequestHeaderMap& headers, bool downstream_ssl,
                                      bool upstream_ssl, bool use_upstream) {
  // Highest precedence: derive :scheme from the upstream transport when requested.
  if (use_upstream) {
    headers.setReferenceScheme(upstream_ssl ? Http::Headers::get().SchemeValues.Https
                                            : Http::Headers::get().SchemeValues.Http);
    return;
  }

  // Leave an already-valid :scheme untouched.
  if (Http::Utility::schemeIsValid(headers.getSchemeValue())) {
    return;
  }

  // After all the changes in https://github.com/envoyproxy/envoy/issues/14587
  // this path should only occur if a buggy filter has removed the :scheme
  // header. In that case best-effort set from X-Forwarded-Proto.
  const absl::string_view xfp = headers.getForwardedProtoValue();
  if (Http::Utility::schemeIsValid(xfp)) {
    headers.setScheme(xfp);
    return;
  }

  // Last resort: infer the scheme from whether the downstream connection used TLS.
  headers.setReferenceScheme(downstream_ssl ? Http::Headers::get().SchemeValues.Https
                                            : Http::Headers::get().SchemeValues.Http);
}
167

            
168
bool FilterUtility::shouldShadow(const ShadowPolicy& policy, Runtime::Loader& runtime,
169
103
                                 uint64_t stable_random) {
170

            
171
  // The policy's default value is set correctly regardless of whether there is a runtime key
172
  // or not, thus this call is sufficient for all cases (100% if no runtime set, otherwise
173
  // using the default value within the runtime fractional percent setting).
174
103
  return runtime.snapshot().featureEnabled(policy.runtimeKey(), policy.defaultValue(),
175
103
                                           stable_random);
176
103
}
177

            
178
// Computes the global / per-try / per-try-idle timeouts for a request,
// honoring (in priority order) route config, the gRPC timeout headers, and
// the Envoy timeout override headers, then writes the derived expected
// timeout headers for the upstream via setTimeoutHeaders().
TimeoutData FilterUtility::finalTimeout(const RouteEntry& route,
                                        Http::RequestHeaderMap& request_headers,
                                        bool insert_envoy_expected_request_timeout_ms,
                                        bool grpc_request, bool per_try_timeout_hedging_enabled,
                                        bool respect_expected_rq_timeout) {
  // See if there is a user supplied timeout in a request header. If there is we take that.
  // Otherwise if the request is gRPC and a maximum gRPC timeout is configured we use the timeout
  // in the gRPC headers (or infinity when gRPC headers have no timeout), but cap that timeout to
  // the configured maximum gRPC timeout (which may also be infinity, represented by a 0 value),
  // or the default from the route config otherwise.
  TimeoutData timeout;
  if (!route.usingNewTimeouts()) {
    if (grpc_request && route.maxGrpcTimeout()) {
      const std::chrono::milliseconds max_grpc_timeout = route.maxGrpcTimeout().value();
      auto header_timeout = Grpc::Common::getGrpcTimeout(request_headers);
      // Absent grpc-timeout header means infinity (0).
      std::chrono::milliseconds grpc_timeout =
          header_timeout ? header_timeout.value() : std::chrono::milliseconds(0);
      if (route.grpcTimeoutOffset()) {
        // We only apply the offset if it won't result in grpc_timeout hitting 0 or below, as
        // setting it to 0 means infinity and a negative timeout makes no sense.
        const auto offset = *route.grpcTimeoutOffset();
        if (offset < grpc_timeout) {
          grpc_timeout -= offset;
        }
      }

      // Cap gRPC timeout to the configured maximum considering that 0 means infinity.
      if (max_grpc_timeout != std::chrono::milliseconds(0) &&
          (grpc_timeout == std::chrono::milliseconds(0) || grpc_timeout > max_grpc_timeout)) {
        grpc_timeout = max_grpc_timeout;
      }
      timeout.global_timeout_ = grpc_timeout;
    } else {
      timeout.global_timeout_ = route.timeout();
    }
  }
  timeout.per_try_timeout_ = route.retryPolicy()->perTryTimeout();
  timeout.per_try_idle_timeout_ = route.retryPolicy()->perTryIdleTimeout();

  // Scratch used below when parsing the per-try timeout header value.
  uint64_t header_timeout;

  if (respect_expected_rq_timeout) {
    // Check if there is timeout set by egress Envoy.
    // If present, use that value as route timeout and don't override
    // *x-envoy-expected-rq-timeout-ms* header. At this point *x-envoy-upstream-rq-timeout-ms*
    // header should have been sanitized by egress Envoy.
    const Http::HeaderEntry* header_expected_timeout_entry =
        request_headers.EnvoyExpectedRequestTimeoutMs();
    if (header_expected_timeout_entry) {
      trySetGlobalTimeout(*header_expected_timeout_entry, timeout);
    } else {
      const Http::HeaderEntry* header_timeout_entry =
          request_headers.EnvoyUpstreamRequestTimeoutMs();

      if (header_timeout_entry) {
        trySetGlobalTimeout(*header_timeout_entry, timeout);
        request_headers.removeEnvoyUpstreamRequestTimeoutMs();
      }
    }
  } else {
    const Http::HeaderEntry* header_timeout_entry = request_headers.EnvoyUpstreamRequestTimeoutMs();

    if (header_timeout_entry) {
      trySetGlobalTimeout(*header_timeout_entry, timeout);
      request_headers.removeEnvoyUpstreamRequestTimeoutMs();
    }
  }

  // See if there is a per try/retry timeout. If it's >= global we just ignore it.
  const absl::string_view per_try_timeout_entry =
      request_headers.getEnvoyUpstreamRequestPerTryTimeoutMsValue();
  if (!per_try_timeout_entry.empty()) {
    // A non-numeric value leaves the route's per-try timeout in place;
    // the header is stripped either way since it is internal-only.
    if (absl::SimpleAtoi(per_try_timeout_entry, &header_timeout)) {
      timeout.per_try_timeout_ = std::chrono::milliseconds(header_timeout);
    }
    request_headers.removeEnvoyUpstreamRequestPerTryTimeoutMs();
  }

  // A per-try timeout that is not strictly smaller than a finite global
  // timeout adds nothing; disable it (0 means "no per-try timer").
  if (timeout.per_try_timeout_ >= timeout.global_timeout_ && timeout.global_timeout_.count() != 0) {
    timeout.per_try_timeout_ = std::chrono::milliseconds(0);
  }

  // Write the derived timeout headers for the upstream. Elapsed time is 0
  // because this is the initial attempt.
  setTimeoutHeaders(0, timeout, route, request_headers, insert_envoy_expected_request_timeout_ms,
                    grpc_request, per_try_timeout_hedging_enabled);

  return timeout;
}
265

            
266
// Writes x-envoy-expected-rq-timeout-ms (and, for legacy gRPC timeout
// handling, the grpc-timeout header) based on the computed timeouts and the
// time already elapsed in the request.
void FilterUtility::setTimeoutHeaders(uint64_t elapsed_time, const TimeoutData& timeout,
                                      const RouteEntry& route,
                                      Http::RequestHeaderMap& request_headers,
                                      bool insert_envoy_expected_request_timeout_ms,
                                      bool grpc_request, bool per_try_timeout_hedging_enabled) {

  const uint64_t global_timeout = timeout.global_timeout_.count();

  // See if there is any timeout to write in the expected timeout header.
  uint64_t expected_timeout = timeout.per_try_timeout_.count();

  // Use the global timeout if no per try timeout was specified or if we're
  // doing hedging when there are per try timeouts. Either of these scenarios
  // mean that the upstream server can use the full global timeout.
  if (per_try_timeout_hedging_enabled || expected_timeout == 0) {
    expected_timeout = global_timeout;
  }

  // If the expected timeout is 0 set no timeout, as Envoy treats 0 as infinite timeout.
  if (expected_timeout > 0) {

    // Clamp the advertised timeout to the remaining global budget.
    if (global_timeout > 0) {
      if (elapsed_time >= global_timeout) {
        // We are out of time, but 0 would be an infinite timeout. So instead we send a 1ms timeout
        // and assume the timers armed by onRequestComplete() will fire very soon.
        expected_timeout = 1;
      } else {
        expected_timeout = std::min(expected_timeout, global_timeout - elapsed_time);
      }
    }

    if (insert_envoy_expected_request_timeout_ms) {
      request_headers.setEnvoyExpectedRequestTimeoutMs(expected_timeout);
    }

    // If we've configured max_grpc_timeout, override the grpc-timeout header with
    // the expected timeout. This ensures that the optional per try timeout is reflected
    // in grpc-timeout, ensuring that the upstream gRPC server is aware of the actual timeout.
    if (grpc_request && !route.usingNewTimeouts() && route.maxGrpcTimeout()) {
      Grpc::Common::toGrpcTimeout(std::chrono::milliseconds(expected_timeout), request_headers);
    }
  }
}
309

            
310
absl::optional<std::chrono::milliseconds>
FilterUtility::tryParseHeaderTimeout(const Http::HeaderEntry& header_timeout_entry) {
  // Parse the header value as a base-10 millisecond count; a non-numeric
  // value yields nullopt rather than an error.
  uint64_t parsed_ms = 0;
  if (!absl::SimpleAtoi(header_timeout_entry.value().getStringView(), &parsed_ms)) {
    return absl::nullopt;
  }
  return std::chrono::milliseconds(parsed_ms);
}
318

            
319
void FilterUtility::trySetGlobalTimeout(const Http::HeaderEntry& header_timeout_entry,
                                        TimeoutData& timeout) {
  // Overwrite the global timeout only when the header parses to a valid value;
  // an unparseable header leaves the existing timeout untouched.
  if (const auto parsed = tryParseHeaderTimeout(header_timeout_entry); parsed.has_value()) {
    timeout.global_timeout_ = *parsed;
  }
}
326

            
327
FilterUtility::HedgingParams
FilterUtility::finalHedgingParams(const RouteEntry& route,
                                  Http::RequestHeaderMap& request_headers) {
  // Start from the route's configured hedging policy...
  HedgingParams hedging_params;
  hedging_params.hedge_on_per_try_timeout_ = route.hedgePolicy().hedgeOnPerTryTimeout();

  // ...then allow the request header to override it in either direction.
  // Any value other than "true"/"false" leaves the route setting untouched.
  const Http::HeaderEntry* override_entry = request_headers.EnvoyHedgeOnPerTryTimeout();
  if (override_entry != nullptr) {
    if (override_entry->value() == "true") {
      hedging_params.hedge_on_per_try_timeout_ = true;
    }
    if (override_entry->value() == "false") {
      hedging_params.hedge_on_per_try_timeout_ = false;
    }

    // The header is internal-only; strip it before proxying upstream.
    request_headers.removeEnvoyHedgeOnPerTryTimeout();
  }

  return hedging_params;
}
348

            
349
95367
// The filter must be torn down only after cleanup() has run; these asserts
// catch lifecycle bugs where upstream state outlives the filter.
Filter::~Filter() {
  // Upstream resources should already have been cleaned.
  ASSERT(upstream_requests_.empty());
  ASSERT(!retry_state_);
}
354

            
355
// Validates the value of one strict-checked Envoy header. Numeric headers are
// checked with isInteger(); retry-policy headers are checked by attempting to
// parse their retry field lists.
const FilterUtility::StrictHeaderChecker::HeaderCheckResult
FilterUtility::StrictHeaderChecker::checkHeader(Http::RequestHeaderMap& headers,
                                                const Http::LowerCaseString& target_header) {
  const auto& known_headers = Http::Headers::get();
  if (target_header == known_headers.EnvoyUpstreamRequestTimeoutMs) {
    return isInteger(headers.EnvoyUpstreamRequestTimeoutMs());
  }
  if (target_header == known_headers.EnvoyUpstreamRequestPerTryTimeoutMs) {
    return isInteger(headers.EnvoyUpstreamRequestPerTryTimeoutMs());
  }
  if (target_header == known_headers.EnvoyMaxRetries) {
    return isInteger(headers.EnvoyMaxRetries());
  }
  if (target_header == known_headers.EnvoyRetryOn) {
    return hasValidRetryFields(headers.EnvoyRetryOn(), &Router::RetryStateImpl::parseRetryOn);
  }
  if (target_header == known_headers.EnvoyRetryGrpcOn) {
    return hasValidRetryFields(headers.EnvoyRetryGrpcOn(),
                               &Router::RetryStateImpl::parseRetryGrpcOn);
  }
  // Should only validate headers for which we have implemented a validator.
  PANIC("unexpectedly reached");
}
373

            
374
77025
// Returns the locality zone stat name of the upstream host, or the empty stat
// name when no host has been selected yet.
Stats::StatName Filter::upstreamZone(Upstream::HostDescriptionOptConstRef upstream_host) {
  if (!upstream_host) {
    return config_->empty_stat_name_;
  }
  return upstream_host->localityZoneStatName();
}
377

            
378
// Charges response-code stats for an upstream response: the main cluster /
// virtual-host / route scopes, an optional alternate stat prefix, the
// load-report dropped counter, and the per-host error counter for 5xx.
// Skipped entirely for health-check traffic or when dynamic stats are off.
void Filter::chargeUpstreamCode(uint64_t response_status_code,
                                const Http::ResponseHeaderMap& response_headers,
                                Upstream::HostDescriptionOptConstRef upstream_host, bool dropped) {
  // Passing the response_status_code explicitly is an optimization to avoid
  // multiple calls to slow Http::Utility::getResponseStatus.
  ASSERT(response_status_code == Http::Utility::getResponseStatus(response_headers));
  if (config_->emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck()) {
    // A response is "canary" if the upstream marked it so via header, or the
    // selected host itself is a canary.
    const Http::HeaderEntry* upstream_canary_header = response_headers.EnvoyUpstreamCanary();
    const bool is_canary = (upstream_canary_header && upstream_canary_header->value() == "true") ||
                           (upstream_host ? upstream_host->canary() : false);
    const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_);

    Stats::StatName upstream_zone = upstreamZone(upstream_host);
    Http::CodeStats::ResponseStatInfo info{
        config_->scope_,
        cluster_->statsScope(),
        config_->empty_stat_name_,
        response_status_code,
        internal_request,
        route_->virtualHost()->statName(),
        request_vcluster_ ? request_vcluster_->statName() : config_->empty_stat_name_,
        route_stats_context_.has_value() ? route_stats_context_->statName()
                                         : config_->empty_stat_name_,
        config_->zone_name_,
        upstream_zone,
        is_canary};

    Http::CodeStats& code_stats = httpContext().codeStats();
    code_stats.chargeResponseStat(info, exclude_http_code_stats_);

    // Charge a second set of stats under the request-supplied alternate
    // prefix (x-envoy-upstream-alt-stat-name), if one was captured.
    if (alt_stat_prefix_ != nullptr) {
      Http::CodeStats::ResponseStatInfo alt_info{config_->scope_,
                                                 cluster_->statsScope(),
                                                 alt_stat_prefix_->statName(),
                                                 response_status_code,
                                                 internal_request,
                                                 config_->empty_stat_name_,
                                                 config_->empty_stat_name_,
                                                 config_->empty_stat_name_,
                                                 config_->zone_name_,
                                                 upstream_zone,
                                                 is_canary};
      code_stats.chargeResponseStat(alt_info, exclude_http_code_stats_);
    }

    if (dropped) {
      cluster_->loadReportStats().upstream_rq_dropped_.inc();
    }
    if (upstream_host && Http::CodeUtility::is5xx(response_status_code)) {
      upstream_host->stats().rq_error_.inc();
    }
  }
}
431

            
432
void Filter::chargeUpstreamCode(Http::Code code, Upstream::HostDescriptionOptConstRef upstream_host,
433
1536
                                bool dropped) {
434
1536
  const uint64_t response_status_code = enumToInt(code);
435
1536
  const auto fake_response_headers = Http::createHeaderMap<Http::ResponseHeaderMapImpl>(
436
1536
      {{Http::Headers::get().Status, std::to_string(response_status_code)}});
437
1536
  chargeUpstreamCode(response_status_code, *fake_response_headers, upstream_host, dropped);
438
1536
}
439

            
440
49821
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
441
49821
  downstream_headers_ = &headers;
442

            
443
  // Initialize the `modify_headers_` function that will be used to modify the response headers for
444
  // all upstream responses or local responses.
445
49821
  modify_headers_ = [this](Http::ResponseHeaderMap& headers) {
446
41175
    if (route_entry_ == nullptr) {
447
      return;
448
    }
449

            
450
41175
    if (modify_headers_from_upstream_lb_) {
451
1
      modify_headers_from_upstream_lb_(headers);
452
1
    }
453

            
454
41175
    if (attempt_count_ == 0 || !route_entry_->includeAttemptCountInResponse()) {
455
41159
      return;
456
41159
    }
457
    // This header is added without checking for config_->suppress_envoy_headers_ to mirror what
458
    // is done for upstream requests.
459
16
    headers.setEnvoyAttemptCount(attempt_count_);
460
16
  };
461

            
462
  // TODO: Maybe add a filter API for this.
463
49821
  grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
464
49821
  exclude_http_code_stats_ = grpc_request_ && config_->suppress_grpc_request_failure_code_stats_;
465

            
466
  // Only increment rq total stat if we actually decode headers here. This does not count requests
467
  // that get handled by earlier filters.
468
49821
  stats_.rq_total_.inc();
469

            
470
  // Determine if there is a route entry or a direct response for the request.
471
49821
  route_ = callbacks_->route();
472
49821
  if (!route_) {
473
2204
    stats_.no_route_.inc();
474
2204
    ENVOY_STREAM_LOG(debug, "no route match for URL '{}'", *callbacks_, headers.getPathValue());
475

            
476
2204
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoRouteFound);
477
2204
    callbacks_->sendLocalReply(Http::Code::NotFound, "", nullptr, absl::nullopt,
478
2204
                               StreamInfo::ResponseCodeDetails::get().RouteNotFound);
479
2204
    return Http::FilterHeadersStatus::StopIteration;
480
2204
  }
481

            
482
  // Determine if there is a direct response for the request.
483
47617
  const auto* direct_response = route_->directResponseEntry();
484
47617
  if (direct_response != nullptr) {
485
148
    stats_.rq_direct_response_.inc();
486
148
    direct_response->rewritePathHeader(headers, !config_->suppress_envoy_headers_);
487
148
    std::string body;
488
148
    absl::string_view direct_response_body = direct_response->formatBody(
489
148
        (downstream_headers_ == nullptr) ? *Http::StaticEmptyHeaders::get().request_headers
490
148
                                         : *downstream_headers_,
491
148
        *Http::StaticEmptyHeaders::get().response_headers, callbacks_->streamInfo(), body);
492

            
493
148
    callbacks_->sendLocalReply(
494
148
        direct_response->responseCode(), direct_response_body,
495
148
        [this, direct_response](Http::ResponseHeaderMap& response_headers) -> void {
496
148
          std::string new_uri;
497
148
          ASSERT(downstream_headers_ != nullptr);
498
148
          if (downstream_headers_->Path()) {
499
148
            new_uri = direct_response->newUri(*downstream_headers_);
500
148
          }
501
          // See https://tools.ietf.org/html/rfc7231#section-7.1.2.
502
148
          const auto add_location =
503
148
              direct_response->responseCode() == Http::Code::Created ||
504
148
              Http::CodeUtility::is3xx(enumToInt(direct_response->responseCode()));
505
148
          if (!new_uri.empty() && add_location) {
506
19
            response_headers.addReferenceKey(Http::Headers::get().Location, new_uri);
507
19
          }
508
148
          const Formatter::Context formatter_context(downstream_headers_, &response_headers, {}, {},
509
148
                                                     {}, &callbacks_->activeSpan());
510
148
          direct_response->finalizeResponseHeaders(response_headers, formatter_context,
511
148
                                                   callbacks_->streamInfo());
512
          // Apply content_type from body_format if configured.
513
148
          const absl::string_view content_type = direct_response->responseContentType();
514
148
          if (!content_type.empty()) {
515
1
            response_headers.setReferenceKey(Http::Headers::get().ContentType, content_type);
516
1
          }
517
148
        },
518
148
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().DirectResponse);
519
148
    return Http::FilterHeadersStatus::StopIteration;
520
148
  }
521

            
522
  // A route entry matches for the request.
523
47469
  route_entry_ = route_->routeEntry();
524
  // Store buffer limits from the route entry.
525
  // The requestBodyBufferLimit() method handles both legacy per_request_buffer_limit_bytes
526
  // and new request_body_buffer_limit configurations automatically.
527
47469
  request_body_buffer_limit_ = route_entry_->requestBodyBufferLimit();
528
47469
  Upstream::ThreadLocalCluster* cluster =
529
47469
      config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
530
47469
  if (!cluster) {
531
88
    stats_.no_cluster_.inc();
532
88
    ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName());
533

            
534
88
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoClusterFound);
535
88
    callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", modify_headers_,
536
88
                               absl::nullopt,
537
88
                               StreamInfo::ResponseCodeDetails::get().ClusterNotFound);
538
88
    return Http::FilterHeadersStatus::StopIteration;
539
88
  }
540
47381
  cluster_ = cluster->info();
541

            
542
  // Set up stat prefixes, etc.
543
47381
  request_vcluster_ = route_->virtualHost()->virtualCluster(headers);
544
47381
  if (request_vcluster_ != nullptr) {
545
285
    callbacks_->streamInfo().setVirtualClusterName(request_vcluster_->name());
546
285
  }
547
47381
  route_stats_context_ = route_entry_->routeStatsContext();
548
47381
  ENVOY_STREAM_LOG(debug, "cluster '{}' match for URL '{}'", *callbacks_,
549
47381
                   route_entry_->clusterName(), headers.getPathValue());
550

            
551
47381
  if (config_->strict_check_headers_ != nullptr) {
552
62
    for (const auto& header : *config_->strict_check_headers_) {
553
62
      const auto res = FilterUtility::StrictHeaderChecker::checkHeader(headers, header);
554
62
      if (!res.valid_) {
555
30
        callbacks_->streamInfo().setResponseFlag(
556
30
            StreamInfo::CoreResponseFlag::InvalidEnvoyRequestHeaders);
557
30
        const std::string body = fmt::format("invalid header '{}' with value '{}'",
558
30
                                             std::string(res.entry_->key().getStringView()),
559
30
                                             std::string(res.entry_->value().getStringView()));
560
30
        const std::string details =
561
30
            absl::StrCat(StreamInfo::ResponseCodeDetails::get().InvalidEnvoyRequestHeaders, "{",
562
30
                         StringUtil::replaceAllEmptySpace(res.entry_->key().getStringView()), "}");
563
30
        callbacks_->sendLocalReply(Http::Code::BadRequest, body, modify_headers_, absl::nullopt,
564
30
                                   details);
565
30
        return Http::FilterHeadersStatus::StopIteration;
566
30
      }
567
62
    }
568
31
  }
569

            
570
47351
  const Http::HeaderEntry* request_alt_name = headers.EnvoyUpstreamAltStatName();
571
47351
  if (request_alt_name) {
572
3
    alt_stat_prefix_ = std::make_unique<Stats::StatNameDynamicStorage>(
573
3
        request_alt_name->value().getStringView(), config_->scope_.symbolTable());
574
3
    headers.removeEnvoyUpstreamAltStatName();
575
3
  }
576

            
577
  // See if we are supposed to immediately kill some percentage of this cluster's traffic.
578
47351
  if (cluster_->maintenanceMode()) {
579
3
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
580
3
    chargeUpstreamCode(Http::Code::ServiceUnavailable, {}, true);
581
3
    callbacks_->sendLocalReply(
582
3
        Http::Code::ServiceUnavailable, "maintenance mode",
583
3
        [this](Http::ResponseHeaderMap& headers) {
584
3
          if (!config_->suppress_envoy_headers_) {
585
1
            headers.addReference(Http::Headers::get().EnvoyOverloaded,
586
1
                                 Http::Headers::get().EnvoyOverloadedValues.True);
587
1
          }
588
          // Note: append_cluster_info does not respect suppress_envoy_headers.
589
3
          modify_headers_(headers);
590
3
        },
591
3
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode);
592
3
    cluster_->trafficStats()->upstream_rq_maintenance_mode_.inc();
593
3
    return Http::FilterHeadersStatus::StopIteration;
594
3
  }
595

            
596
  // Support DROP_OVERLOAD config from control plane to drop certain percentage of traffic.
597
47348
  if (checkDropOverload(*cluster)) {
598
14
    return Http::FilterHeadersStatus::StopIteration;
599
14
  }
600

            
601
  // If large request buffering is enabled and its size is more than current buffer limit, update
602
  // the buffer limit to a new larger value.
603
47334
  uint64_t effective_buffer_limit = calculateEffectiveBufferLimit();
604
47334
  if (effective_buffer_limit != std::numeric_limits<uint64_t>::max() &&
605
47334
      effective_buffer_limit > callbacks_->bufferLimit()) {
606
4
    ENVOY_STREAM_LOG(debug, "Setting new filter manager buffer limit: {}", *callbacks_,
607
4
                     effective_buffer_limit);
608
4
    callbacks_->setBufferLimit(effective_buffer_limit);
609
4
  }
610

            
611
  // Increment the attempt count from 0 to 1 at the first upstream request.
612
47334
  attempt_count_++;
613
47334
  callbacks_->streamInfo().setAttemptCount(attempt_count_);
614

            
615
  // Set hedging params before finalizeRequestHeaders so timeout calculation is correct.
616
47334
  hedging_params_ = FilterUtility::finalHedgingParams(*route_entry_, headers);
617

            
618
  // Calculate timeout and set x-envoy-expected-rq-timeout-ms before finalizeRequestHeaders.
619
  // This allows request_headers_to_add to reference the timeout header.
620
47334
  timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_->suppress_envoy_headers_,
621
47334
                                         grpc_request_, hedging_params_.hedge_on_per_try_timeout_,
622
47334
                                         config_->respect_expected_rq_timeout_);
623

            
624
  // Set x-envoy-attempt-count before finalizeRequestHeaders so it can be referenced.
625
47334
  include_attempt_count_in_request_ = route_entry_->includeAttemptCountInRequest();
626
47334
  if (include_attempt_count_in_request_) {
627
30
    headers.setEnvoyAttemptCount(attempt_count_);
628
30
  }
629

            
630
  // Finalize request headers (host/path rewriting + request_headers_to_add) before host selection.
631
  // At this point, router-set headers (x-envoy-expected-rq-timeout-ms, x-envoy-attempt-count)
632
  // are already set, so request_headers_to_add can reference them and affect load balancing.
633
47334
  const Formatter::Context formatter_context(&headers, {}, {}, {}, {}, &callbacks_->activeSpan());
634
47334
  route_entry_->finalizeRequestHeaders(headers, formatter_context, callbacks_->streamInfo(),
635
47334
                                       !config_->suppress_envoy_headers_);
636

            
637
  // Fetch a connection pool for the upstream cluster.
638
47334
  const auto& upstream_http_protocol_options =
639
47334
      cluster_->httpProtocolOptions().upstreamHttpProtocolOptions();
640
47334
  if (upstream_http_protocol_options && (upstream_http_protocol_options->auto_sni() ||
641
2304
                                         upstream_http_protocol_options->auto_san_validation())) {
642
    // Default the header to Host/Authority header.
643
2299
    absl::string_view header_value = headers.getHostValue();
644

            
645
    // Check whether `override_auto_sni_header` is specified.
646
2299
    if (const auto& override_header = upstream_http_protocol_options->override_auto_sni_header();
647
2299
        !override_header.empty()) {
648
      // Use the header value from `override_auto_sni_header` to set the SNI value.
649
13
      const auto override_header_value = headers.get(Http::LowerCaseString(override_header));
650
13
      if (override_header_value.size() > 1) {
651
1
        ENVOY_STREAM_LOG(info, "Multiple values for auto sni header '{}' and use the first one",
652
1
                         *callbacks_, override_header);
653
1
      }
654
13
      if (!override_header_value.empty() && !override_header_value[0]->value().empty()) {
655
11
        header_value = override_header_value[0]->value().getStringView();
656
11
      }
657
13
    }
658

            
659
2299
    auto& filter_state = callbacks_->streamInfo().filterState();
660

            
661
    // Parse the authority header value to extract the host and check if it's an IP address.
662
2299
    const auto parsed_authority = Http::Utility::parseAuthority(header_value);
663

            
664
2299
    if (!parsed_authority.is_ip_address_ && upstream_http_protocol_options->auto_sni() &&
665
2299
        !filter_state->hasDataWithName(Network::UpstreamServerName::key())) {
666
2283
      filter_state->setData(Network::UpstreamServerName::key(),
667
2283
                            std::make_unique<Network::UpstreamServerName>(parsed_authority.host_),
668
2283
                            StreamInfo::FilterState::StateType::Mutable);
669
2283
    }
670

            
671
2299
    if (upstream_http_protocol_options->auto_san_validation() &&
672
2299
        !filter_state->hasDataWithName(Network::UpstreamSubjectAltNames::key())) {
673
157
      filter_state->setData(Network::UpstreamSubjectAltNames::key(),
674
157
                            std::make_unique<Network::UpstreamSubjectAltNames>(
675
157
                                std::vector<std::string>{std::string(parsed_authority.host_)}),
676
157
                            StreamInfo::FilterState::StateType::Mutable);
677
157
    }
678
2299
  }
679

            
680
47334
  transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(
681
47334
      *callbacks_->streamInfo().filterState());
682

            
683
47334
  if (auto downstream_connection = downstreamConnection(); downstream_connection != nullptr) {
684
44251
    if (auto typed_state = downstream_connection->streamInfo()
685
44251
                               .filterState()
686
44251
                               .getDataReadOnly<Network::UpstreamSocketOptionsFilterState>(
687
44251
                                   Network::UpstreamSocketOptionsFilterState::key());
688
44251
        typed_state != nullptr) {
689
1
      auto downstream_options = typed_state->value();
690
1
      if (!upstream_options_) {
691
1
        upstream_options_ = std::make_shared<Network::Socket::Options>();
692
1
      }
693
1
      Network::Socket::appendOptions(upstream_options_, downstream_options);
694
1
    }
695
44251
  }
696

            
697
47334
  if (upstream_options_ && callbacks_->getUpstreamSocketOptions()) {
698
    Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions());
699
  }
700

            
701
47334
  callbacks_->streamInfo().downstreamTiming().setValue(
702
47334
      "envoy.router.host_selection_start_ms",
703
47334
      callbacks_->dispatcher().timeSource().monotonicTime());
704

            
705
47334
  auto host_selection_response = cluster->chooseHost(this);
706
47334
  if (!host_selection_response.cancelable ||
707
47334
      !Runtime::runtimeFeatureEnabled("envoy.reloadable_features.async_host_selection")) {
708
33497
    if (host_selection_response.cancelable) {
709
7
      host_selection_response.cancelable->cancel();
710
7
    }
711
    // This branch handles the common case of synchronous host selection, as
712
    // well as handling unsupported asynchronous host selection by treating it
713
    // as host selection failure and calling sendNoHealthyUpstreamResponse.
714
33497
    continueDecodeHeaders(cluster, headers, end_stream, std::move(host_selection_response.host),
715
33497
                          host_selection_response.details);
716
33497
    return Http::FilterHeadersStatus::StopIteration;
717
33497
  }
718

            
719
13837
  ENVOY_STREAM_LOG(debug, "Doing asynchronous host selection\n", *callbacks_);
720
  // Latch the cancel handle and call it in Filter::onDestroy to avoid any use-after-frees for cases
721
  // like stream timeout.
722
13837
  host_selection_cancelable_ = std::move(host_selection_response.cancelable);
723
  // Configure a callback to be called on asynchronous host selection.
724
13837
  on_host_selected_ = ([this, cluster,
725
13837
                        end_stream](Upstream::HostConstSharedPtr&& host,
726
13837
                                    absl::string_view host_selection_details) -> void {
727
    // It should always be safe to call continueDecodeHeaders. In the case the
728
    // stream had a local reply before host selection completed,
729
    // the lookup should be canceled.
730
13381
    const bool should_continue_decoding = continueDecodeHeaders(
731
13381
        cluster, *downstream_headers_, end_stream, std::move(host), host_selection_details);
732
    // continueDecodeHeaders can itself send a local reply, in which case should_continue_decoding
733
    // should be false. If this is not the case, we can continue the filter chain due to successful
734
    // asynchronous host selection.
735
13381
    if (should_continue_decoding) {
736
13341
      ENVOY_STREAM_LOG(debug, "Continuing stream now that host resolution is complete\n",
737
13341
                       *callbacks_);
738
13341
      callbacks_->continueDecoding();
739
13341
    } else {
740
40
      ENVOY_STREAM_LOG(debug, "Aborting stream after host resolution is complete\n", *callbacks_);
741
40
    }
742
13381
  });
743
13837
  return Http::FilterHeadersStatus::StopAllIterationAndWatermark;
744
47334
}
745

            
746
// When asynchronous host selection is complete, call the pre-configured on_host_selected_function.
747
13506
void Filter::onAsyncHostSelection(Upstream::HostConstSharedPtr&& host, std::string&& details) {
  ENVOY_STREAM_LOG(debug, "Completing asynchronous host selection [{}]\n", *callbacks_, details);
  // Take ownership of the cancel handle so it stays alive while the selection
  // callback runs and is destroyed when this function returns.
  std::unique_ptr<Upstream::AsyncHostSelectionHandle> scoped_handle(
      std::move(host_selection_cancelable_));
  on_host_selected_(std::move(host), details);
}
753

            
754
// Second half of request-header processing, entered once (possibly asynchronous)
// host selection has produced a result. Creates the upstream connection pool,
// applies per-request control headers, builds retry/shadow state, and starts the
// upstream request plus any shadow streams.
// Returns false when a local reply was sent (callers doing async host selection
// must then NOT continueDecoding()); true when the upstream request was started.
bool Filter::continueDecodeHeaders(Upstream::ThreadLocalCluster* cluster,
                                   Http::RequestHeaderMap& headers, bool end_stream,
                                   Upstream::HostConstSharedPtr&& selected_host,
                                   absl::string_view host_selection_details) {
  callbacks_->streamInfo().downstreamTiming().setValue(
      "envoy.router.host_selection_end_ms", callbacks_->dispatcher().timeSource().monotonicTime());

  // A null pool means no (healthy) host could be used; reply 503 locally.
  std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster, selected_host);
  if (!generic_conn_pool) {
    sendNoHealthyUpstreamResponse(host_selection_details);
    return false;
  }
  Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host();

  // If we've been instructed not to forward the request upstream, send an empty local response.
  if (auto* debug_config =
          callbacks_->streamInfo().filterState()->getDataReadOnly<DebugConfig>(DebugConfig::key());
      debug_config != nullptr && debug_config->do_not_forward_) {
    callbacks_->sendLocalReply(
        Http::Code::NoContent, "",
        [this](Http::ResponseHeaderMap& headers) {
          headers.addReference(Http::Headers::get().EnvoyNotForwarded, "true");
          modify_headers_(headers);
        },
        absl::nullopt, "");
    return false;
  }

  // Overload-manager load shedding: reject before committing upstream resources.
  if (callbacks_->shouldLoadShed()) {
    callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "envoy overloaded", modify_headers_,
                               absl::nullopt, StreamInfo::ResponseCodeDetails::get().Overload);
    stats_.rq_overload_local_reply_.inc();
    return false;
  }

  // Handle additional header processing.
  // x-envoy-upstream-stream-duration-ms: per-request max stream duration override.
  const Http::HeaderEntry* header_max_stream_duration_entry =
      headers.EnvoyUpstreamStreamDurationMs();
  if (header_max_stream_duration_entry) {
    dynamic_max_stream_duration_ =
        FilterUtility::tryParseHeaderTimeout(*header_max_stream_duration_entry);
    headers.removeEnvoyUpstreamStreamDurationMs();
  }

  // If this header is set with any value, use an alternate response code on timeout.
  if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) {
    timeout_response_code_ = Http::Code::NoContent;
    headers.removeEnvoyUpstreamRequestTimeoutAltResponse();
  }

  // Normalize the :scheme header based on downstream TLS, upstream TLS, and config.
  FilterUtility::setUpstreamScheme(
      headers, callbacks_->streamInfo().downstreamAddressProvider().sslConnection() != nullptr,
      host->transportSocketFactory().sslCtx() != nullptr,
      callbacks_->streamInfo().shouldSchemeMatchUpstream());

  // Ensure an http transport scheme is selected before continuing with decoding.
  ASSERT(headers.Scheme());

  // Build the retry state for this request (may be a no-op policy).
  retry_state_ = createRetryState(*getEffectiveRetryPolicy(), headers, *cluster_, request_vcluster_,
                                  route_stats_context_, config_->factory_context_,
                                  callbacks_->dispatcher(), route_entry_->priority());

  // Determine which shadow policies to use. It's possible that we don't do any shadowing due to
  // runtime keys. Also the method CONNECT doesn't support shadowing.
  auto method = headers.getMethodValue();
  if (method != Http::Headers::get().MethodValues.Connect) {
    // Use cluster-level shadow policies if they are available (most specific wins).
    // If no cluster-level policies are configured, fall back to route-level policies.
    const auto& cluster_shadow_policies = cluster_->httpProtocolOptions().shadowPolicies();
    const auto& policies_to_evaluate =
        !cluster_shadow_policies.empty() ? cluster_shadow_policies : route_entry_->shadowPolicies();

    for (const auto& shadow_policy : policies_to_evaluate) {
      const auto& policy_ref = *shadow_policy;
      if (FilterUtility::shouldShadow(policy_ref, config_->runtime_, callbacks_->streamId())) {
        active_shadow_policies_.push_back(std::cref(policy_ref));
        // Snapshot the request headers now; shadows get a copy, not the live map.
        shadow_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_);
      }
    }
  }

  ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers);

  const bool can_send_early_data =
      route_entry_->earlyDataPolicy().allowsEarlyDataForRequest(*downstream_headers_);

  include_timeout_retry_header_in_request_ = route_->virtualHost()->includeIsTimeoutRetryHeader();

  // Set initial HTTP/3 use based on the presence of HTTP/1.1 proxy config.
  // For retries etc, HTTP/3 usability may transition from true to false, but
  // will never transition from false to true.
  bool can_use_http3 =
      !transport_socket_options_ || !transport_socket_options_->http11ProxyInfo().has_value();
  UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
      *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3,
      allow_multiplexed_upstream_half_close_ /*enable_half_close*/);
  LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
  upstream_requests_.front()->acceptHeadersFromRouter(end_stream);
  // Start the shadow streams.
  for (const auto& shadow_policy_wrapper : active_shadow_policies_) {
    const auto& shadow_policy = shadow_policy_wrapper.get();
    const absl::optional<absl::string_view> shadow_cluster_name =
        getShadowCluster(shadow_policy, *downstream_headers_);
    if (!shadow_cluster_name.has_value()) {
      continue;
    }
    auto shadow_headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_);
    applyShadowPolicyHeaders(shadow_policy, *shadow_headers);
    const auto options =
        Http::AsyncClient::RequestOptions()
            .setTimeout(timeout_.global_timeout_)
            .setParentSpan(callbacks_->activeSpan())
            .setChildSpanName("mirror")
            .setSampled(shadow_policy.traceSampled())
            .setIsShadow(true)
            .setIsShadowSuffixDisabled(shadow_policy.disableShadowHostSuffixAppend())
            .setBufferAccount(callbacks_->account())
            // Calculate effective buffer limit for shadow streams using the same logic as main
            // request. A buffer limit of 1 is set in the case that the effective limit == 0,
            // because a buffer limit of zero on async clients is interpreted as no buffer limit.
            .setBufferLimit([this]() -> uint64_t {
              const uint64_t effective_limit = calculateEffectiveBufferLimit();
              return effective_limit == 0 ? 1 : effective_limit;
            }())
            .setDiscardResponseBody(true)
            .setFilterConfig(config_)
            .setParentContext(Http::AsyncClient::ParentContext{&callbacks_->streamInfo()});
    if (end_stream) {
      // This is a header-only request, and can be dispatched immediately to the shadow
      // without waiting.
      Http::RequestMessagePtr request(new Http::RequestMessageImpl(
          Http::createHeaderMap<Http::RequestHeaderMapImpl>(*shadow_headers_)));
      applyShadowPolicyHeaders(shadow_policy, request->headers());
      config_->shadowWriter().shadow(std::string(shadow_cluster_name.value()), std::move(request),
                                     options);
    } else {
      // Body/trailers still pending: open a streaming shadow and track it so
      // decodeData/decodeTrailers can forward copies.
      Http::AsyncClient::OngoingRequest* shadow_stream = config_->shadowWriter().streamingShadow(
          std::string(shadow_cluster_name.value()), std::move(shadow_headers), options);
      if (shadow_stream != nullptr) {
        shadow_streams_.insert(shadow_stream);
        shadow_stream->setDestructorCallback(
            [this, shadow_stream]() { shadow_streams_.erase(shadow_stream); });
        shadow_stream->setWatermarkCallbacks(watermark_callbacks_);
      }
    }
  }
  if (end_stream) {
    onRequestComplete();
  }

  // If this was called due to asynchronous host selection, the caller should continueDecoding.
  return true;
}
907

            
908
// Builds the generic connection pool used to reach `host`.
// Returns nullptr when no host was selected. A cluster-specific upstream
// factory (and its optional typed config) overrides the default router
// factory. CONNECT-UDP selects a UDP upstream; CONNECT (and POST, when
// allow_post is configured) selects raw TCP; everything else uses HTTP.
std::unique_ptr<GenericConnPool> Filter::createConnPool(Upstream::ThreadLocalCluster& cluster,
                                                        Upstream::HostConstSharedPtr host) {
  if (host == nullptr) {
    return nullptr;
  }
  GenericConnPoolFactory* factory = nullptr;
  ProtobufTypes::MessagePtr message;
  if (cluster_->upstreamConfig().has_value()) {
    factory = Envoy::Config::Utility::getFactory<GenericConnPoolFactory>(
        cluster_->upstreamConfig().ref());
    // An unknown factory type is a config bug, but we fail open to the default
    // factory below rather than dropping the request.
    ENVOY_BUG(factory != nullptr,
              fmt::format("invalid factory type '{}', failing over to default upstream",
                          cluster_->upstreamConfig().ref().DebugString()));
    if (!cluster_->upstreamConfig()->typed_config().value().empty()) {
      message = Envoy::Config::Utility::translateToFactoryConfig(
          *cluster_->upstreamConfig(), config_->factory_context_.messageValidationVisitor(),
          *factory);
    }
  }
  if (!factory) {
    factory = &config_->router_context_.genericConnPoolFactory();
  }

  // The factory always receives a config proto, even if it is empty.
  if (!message) {
    message = factory->createEmptyConfigProto();
  }

  using UpstreamProtocol = Envoy::Router::GenericConnPoolFactory::UpstreamProtocol;
  UpstreamProtocol upstream_protocol = UpstreamProtocol::HTTP;
  if (route_entry_->connectConfig().has_value()) {
    auto method = downstream_headers_->getMethodValue();
    if (Http::HeaderUtility::isConnectUdpRequest(*downstream_headers_)) {
      upstream_protocol = UpstreamProtocol::UDP;
    } else if (method == Http::Headers::get().MethodValues.Connect ||
               (route_entry_->connectConfig()->allow_post() &&
                method == Http::Headers::get().MethodValues.Post)) {
      // Allow POST for proxying raw TCP if it is configured.
      upstream_protocol = UpstreamProtocol::TCP;
    }
  }

  return factory->createGenericConnPool(host, cluster, upstream_protocol, route_entry_->priority(),
                                        callbacks_->streamInfo().protocol(), this, *message);
}
952

            
953
75
// Replies 503 locally when host selection produced no usable upstream.
// `optional_details` (e.g. from the load balancer) overrides the generic
// "no healthy upstream" response-code details when non-empty.
void Filter::sendNoHealthyUpstreamResponse(absl::string_view optional_details) {
  callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::NoHealthyUpstream);
  chargeUpstreamCode(Http::Code::ServiceUnavailable, {}, false);
  const absl::string_view details =
      !optional_details.empty() ? optional_details
                                : StreamInfo::ResponseCodeDetails::get().NoHealthyUpstream;
  callbacks_->sendLocalReply(Http::Code::ServiceUnavailable, "no healthy upstream", modify_headers_,
                             absl::nullopt, details);
}
962

            
963
233539
uint64_t Filter::calculateEffectiveBufferLimit() const {
964
  // Use requestBodyBufferLimit() method which handles both legacy and new
965
  // configurations. If no buffer limit is configured, fall back to connection limit.
966
233539
  if (request_body_buffer_limit_ != std::numeric_limits<uint64_t>::max()) {
967
975
    return request_body_buffer_limit_;
968
975
  }
969

            
970
  // If no route-level buffer limit is set, use the stream buffer limit.
971
232564
  const uint32_t current_stream_limit = callbacks_->bufferLimit();
972
232564
  if (current_stream_limit != 0) {
973
231834
    return static_cast<uint64_t>(current_stream_limit);
974
231834
  }
975

            
976
  // If no limits are set at all, return unlimited.
977
730
  return std::numeric_limits<uint64_t>::max();
978
232564
}
979

            
980
182729
bool Filter::isEarlyConnectData() {
981
182729
  return reject_early_connect_data_enabled_ && downstream_headers_ != nullptr &&
982
182729
         Http::HeaderUtility::isConnect(*downstream_headers_) && !downstream_response_started_;
983
182729
}
984

            
985
186112
// Forwards request body data upstream, mirroring it to any active shadow
// streams, and buffers it when a retry or internal redirect may need to replay
// it. Buffer-limit overflow is handled differently for retry vs redirect-only:
// retry overflow abandons retry/shadow state (and may reply 507 locally if no
// upstream request is active); redirect-only overflow merely flags the request
// so later internal redirects are cancelled. Always returns
// StopIterationNoBuffer: any buffering is done explicitly via addDecodedData().
Http::FilterDataStatus Filter::decodeData(Buffer::Instance& data, bool end_stream) {
  // Reject body bytes that arrive before a CONNECT response when configured to.
  if (data.length() > 0 && isEarlyConnectData()) {
    callbacks_->sendLocalReply(Http::Code::BadRequest, "", nullptr, absl::nullopt,
                               StreamInfo::ResponseCodeDetails::get().EarlyConnectData);
    return Http::FilterDataStatus::StopIterationNoBuffer;
  }
  // upstream_requests_.size() cannot be > 1 because that only happens when a per
  // try timeout occurs with hedge_on_per_try_timeout enabled but the per
  // try timeout timer is not started until onRequestComplete(). It could be zero
  // if the first request attempt has already failed and a retry is waiting for
  // a backoff timer..
  ASSERT(upstream_requests_.size() <= 1);

  const bool retry_enabled = retry_state_ && retry_state_->enabled();
  const bool redirect_enabled = route_entry_ && route_entry_->internalRedirectPolicy().enabled();
  const bool is_redirect_only = redirect_enabled && !retry_enabled;
  const uint64_t effective_buffer_limit = calculateEffectiveBufferLimit();
  bool buffering = retry_enabled || redirect_enabled;
  // Check if we would exceed buffer limits, regardless of current buffering state
  // This ensures error details are set even if retry state was cleared due to upstream reset.
  const bool would_exceed_buffer =
      (getLength(callbacks_->decodingBuffer()) + data.length() > effective_buffer_limit);
  // Handle retry buffer overflow, excluding redirect-only scenarios.
  // For redirect scenarios, buffer overflow should only affect redirect processing, not initial
  // request.
  if (would_exceed_buffer && retry_enabled && !is_redirect_only && !request_buffer_overflowed_) {
    ENVOY_LOG(debug,
              "The request payload has at least {} bytes data which exceeds buffer limit {}. "
              "Giving up on buffering.",
              getLength(callbacks_->decodingBuffer()) + data.length(), effective_buffer_limit);
    cluster_->trafficStats()->retry_or_shadow_abandoned_.inc();
    retry_state_.reset();
    ENVOY_LOG(debug, "retry or shadow overflow: retry_state_ reset, buffering set to false");
    buffering = false;
    active_shadow_policies_.clear();
    // Only send local reply and cleanup if we're in a retry waiting state (no active upstream
    // requests). If there are active upstream requests, let the normal upstream failure handling
    // take precedence.
    if (upstream_requests_.empty()) {
      request_buffer_overflowed_ = true;
      ENVOY_LOG(debug,
                "retry or shadow overflow: No upstream requests, resetting and calling cleanup()");
      resetAll();
      cleanup();
      callbacks_->streamInfo().setResponseCodeDetails(
          StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit);
      callbacks_->sendLocalReply(
          Http::Code::InsufficientStorage, "exceeded request buffer limit while retrying upstream",
          modify_headers_, absl::nullopt,
          StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit);
      return Http::FilterDataStatus::StopIterationNoBuffer;
    } else {
      ENVOY_LOG(debug, "retry or shadow overflow: Upstream requests exist, deferring to normal "
                       "upstream failure handling");
    }
  }
  // Handle redirect-only buffer overflow when retry/shadow is not active.
  // For redirect scenarios, buffer overflow should only affect redirect processing, not initial
  // request.
  if (would_exceed_buffer && is_redirect_only && !request_buffer_overflowed_) {
    ENVOY_LOG(debug,
              "The request payload has at least {} bytes data which exceeds buffer limit {}. "
              "Marking request as buffer overflowed to cancel internal redirects.",
              getLength(callbacks_->decodingBuffer()) + data.length(), effective_buffer_limit);
    // Set the flag to cancel internal redirect processing, but allow the request to proceed
    // normally.
    request_buffer_overflowed_ = true;
  }
  // Mirror this chunk to every active shadow stream; on end_stream, drop the
  // destructor/watermark hooks first so the shadow's completion can't call back
  // into us after we clear the set below.
  for (auto* shadow_stream : shadow_streams_) {
    if (end_stream) {
      shadow_stream->removeDestructorCallback();
      shadow_stream->removeWatermarkCallbacks();
    }
    Buffer::OwnedImpl copy(data);
    shadow_stream->sendData(copy, end_stream);
  }
  if (end_stream) {
    shadow_streams_.clear();
  }
  if (buffering) {
    if (!upstream_requests_.empty()) {
      // Send a copy upstream; the original `data` is moved into the decode
      // buffer below so it can be replayed on retry/redirect.
      Buffer::OwnedImpl copy(data);
      upstream_requests_.front()->acceptDataFromRouter(copy, end_stream);
    }
    // If we are potentially going to retry or buffer shadow this request we need to buffer.
    // This will not cause the connection manager to 413 because before we hit the
    // buffer limit we give up on retries and buffering. We must buffer using addDecodedData()
    // so that all buffered data is available by the time we do request complete processing and
    // potentially shadow. Additionally, we can't do a copy here because there's a check down
    // this stack for whether `data` is the same buffer as already buffered data.
    callbacks_->addDecodedData(data, true);
  } else {
    if (!upstream_requests_.empty()) {
      upstream_requests_.front()->acceptDataFromRouter(data, end_stream);
    } else {
      // not buffering any data for retry, shadow, and internal redirect, and there will be
      // no more upstream request, abort the request and clean up.
      cleanup();
      callbacks_->sendLocalReply(
          Http::Code::ServiceUnavailable,
          "upstream is closed prematurely during decoding data from downstream", modify_headers_,
          absl::nullopt, StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset);
      return Http::FilterDataStatus::StopIterationNoBuffer;
    }
  }
  if (end_stream) {
    onRequestComplete();
  }
  return Http::FilterDataStatus::StopIterationNoBuffer;
}
458
// Forwards request trailers to the active upstream request and to any shadow
// streams, then marks the request complete. Shadow streams get a copy of the
// trailers (snapshotted in shadow_trailers_) and have their callbacks removed
// first, since sending trailers finishes the shadow stream.
Http::FilterTrailersStatus Filter::decodeTrailers(Http::RequestTrailerMap& trailers) {
  ENVOY_STREAM_LOG(debug, "router decoding trailers:\n{}", *callbacks_, trailers);
  // shadow_headers_ being set means at least one shadow policy is active, so
  // snapshot the trailers for replay to shadow streams.
  if (shadow_headers_) {
    shadow_trailers_ = Http::createHeaderMap<Http::RequestTrailerMapImpl>(trailers);
  }
  // upstream_requests_.size() cannot be > 1 because that only happens when a per
  // try timeout occurs with hedge_on_per_try_timeout enabled but the per
  // try timeout timer is not started until onRequestComplete(). It could be zero
  // if the first request attempt has already failed and a retry is waiting for
  // a backoff timer.
  ASSERT(upstream_requests_.size() <= 1);
  downstream_trailers_ = &trailers;
  if (!upstream_requests_.empty()) {
    upstream_requests_.front()->acceptTrailersFromRouter(trailers);
  }
  for (auto* shadow_stream : shadow_streams_) {
    // Remove hooks before completing the shadow so its teardown can't call
    // back into this filter after shadow_streams_ is cleared below.
    shadow_stream->removeDestructorCallback();
    shadow_stream->removeWatermarkCallbacks();
    shadow_stream->captureAndSendTrailers(
        Http::createHeaderMap<Http::RequestTrailerMapImpl>(*shadow_trailers_));
  }
  shadow_streams_.clear();
  onRequestComplete();
  return Http::FilterTrailersStatus::StopIteration;
}
1281
// Forwards request metadata to the active upstream request, if any, and lets
// the filter chain continue. Metadata is not buffered for retry/shadow.
Http::FilterMetadataStatus Filter::decodeMetadata(Http::MetadataMap& metadata_map) {
  auto metadata_copy = std::make_unique<Http::MetadataMap>(metadata_map);
  if (!upstream_requests_.empty()) {
    // TODO(soya3129): Save metadata for retry, redirect and shadowing case.
    upstream_requests_.front()->acceptMetadataFromRouter(std::move(metadata_copy));
  }
  return Http::FilterMetadataStatus::Continue;
}
95355
// Stores the decoder filter callbacks and registers them with the watermark
// bookkeeping so downstream flow-control events can be propagated.
void Filter::setDecoderFilterCallbacks(Http::StreamDecoderFilterCallbacks& callbacks) {
  callbacks_ = &callbacks;
  watermark_callbacks_.setDecoderFilterCallbacks(callbacks_);
}
135154
// Tears down per-request router state once no upstream requests remain: drops
// the retry state machine (destroying any retry timers) and cancels/destroys
// the global response timeout timer.
void Filter::cleanup() {
  // All callers of cleanup() should have cleaned out the upstream_requests_
  // list as appropriate.
  ASSERT(upstream_requests_.empty());
  ENVOY_LOG(debug, "Executing cleanup(): resetting retry_state_ and disabling timers");
  retry_state_.reset();
  if (response_timeout_) {
    response_timeout_->disableTimer();
    response_timeout_.reset();
  }
}
// Resolves which cluster a shadow (mirror) request should target. A statically
// configured cluster name wins; otherwise the name is read from the header the
// policy designates. Returns absl::nullopt when that header is absent or empty.
absl::optional<absl::string_view> Filter::getShadowCluster(const ShadowPolicy& policy,
                                                           const Http::HeaderMap& headers) const {
  if (!policy.cluster().empty()) {
    return policy.cluster();
  }
  // No static cluster configured: the policy must name a header to consult.
  ASSERT(!policy.clusterHeader().get().empty());
  const auto header_entries = headers.get(policy.clusterHeader());
  if (header_entries.empty() || header_entries[0]->value().empty()) {
    ENVOY_STREAM_LOG(debug, "There is no cluster name in header: {}", *callbacks_,
                     policy.clusterHeader());
    return absl::nullopt;
  }
  return header_entries[0]->value().getStringView();
}
void Filter::applyShadowPolicyHeaders(const ShadowPolicy& shadow_policy,
143
                                      Http::RequestHeaderMap& headers) const {
143
  const Envoy::Formatter::Context formatter_context{&headers};
143
  shadow_policy.headerEvaluator().evaluateHeaders(headers, formatter_context,
143
                                                  callbacks_->streamInfo());
143
  if (!shadow_policy.hostRewriteLiteral().empty()) {
6
    headers.setHost(shadow_policy.hostRewriteLiteral());
6
  }
143
}
100
void Filter::setupRouteTimeoutForWebsocketUpgrade() {
  // Set up the route timeout early for websocket upgrades, since the upstream request
  // will be paused waiting for the upgrade response and we need the timeout active.
100
  if (!response_timeout_ && timeout_.global_timeout_.count() > 0) {
95
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
95
    response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); });
95
    response_timeout_->enableTimer(timeout_.global_timeout_);
95
  }
100
}
56
void Filter::disableRouteTimeoutForWebsocketUpgrade() {
  // Disable the route timeout after websocket upgrade completes successfully
  // to prevent timeout from firing after the upgrade is done.
56
  if (response_timeout_) {
56
    response_timeout_->disableTimer();
56
    response_timeout_.reset();
56
  }
56
}
39890
// Called exactly once when the downstream request has been fully received.
// Records the completion time, arms the global response timeout (if configured
// and not already armed), and starts per-try timeouts on in-flight upstream
// requests that requested deferred per-try timers.
void Filter::onRequestComplete() {
  // This should be called exactly once, when the downstream request has been received in full.
  ASSERT(!downstream_end_stream_);
  downstream_end_stream_ = true;
  Event::Dispatcher& dispatcher = callbacks_->dispatcher();
  downstream_request_complete_time_ = dispatcher.timeSource().monotonicTime();
  // Possible that we got an immediate reset.
  if (!upstream_requests_.empty()) {
    // Even if we got an immediate reset, we could still shadow, but that is a riskier change and
    // seems unnecessary right now.
    if (timeout_.global_timeout_.count() > 0 && !response_timeout_) {
      response_timeout_ = dispatcher.createTimer([this]() -> void { onResponseTimeout(); });
      response_timeout_->enableTimer(timeout_.global_timeout_);
    }
    for (auto& upstream_request : upstream_requests_) {
      if (upstream_request->createPerTryTimeoutOnRequestComplete()) {
        upstream_request->setupPerTryTimeout();
      }
    }
  }
}
95167
// Filter teardown: cancels any asynchronous host selection, resets all
// in-flight upstream requests, detaches and cancels active shadow streams,
// then runs cleanup() to drop timers and retry state.
void Filter::onDestroy() {
  // Cancel any in-flight host selection
  if (host_selection_cancelable_) {
    host_selection_cancelable_->cancel();
  }
  // Reset any in-flight upstream requests.
  resetAll();
  // Unregister from shadow stream notifications and cancel active streams.
  for (auto* shadow_stream : shadow_streams_) {
    shadow_stream->removeDestructorCallback();
    shadow_stream->removeWatermarkCallbacks();
    shadow_stream->cancel();
  }
  cleanup();
}
164
// Fired when the global route timeout elapses. Drains every outstanding
// upstream request — recording timeout stats at cluster/vcluster/route/host
// level, reporting to outlier detection, and charging the abort — then resets
// each stream and finally sends a timeout response downstream.
void Filter::onResponseTimeout() {
  ENVOY_STREAM_LOG(debug, "upstream timeout", *callbacks_);
  // Reset any upstream requests that are still in flight.
  while (!upstream_requests_.empty()) {
    UpstreamRequestPtr upstream_request =
        upstream_requests_.back()->removeFromList(upstream_requests_);
    // We want to record the upstream timeouts and increase the stats counters in all the cases.
    // For example, we also want to record the stats in the case of BiDi streaming APIs where we
    // might have already seen the headers.
    cluster_->trafficStats()->upstream_rq_timeout_.inc();
    if (request_vcluster_) {
      request_vcluster_->stats().upstream_rq_timeout_.inc();
    }
    if (route_stats_context_.has_value()) {
      route_stats_context_->stats().upstream_rq_timeout_.inc();
    }
    if (upstream_request->upstreamHost()) {
      upstream_request->upstreamHost()->stats().rq_timeout_.inc();
    }
    // Outlier detection and abort accounting only apply while we were still
    // waiting for response headers from this try.
    if (upstream_request->awaitingHeaders()) {
      if (cluster_->timeoutBudgetStats().has_value()) {
        // Cancel firing per-try timeout information, because the per-try timeout did not come into
        // play when the global timeout was hit.
        upstream_request->recordTimeoutBudget(false);
      }
      // If this upstream request already hit a "soft" timeout, then it
      // already recorded a timeout into outlier detection. Don't do it again.
      if (!upstream_request->outlierDetectionTimeoutRecorded()) {
        updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, *upstream_request,
                               absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
      }
      chargeUpstreamAbort(timeout_response_code_, false, *upstream_request);
    }
    upstream_request->resetStream();
  }
  onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout,
                         StreamInfo::ResponseCodeDetails::get().ResponseTimeout);
}
// Called when the per try timeout is hit but we didn't reset the request
// (hedge_on_per_try_timeout enabled). Records the timeout into outlier
// detection and, if possible, launches a hedged retry alongside the still
// in-flight original attempt.
void Filter::onSoftPerTryTimeout(UpstreamRequest& upstream_request) {
  ASSERT(!upstream_request.retried());
  // Track this as a timeout for outlier detection purposes even though we didn't
  // cancel the request yet and might get a 2xx later.
  updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request,
                         absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
  upstream_request.outlierDetectionTimeoutRecorded(true);
  // Hedging is only possible while nothing has been sent downstream and a
  // retry policy is still active.
  if (!downstream_response_started_ && retry_state_) {
    RetryStatus retry_status = retry_state_->shouldHedgeRetryPerTryTimeout(
        [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_]() -> void {
          // Without any knowledge about what's going on in the connection pool, retry the request
          // with the safest settings which is no early data but keep using or not using alt-svc as
          // before. In this way, QUIC won't be falsely marked as broken.
          doRetry(/*can_send_early_data*/ false, can_use_http3, TimeoutRetry::Yes);
        });
    if (retry_status == RetryStatus::Yes) {
      runRetryOptionsPredicates(upstream_request);
      pending_retries_++;
      // Don't increment upstream_host->stats().rq_error_ here, we'll do that
      // later if 1) we hit global timeout or 2) we get bad response headers
      // back.
      upstream_request.retried(true);
      // TODO: cluster stat for hedge attempted.
    } else if (retry_status == RetryStatus::NoOverflow) {
      callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
    } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
      callbacks_->streamInfo().setResponseFlag(
          StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
    }
  }
}
13
// Per-try *idle* timeout handler: delegates to the common per-try timeout path
// with the idle-timeout counter and response-code detail.
void Filter::onPerTryIdleTimeout(UpstreamRequest& upstream_request) {
  onPerTryTimeoutCommon(upstream_request,
                        cluster_->trafficStats()->upstream_rq_per_try_idle_timeout_,
                        StreamInfo::ResponseCodeDetails::get().UpstreamPerTryIdleTimeout);
}
37
// Per-try timeout handler: delegates to the common per-try timeout path with
// the per-try-timeout counter and response-code detail.
void Filter::onPerTryTimeout(UpstreamRequest& upstream_request) {
  onPerTryTimeoutCommon(upstream_request, cluster_->trafficStats()->upstream_rq_per_try_timeout_,
                        StreamInfo::ResponseCodeDetails::get().UpstreamPerTryTimeout);
}
// Shared handling for per-try and per-try-idle timeouts. With hedging enabled
// the attempt is left running (soft timeout); otherwise the attempt is reset,
// counted, reported to outlier detection, and either retried or aborted.
void Filter::onPerTryTimeoutCommon(UpstreamRequest& upstream_request, Stats::Counter& error_counter,
                                   const std::string& response_code_details) {
  if (hedging_params_.hedge_on_per_try_timeout_) {
    onSoftPerTryTimeout(upstream_request);
    return;
  }
  error_counter.inc();
  if (upstream_request.upstreamHost()) {
    upstream_request.upstreamHost()->stats().rq_timeout_.inc();
  }
  upstream_request.resetStream();
  updateOutlierDetection(Upstream::Outlier::Result::LocalOriginTimeout, upstream_request,
                         absl::optional<uint64_t>(enumToInt(timeout_response_code_)));
  // A successful retry takes ownership of the request's fate; nothing more to do.
  if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::Yes)) {
    return;
  }
  chargeUpstreamAbort(timeout_response_code_, false, upstream_request);
  // Remove this upstream request from the list now that we're done with it.
  upstream_request.removeFromList(upstream_requests_);
  onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag::UpstreamRequestTimeout,
                         response_code_details);
}
66
// Fired when an upstream attempt hits the configured max stream duration.
// Resets the attempt, tries a retry, and on no retry replies downstream with a
// timeout-appropriate status (depends on whether the downstream request body
// was fully received).
void Filter::onStreamMaxDurationReached(UpstreamRequest& upstream_request) {
  upstream_request.resetStream();
  if (maybeRetryReset(Http::StreamResetReason::LocalReset, upstream_request, TimeoutRetry::No)) {
    return;
  }
  upstream_request.removeFromList(upstream_requests_);
  cleanup();
  callbacks_->streamInfo().setResponseFlag(
      StreamInfo::CoreResponseFlag::UpstreamMaxStreamDurationReached);
  // Grab the const ref to call the const method of StreamInfo.
  const auto& stream_info = callbacks_->streamInfo();
  const bool downstream_decode_complete =
      stream_info.downstreamTiming().has_value() &&
      stream_info.downstreamTiming().value().get().lastDownstreamRxByteReceived().has_value();
  // sendLocalReply may instead reset the stream if downstream_response_started_ is true.
  callbacks_->sendLocalReply(
      Http::Utility::maybeRequestTimeoutCode(downstream_decode_complete),
      "upstream max stream duration reached", modify_headers_, absl::nullopt,
      StreamInfo::ResponseCodeDetails::get().UpstreamMaxStreamDurationReached);
}
void Filter::updateOutlierDetection(Upstream::Outlier::Result result,
                                    UpstreamRequest& upstream_request,
2785
                                    absl::optional<uint64_t> code) {
2785
  if (upstream_request.upstreamHost()) {
2776
    upstream_request.upstreamHost()->outlierDetector().putResult(result, code);
2776
  }
2785
}
2734
// Accounts for an aborted upstream request. Once the downstream response has
// started, only a deferred gRPC "success" can still be flipped to an error;
// before that, the abort's response code is charged to cluster/host stats.
void Filter::chargeUpstreamAbort(Http::Code code, bool dropped, UpstreamRequest& upstream_request) {
  if (downstream_response_started_) {
    if (upstream_request.grpcRqSuccessDeferred()) {
      // NOTE(review): upstreamHost() is dereferenced without a null check here,
      // unlike other call sites — presumably a deferred gRPC success implies a
      // host was selected; confirm.
      upstream_request.upstreamHost()->stats().rq_error_.inc();
      stats_.rq_reset_after_downstream_response_started_.inc();
    }
  } else {
    Upstream::HostDescriptionOptConstRef upstream_host = upstream_request.upstreamHost();
    chargeUpstreamCode(code, upstream_host, dropped);
    // If we had non-5xx but still have been reset by backend or timeout before
    // starting response, we treat this as an error. We only get non-5xx when
    // timeout_response_code_ is used for code above, where this member can
    // assume values such as 204 (NoContent).
    if (upstream_host.has_value() && !Http::CodeUtility::is5xx(enumToInt(code))) {
      upstream_host->stats().rq_error_.inc();
    }
  }
}
// Finalizes a timeout abort: records how much of the timeout budget was
// consumed (when the cluster tracks it) and sends the configured timeout
// response code downstream via onUpstreamAbort().
void Filter::onUpstreamTimeoutAbort(StreamInfo::CoreResponseFlag response_flags,
                                    absl::string_view details) {
  Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats();
  if (tb_stats.has_value()) {
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
    std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
        dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
    tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue(
        FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_));
  }
  // Only the classic 504 gets a body; alternative timeout codes are sent bare.
  const absl::string_view body =
      timeout_response_code_ == Http::Code::GatewayTimeout ? "upstream request timeout" : "";
  onUpstreamAbort(timeout_response_code_, response_flags, body, false, details);
}
// Common abort path: sets the response flag, overrides the reply when the
// retry buffer overflowed, tears down timers/retry state, and sends a local
// reply (which resets the stream instead if the response already started).
void Filter::onUpstreamAbort(Http::Code code, StreamInfo::CoreResponseFlag response_flags,
                             absl::string_view body, bool dropped, absl::string_view details) {
  // If we have not yet sent anything downstream, send a response with an appropriate status code.
  // Otherwise just reset the ongoing response.
  callbacks_->streamInfo().setResponseFlag(response_flags);
  // Check if buffer overflow occurred and override error details accordingly
  if (request_buffer_overflowed_) {
    code = Http::Code::InsufficientStorage;
    body = "exceeded request buffer limit while retrying upstream";
    details = StreamInfo::ResponseCodeDetails::get().RequestPayloadExceededRetryBufferLimit;
  }
  // This will destroy any created retry timers.
  cleanup();
  // sendLocalReply may instead reset the stream if downstream_response_started_ is true.
  callbacks_->sendLocalReply(
      code, body,
      [dropped, this](Http::ResponseHeaderMap& headers) {
        // Mark circuit-breaker drops for callers, unless Envoy headers are suppressed.
        if (dropped && !config_->suppress_envoy_headers_) {
          headers.addReference(Http::Headers::get().EnvoyOverloaded,
                               Http::Headers::get().EnvoyOverloadedValues.True);
        }
        modify_headers_(headers);
      },
      absl::nullopt, details);
}
// Attempts to schedule a retry after an upstream reset. Returns true when a
// retry was accepted by the retry state machine (the failed attempt is then
// removed and defer-deleted); returns false when retrying is impossible or was
// declined, setting overflow/limit response flags as appropriate.
bool Filter::maybeRetryReset(Http::StreamResetReason reset_reason,
                             UpstreamRequest& upstream_request, TimeoutRetry is_timeout_retry) {
  // We don't retry if we already started the response, don't have a retry policy defined,
  // or if we've already retried this upstream request (currently only possible if a per
  // try timeout occurred and hedge_on_per_try_timeout is enabled).
  if (downstream_response_started_ || !retry_state_ || upstream_request.retried()) {
    return false;
  }
  // Determine whether the failed attempt went over HTTP/3, so the retry policy
  // can decide whether to avoid (or disable) HTTP/3 on the next attempt.
  RetryState::Http3Used was_using_http3 = RetryState::Http3Used::Unknown;
  if (upstream_request.hadUpstream()) {
    was_using_http3 = (upstream_request.streamInfo().protocol().has_value() &&
                       upstream_request.streamInfo().protocol().value() == Http::Protocol::Http3)
                          ? RetryState::Http3Used::Yes
                          : RetryState::Http3Used::No;
  }
  // If the current request in this router has sent data to the upstream, we consider the request
  // started.
  upstream_request_started_ |= upstream_request.streamInfo()
                                   .upstreamInfo()
                                   ->upstreamTiming()
                                   .first_upstream_tx_byte_sent_.has_value();
  const RetryStatus retry_status = retry_state_->shouldRetryReset(
      reset_reason, was_using_http3,
      [this, can_send_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_,
       can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
       is_timeout_retry](bool disable_http3) -> void {
        // This retry might be because of ConnectionFailure of 0-RTT handshake. In this case, though
        // the original request is retried with the same can_send_early_data setting, it will not be
        // sent as early data by the underlying connection pool grid.
        doRetry(can_send_early_data, disable_http3 ? false : can_use_http3, is_timeout_retry);
      },
      upstream_request_started_);
  if (retry_status == RetryStatus::Yes) {
    runRetryOptionsPredicates(upstream_request);
    pending_retries_++;
    if (upstream_request.upstreamHost()) {
      upstream_request.upstreamHost()->stats().rq_error_.inc();
    }
    // The failed attempt is done; remove it and defer-delete so the retry owns
    // the request lifecycle from here.
    auto request_ptr = upstream_request.removeFromList(upstream_requests_);
    callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
    return true;
  } else if (retry_status == RetryStatus::NoOverflow) {
    callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
  } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
    callbacks_->streamInfo().setResponseFlag(
        StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
  }
  return false;
}
// Handles a reset of an upstream attempt: feeds outlier detection (except for
// overflow resets), tries a retry, and otherwise charges the abort and sends
// an error downstream — unless other in-flight attempts or pending retries may
// still produce a response.
void Filter::onUpstreamReset(Http::StreamResetReason reset_reason,
                             absl::string_view transport_failure_reason,
                             UpstreamRequest& upstream_request) {
  ENVOY_STREAM_LOG(debug, "upstream reset: reset reason: {}, transport failure reason: {}",
                   *callbacks_, Http::Utility::resetReasonToString(reset_reason),
                   transport_failure_reason);
  const bool dropped = reset_reason == Http::StreamResetReason::Overflow;
  // Ignore upstream reset caused by a resource overflow.
  // Currently, circuit breakers can only produce this reset reason.
  // It means that this reason is cluster-wise, not upstream-related.
  // Therefore removing an upstream in the case of an overloaded cluster
  // would make the situation even worse.
  // https://github.com/envoyproxy/envoy/issues/25487
  if (!dropped) {
    // TODO: The reset may also come from upstream over the wire. In this case it should be
    // treated as external origin error and distinguished from local origin error.
    // This matters only when running OutlierDetection with split_external_local_origin_errors
    // config param set to true.
    updateOutlierDetection(Upstream::Outlier::Result::LocalOriginConnectFailed, upstream_request,
                           absl::nullopt);
  }
  if (maybeRetryReset(reset_reason, upstream_request, TimeoutRetry::No)) {
    return;
  }
  // Protocol errors map to 502, everything else to 503.
  const Http::Code error_code = (reset_reason == Http::StreamResetReason::ProtocolError)
                                    ? Http::Code::BadGateway
                                    : Http::Code::ServiceUnavailable;
  chargeUpstreamAbort(error_code, dropped, upstream_request);
  auto request_ptr = upstream_request.removeFromList(upstream_requests_);
  callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
  // If there are other in-flight requests that might see an upstream response,
  // don't return anything downstream.
  if (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0) {
    return;
  }
  const StreamInfo::CoreResponseFlag response_flags = streamResetReasonToResponseFlag(reset_reason);
  const std::string body =
      absl::StrCat("upstream connect error or disconnect/reset before headers. ",
                   (is_retry_ ? "retried and the latest " : ""),
                   "reset reason: ", Http::Utility::resetReasonToString(reset_reason),
                   !transport_failure_reason.empty() ? ", transport failure reason: " : "",
                   transport_failure_reason);
  const std::string& basic_details =
      downstream_response_started_ ? StreamInfo::ResponseCodeDetails::get().LateUpstreamReset
                                   : StreamInfo::ResponseCodeDetails::get().EarlyUpstreamReset;
  const std::string details = StringUtil::replaceAllEmptySpace(absl::StrCat(
      basic_details, "{", Http::Utility::resetReasonToString(reset_reason),
      transport_failure_reason.empty() ? "" : absl::StrCat("|", transport_failure_reason), "}"));
  onUpstreamAbort(error_code, response_flags, body, dropped, details);
}
// Notification that the connection pool picked (or failed to pick) an upstream
// host. Informs the retry state of the attempted host; on pool success also
// records the host for access logging and bumps vcluster/route request totals.
void Filter::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
                                    bool pool_success) {
  // The retry policy tracks attempted hosts even when the pool ultimately fails.
  if (retry_state_ && host) {
    retry_state_->onHostAttempted(host);
  }
  if (!pool_success) {
    return;
  }
  // Track the attempted host in upstream info for access logging purposes.
  if (host && callbacks_->streamInfo().upstreamInfo()) {
    callbacks_->streamInfo().upstreamInfo()->addUpstreamHostAttempted(host);
  }
  if (request_vcluster_) {
    // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
    // callback. Hence, the upstream request increases the virtual cluster's upstream_rq_total_ stat
    // here.
    request_vcluster_->stats().upstream_rq_total_.inc();
  }
  if (route_stats_context_.has_value()) {
    // The cluster increases its upstream_rq_total_ counter right before firing this onPoolReady
    // callback. Hence, the upstream request increases the route level upstream_rq_total_ stat
    // here.
    route_stats_context_->stats().upstream_rq_total_.inc();
  }
}
// Maps an upstream stream reset reason onto the response flag recorded in
// stream info (and surfaced in access logs). Every enum value is handled;
// a value outside the enum is a corrupt-enum panic.
StreamInfo::CoreResponseFlag
Filter::streamResetReasonToResponseFlag(Http::StreamResetReason reset_reason) {
  switch (reset_reason) {
  case Http::StreamResetReason::LocalConnectionFailure:
  case Http::StreamResetReason::RemoteConnectionFailure:
  case Http::StreamResetReason::ConnectionTimeout:
    return StreamInfo::CoreResponseFlag::UpstreamConnectionFailure;
  case Http::StreamResetReason::ConnectionTermination:
    return StreamInfo::CoreResponseFlag::UpstreamConnectionTermination;
  case Http::StreamResetReason::LocalReset:
  case Http::StreamResetReason::LocalRefusedStreamReset:
  case Http::StreamResetReason::Http1PrematureUpstreamHalfClose:
    return StreamInfo::CoreResponseFlag::LocalReset;
  case Http::StreamResetReason::Overflow:
    return StreamInfo::CoreResponseFlag::UpstreamOverflow;
  case Http::StreamResetReason::RemoteReset:
  case Http::StreamResetReason::RemoteRefusedStreamReset:
  case Http::StreamResetReason::ConnectError:
    return StreamInfo::CoreResponseFlag::UpstreamRemoteReset;
  case Http::StreamResetReason::ProtocolError:
    return StreamInfo::CoreResponseFlag::UpstreamProtocolError;
  case Http::StreamResetReason::OverloadManager:
    return StreamInfo::CoreResponseFlag::OverloadManager;
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
// Records per-host success/error stats for a non-5xx response. For gRPC,
// success can only be judged once grpc-status is known: headers-only responses
// are classified immediately, otherwise classification is deferred until
// trailers arrive.
void Filter::handleNon5xxResponseHeaders(absl::optional<Grpc::Status::GrpcStatus> grpc_status,
                                         UpstreamRequest& upstream_request, bool end_stream,
                                         uint64_t grpc_to_http_status) {
  // We need to defer gRPC success until after we have processed grpc-status in
  // the trailers.
  // NOTE(review): upstreamHost() is dereferenced without a null check in this
  // function — presumably a response implies a host was selected; confirm.
  if (grpc_request_) {
    if (end_stream) {
      if (grpc_status && !Http::CodeUtility::is5xx(grpc_to_http_status)) {
        upstream_request.upstreamHost()->stats().rq_success_.inc();
      } else {
        upstream_request.upstreamHost()->stats().rq_error_.inc();
      }
    } else {
      upstream_request.grpcRqSuccessDeferred(true);
    }
  } else {
    upstream_request.upstreamHost()->stats().rq_success_.inc();
  }
}
// Handles an upstream 1xx (informational) response: charges the code, resets
// any hedged sibling attempts, disables further retries, and forwards the
// 1xx headers downstream.
void Filter::onUpstream1xxHeaders(Http::ResponseHeaderMapPtr&& headers,
                                  UpstreamRequest& upstream_request) {
  const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
  chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false);
  ENVOY_STREAM_LOG(debug, "upstream 1xx ({}).", *callbacks_, response_code);
  downstream_response_started_ = true;
  final_upstream_request_ = &upstream_request;
  resetOtherUpstreams(upstream_request);
  // Don't send retries after 100-Continue has been sent on. Arguably we could attempt to do a
  // retry, assume the next upstream would also send an 100-Continue and swallow the second one
  // but it's sketchy (as the subsequent upstream might not send a 100-Continue) and not worth
  // the complexity until someone asks for it.
  retry_state_.reset();
  callbacks_->encode1xxHeaders(std::move(headers));
}
102793
void Filter::resetAll() {
109595
  while (!upstream_requests_.empty()) {
6802
    auto request_ptr = upstream_requests_.back()->removeFromList(upstream_requests_);
6802
    request_ptr->resetStream();
6802
    callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
6802
  }
102793
}
39595
// Resets every in-flight upstream request except the given one (used when one
// hedged attempt wins). The survivor is temporarily popped off the list while
// the others are reset, then reinserted.
void Filter::resetOtherUpstreams(UpstreamRequest& upstream_request) {
  // Pop each upstream request on the list and reset it if it's not the one
  // provided. At the end we'll move it back into the list.
  UpstreamRequestPtr final_upstream_request;
  while (!upstream_requests_.empty()) {
    UpstreamRequestPtr upstream_request_tmp =
        upstream_requests_.back()->removeFromList(upstream_requests_);
    if (upstream_request_tmp.get() != &upstream_request) {
      upstream_request_tmp->resetStream();
      // TODO: per-host stat for hedge abandoned.
      // TODO: cluster stat for hedge abandoned.
    } else {
      final_upstream_request = std::move(upstream_request_tmp);
    }
  }
  // The surviving request must have been on the list.
  ASSERT(final_upstream_request);
  // Now put the final request back on this list.
  LinkedList::moveIntoList(std::move(final_upstream_request), upstream_requests_);
}
void Filter::onUpstreamHeaders(uint64_t response_code, Http::ResponseHeaderMapPtr&& headers,
39782
                               UpstreamRequest& upstream_request, bool end_stream) {
39782
  ENVOY_STREAM_LOG(debug, "upstream headers complete: end_stream={}", *callbacks_, end_stream);
39782
  ASSERT(!host_selection_cancelable_);
  // When grpc-status appears in response headers, convert grpc-status to HTTP status code
  // for outlier detection. This does not currently change any stats or logging and does not
  // handle the case when an error grpc-status is sent as a trailer.
39782
  absl::optional<Grpc::Status::GrpcStatus> grpc_status;
39782
  uint64_t grpc_to_http_status = 0;
39782
  uint64_t response_code_for_outlier_detection = response_code;
39782
  if (grpc_request_) {
2190
    grpc_status = Grpc::Common::getGrpcStatus(*headers);
2190
    if (grpc_status.has_value()) {
135
      grpc_to_http_status = Grpc::Utility::grpcToHttpStatus(grpc_status.value());
135
      response_code_for_outlier_detection = grpc_to_http_status;
135
    }
38270
  } else {
    // Check cluster's http_protocol_options if different code should be reported to
    // outlier detector.
37592
    absl::optional<bool> matched = cluster_->processHttpForOutlierDetection(*headers);
37592
    if (matched.has_value()) {
      // Outlier detector distinguishes only two values:
      // Anything >= 500 is error.
      // Anything < 500 is success.
28
      if (!matched.value()) {
        // Matcher returned non-match.
        // report success to outlier detector.
2
        response_code_for_outlier_detection = 200;
26
      } else {
        // Matcher returned match (treat the response as error).
        // If the original status code was error (>= 500), then report the
        // original status code to the outlier detector.
26
        if (response_code < 500) {
13
          response_code_for_outlier_detection = 500;
13
        }
26
      }
28
    }
37592
  }
39782
  maybeProcessOrcaLoadReport(*headers, upstream_request);
  // Check for degraded header
39782
  const bool is_degraded = headers->EnvoyDegraded() != nullptr;
  // Ejection has priority over degradation: 5xx errors always trigger ejection logic.
39782
  if (response_code_for_outlier_detection >= 500) {
487
    upstream_request.upstreamHost()->outlierDetector().putResult(
487
        Upstream::Outlier::Result::ExtOriginRequestFailed, response_code_for_outlier_detection);
39394
  } else if (is_degraded) {
    // Only mark as degraded if response is successful (not 5xx).
2
    upstream_request.upstreamHost()->outlierDetector().putResult(
2
        Upstream::Outlier::Result::ExtOriginRequestDegraded, response_code_for_outlier_detection);
39293
  } else {
39293
    upstream_request.upstreamHost()->outlierDetector().putResult(
39293
        Upstream::Outlier::Result::ExtOriginRequestSuccess, response_code_for_outlier_detection);
39293
  }
39782
  if (headers->EnvoyImmediateHealthCheckFail() != nullptr) {
1
    upstream_request.upstreamHost()->healthChecker().setUnhealthy(
1
        Upstream::HealthCheckHostMonitor::UnhealthyType::ImmediateHealthCheckFail);
1
  }
39782
  bool could_not_retry = false;
  // Check if this upstream request was already retried, for instance after
  // hitting a per try timeout. Don't retry it if we already have.
39782
  if (retry_state_) {
3019
    if (upstream_request.retried()) {
      // We already retried this request (presumably for a per try timeout) so
      // we definitely won't retry it again. Check if we would have retried it
      // if we could.
11
      bool retry_as_early_data; // Not going to be used as we are not retrying.
11
      could_not_retry = retry_state_->wouldRetryFromHeaders(*headers, *downstream_headers_,
11
                                                            retry_as_early_data) !=
11
                        RetryState::RetryDecision::NoRetry;
3008
    } else {
3008
      const RetryStatus retry_status = retry_state_->shouldRetryHeaders(
3008
          *headers, *downstream_headers_,
3008
          [this, can_use_http3 = upstream_request.upstreamStreamOptions().can_use_http3_,
3008
           had_early_data = upstream_request.upstreamStreamOptions().can_send_early_data_](
3008
              bool disable_early_data) -> void {
161
            doRetry((disable_early_data ? false : had_early_data), can_use_http3, TimeoutRetry::No);
161
          });
3008
      if (retry_status == RetryStatus::Yes) {
174
        runRetryOptionsPredicates(upstream_request);
174
        pending_retries_++;
174
        upstream_request.upstreamHost()->stats().rq_error_.inc();
174
        Http::CodeStats& code_stats = httpContext().codeStats();
174
        code_stats.chargeBasicResponseStat(cluster_->statsScope(), stats_.stat_names_.retry_,
174
                                           static_cast<Http::Code>(response_code),
174
                                           exclude_http_code_stats_);
174
        if (!end_stream || !upstream_request.encodeComplete()) {
130
          upstream_request.resetStream();
130
        }
174
        auto request_ptr = upstream_request.removeFromList(upstream_requests_);
174
        callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
174
        return;
2839
      } else if (retry_status == RetryStatus::NoOverflow) {
6
        callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::UpstreamOverflow);
6
        could_not_retry = true;
2828
      } else if (retry_status == RetryStatus::NoRetryLimitExceeded) {
3
        callbacks_->streamInfo().setResponseFlag(
3
            StreamInfo::CoreResponseFlag::UpstreamRetryLimitExceeded);
3
        could_not_retry = true;
3
      }
3008
    }
3019
  }
39608
  if (route_entry_->internalRedirectPolicy().enabled() &&
39608
      route_entry_->internalRedirectPolicy().shouldRedirectForResponseCode(
216
          static_cast<Http::Code>(response_code)) &&
39608
      setupRedirect(*headers)) {
159
    return;
    // If the redirect could not be handled, fail open and let it pass to the
    // next downstream.
159
  }
  // Check if we got a "bad" response, but there are still upstream requests in
  // flight awaiting headers or scheduled retries. If so, exit to give them a
  // chance to return before returning a response downstream.
39449
  if (could_not_retry && (numRequestsAwaitingHeaders() > 0 || pending_retries_ > 0)) {
4
    upstream_request.upstreamHost()->stats().rq_error_.inc();
    // Reset the stream because there are other in-flight requests that we'll
    // wait around for and we're not interested in consuming any body/trailers.
4
    auto request_ptr = upstream_request.removeFromList(upstream_requests_);
4
    request_ptr->resetStream();
4
    callbacks_->dispatcher().deferredDelete(std::move(request_ptr));
4
    return;
4
  }
  // Make sure any retry timers are destroyed since we may not call cleanup() if end_stream is
  // false.
39445
  if (retry_state_) {
2833
    retry_state_.reset();
2833
  }
  // Only send upstream service time if we received the complete request and this is not a
  // premature response.
39445
  if (DateUtil::timePointValid(downstream_request_complete_time_)) {
36132
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
36132
    MonotonicTime response_received_time = dispatcher.timeSource().monotonicTime();
36132
    std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>(
36132
        response_received_time - downstream_request_complete_time_);
36132
    if (!config_->suppress_envoy_headers_) {
36106
      headers->setEnvoyUpstreamServiceTime(ms.count());
36106
    }
36132
  }
39445
  upstream_request.upstreamCanary(
39445
      (headers->EnvoyUpstreamCanary() && headers->EnvoyUpstreamCanary()->value() == "true") ||
39445
      upstream_request.upstreamHost()->canary());
39445
  chargeUpstreamCode(response_code, *headers, upstream_request.upstreamHost(), false);
39445
  if (!Http::CodeUtility::is5xx(response_code)) {
39145
    handleNon5xxResponseHeaders(grpc_status, upstream_request, end_stream, grpc_to_http_status);
39145
  }
  // Append routing cookies
39445
  for (const auto& header_value : downstream_set_cookies_) {
454
    headers->addReferenceKey(Http::Headers::get().SetCookie, header_value);
454
  }
39445
  callbacks_->streamInfo().setResponseCodeDetails(
39445
      StreamInfo::ResponseCodeDetails::get().ViaUpstream);
39445
  callbacks_->streamInfo().setResponseCode(response_code);
39445
  downstream_response_started_ = true;
39445
  final_upstream_request_ = &upstream_request;
  // Make sure that for request hedging, we end up with the correct final upstream info.
39445
  callbacks_->streamInfo().setUpstreamInfo(final_upstream_request_->streamInfo().upstreamInfo());
39445
  resetOtherUpstreams(upstream_request);
  // Modify response headers after we have set the final upstream info because we may need to
  // modify the headers based on the upstream host.
39445
  const Formatter::Context formatter_context(downstream_headers_, headers.get(), {}, {}, {},
39445
                                             &callbacks_->activeSpan());
39445
  route_entry_->finalizeResponseHeaders(*headers, formatter_context, callbacks_->streamInfo());
39445
  modify_headers_(*headers);
39445
  if (end_stream) {
24104
    onUpstreamComplete(upstream_request);
24104
  }
39445
  callbacks_->encodeHeaders(std::move(headers), end_stream,
39445
                            StreamInfo::ResponseCodeDetails::get().ViaUpstream);
39445
}
void Filter::onUpstreamData(Buffer::Instance& data, UpstreamRequest& upstream_request,
173217
                            bool end_stream) {
  // When route retry policy is configured and an upstream filter is returning StopIteration
  // in it's encodeHeaders() method, upstream_requests_.size() is equal to 0 in this case,
  // and we should just return.
173217
  if (upstream_requests_.size() == 0) {
3
    return;
3
  }
  // Other than above case, this should be true because when we saw headers we
  // either reset the stream (hence wouldn't have made it to onUpstreamData) or
  // all other in-flight streams.
173214
  ASSERT(upstream_requests_.size() == 1);
173214
  if (end_stream) {
    // gRPC request termination without trailers is an error.
12363
    if (upstream_request.grpcRqSuccessDeferred()) {
52
      upstream_request.upstreamHost()->stats().rq_error_.inc();
52
    }
12363
    onUpstreamComplete(upstream_request);
12363
  }
173214
  callbacks_->encodeData(data, end_stream);
173214
}
void Filter::onUpstreamTrailers(Http::ResponseTrailerMapPtr&& trailers,
766
                                UpstreamRequest& upstream_request) {
  // When route retry policy is configured and an upstream filter is returning StopIteration
  // in it's encodeHeaders() method, upstream_requests_.size() is equal to 0 in this case,
  // and we should just return.
766
  if (upstream_requests_.size() == 0) {
3
    return;
3
  }
  // Other than above case, this should be true because when we saw headers we
  // either reset the stream (hence wouldn't have made it to onUpstreamTrailers) or
  // all other in-flight streams.
763
  ASSERT(upstream_requests_.size() == 1);
763
  if (upstream_request.grpcRqSuccessDeferred()) {
410
    absl::optional<Grpc::Status::GrpcStatus> grpc_status = Grpc::Common::getGrpcStatus(*trailers);
410
    if (grpc_status &&
410
        !Http::CodeUtility::is5xx(Grpc::Utility::grpcToHttpStatus(grpc_status.value()))) {
352
      upstream_request.upstreamHost()->stats().rq_success_.inc();
399
    } else {
58
      upstream_request.upstreamHost()->stats().rq_error_.inc();
58
    }
410
  }
763
  maybeProcessOrcaLoadReport(*trailers, upstream_request);
763
  onUpstreamComplete(upstream_request);
763
  callbacks_->encodeTrailers(std::move(trailers));
763
}
// Forwards an upstream metadata frame straight through to the downstream codec unchanged.
void Filter::onUpstreamMetadata(Http::MetadataMapPtr&& metadata_map) {
  callbacks_->encodeMetadata(std::move(metadata_map));
}
// Finalizes a completed upstream request: handles a still-incomplete downstream stream,
// records timeout-budget and response-timing stats, then defer-deletes the upstream
// request and runs cleanup().
void Filter::onUpstreamComplete(UpstreamRequest& upstream_request) {
  if (!downstream_end_stream_) {
    if (allow_multiplexed_upstream_half_close_) {
      // Continue request if downstream is not done yet.
      return;
    }
    // Half-close not allowed: the upstream finished before the downstream request did,
    // so reset the upstream stream.
    upstream_request.resetStream();
  }
  Event::Dispatcher& dispatcher = callbacks_->dispatcher();
  std::chrono::milliseconds response_time = std::chrono::duration_cast<std::chrono::milliseconds>(
      dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
  // Record what percentage of the configured global timeout this response consumed.
  Upstream::ClusterTimeoutBudgetStatsOptRef tb_stats = cluster()->timeoutBudgetStats();
  if (tb_stats.has_value()) {
    tb_stats->get().upstream_rq_timeout_budget_percent_used_.recordValue(
        FilterUtility::percentageOfTimeout(response_time, timeout_.global_timeout_));
  }
  // Only emit dynamic timing stats for non-health-check traffic, and only when the
  // downstream request actually completed (otherwise response_time is meaningless).
  if (config_->emit_dynamic_stats_ && !callbacks_->streamInfo().healthCheck() &&
      DateUtil::timePointValid(downstream_request_complete_time_)) {
    upstream_request.upstreamHost()->outlierDetector().putResponseTime(response_time);
    const bool internal_request = Http::HeaderUtility::isEnvoyInternalRequest(*downstream_headers_);
    Http::CodeStats& code_stats = httpContext().codeStats();
    Http::CodeStats::ResponseTimingInfo info{
        config_->scope_,
        cluster_->statsScope(),
        config_->empty_stat_name_,
        response_time,
        upstream_request.upstreamCanary(),
        internal_request,
        route_->virtualHost()->statName(),
        request_vcluster_ ? request_vcluster_->statName() : config_->empty_stat_name_,
        route_stats_context_.has_value() ? route_stats_context_->statName()
                                         : config_->empty_stat_name_,
        config_->zone_name_,
        upstreamZone(upstream_request.upstreamHost())};
    code_stats.chargeResponseTiming(info);
    // If an alternate stat prefix is configured, charge the same timing under it as well.
    if (alt_stat_prefix_ != nullptr) {
      Http::CodeStats::ResponseTimingInfo info{config_->scope_,
                                               cluster_->statsScope(),
                                               alt_stat_prefix_->statName(),
                                               response_time,
                                               upstream_request.upstreamCanary(),
                                               internal_request,
                                               config_->empty_stat_name_,
                                               config_->empty_stat_name_,
                                               config_->empty_stat_name_,
                                               config_->zone_name_,
                                               upstreamZone(upstream_request.upstreamHost())};
      code_stats.chargeResponseTiming(info);
    }
  }
  // Defer deletion as this is generally called under the stack of the upstream
  // request, and immediate deletion is dangerous.
  callbacks_->dispatcher().deferredDelete(upstream_request.removeFromList(upstream_requests_));
  cleanup();
}
// Attempts to turn a redirect response from upstream into an internal redirect, i.e. a
// recreated downstream stream targeting the Location URL. Returns true on success; on
// failure the caller fails open and passes the redirect response through downstream.
bool Filter::setupRedirect(const Http::ResponseHeaderMap& headers) {
  ENVOY_STREAM_LOG(debug, "attempting internal redirect", *callbacks_);
  const Http::HeaderEntry* location = headers.Location();
  const uint64_t status_code = Http::Utility::getResponseStatus(headers);
  // Redirects are not supported for streaming requests yet.
  // NOTE: the && chain below is order-dependent: the cheap checks (complete downstream
  // request, no buffered-body overflow unless there is no decoding buffer, Location
  // present) run before convertRequestHeadersForInternalRedirect, which mutates
  // downstream_headers_, and recreateStream runs last.
  if (downstream_end_stream_ && (!request_buffer_overflowed_ || !callbacks_->decodingBuffer()) &&
      location != nullptr &&
      convertRequestHeadersForInternalRedirect(*downstream_headers_, headers, *location,
                                               status_code) &&
      callbacks_->recreateStream(&headers)) {
    ENVOY_STREAM_LOG(debug, "Internal redirect succeeded", *callbacks_);
    cluster_->trafficStats()->upstream_internal_redirect_succeeded_total_.inc();
    return true;
  }
  // convertRequestHeadersForInternalRedirect logs failure reasons but log
  // details for other failure modes here.
  if (!downstream_end_stream_) {
    ENVOY_STREAM_LOG(debug, "Internal redirect failed: request incomplete", *callbacks_);
  } else if (request_buffer_overflowed_) {
    ENVOY_STREAM_LOG(debug, "Internal redirect failed: request body overflow", *callbacks_);
  } else if (location == nullptr) {
    ENVOY_STREAM_LOG(debug, "Internal redirect failed: missing location header", *callbacks_);
  }
  cluster_->trafficStats()->upstream_internal_redirect_failed_total_.inc();
  return false;
}
// Rewrites the downstream request headers in place so the stream can be recreated against
// the redirect target. Validates the Location URL, enforces the route's internal redirect
// policy (scheme, max redirects, predicates), and preserves the original URL in
// x-envoy-original-url. Returns false (restoring the original headers via the Cleanup
// guard if they were already mutated) when the redirect cannot be performed.
bool Filter::convertRequestHeadersForInternalRedirect(
    Http::RequestHeaderMap& downstream_headers, const Http::ResponseHeaderMap& upstream_headers,
    const Http::HeaderEntry& internal_redirect, uint64_t status_code) {
  if (!downstream_headers.Path()) {
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: no path in downstream_headers", *callbacks_);
    return false;
  }
  absl::string_view redirect_url = internal_redirect.value().getStringView();
  // Make sure the redirect response contains a URL to redirect to.
  if (redirect_url.empty()) {
    stats_.passthrough_internal_redirect_bad_location_.inc();
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: empty location", *callbacks_);
    return false;
  }
  Http::Utility::Url absolute_url;
  if (!absolute_url.initialize(redirect_url, false)) {
    stats_.passthrough_internal_redirect_bad_location_.inc();
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: invalid location {}", *callbacks_,
                     redirect_url);
    return false;
  }
  const auto& policy = route_entry_->internalRedirectPolicy();
  // Don't change the scheme from the original request
  const bool scheme_is_http = schemeIsHttp(downstream_headers, callbacks_->connection());
  const bool target_is_http = Http::Utility::schemeIsHttp(absolute_url.scheme());
  if (!policy.isCrossSchemeRedirectAllowed() && scheme_is_http != target_is_http) {
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: incorrect scheme for {}", *callbacks_,
                     redirect_url);
    stats_.passthrough_internal_redirect_unsafe_scheme_.inc();
    return false;
  }
  const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
  // Make sure that performing the redirect won't result in exceeding the configured number of
  // redirects allowed for this route. The counter lives in request-lifespan filter state so
  // it survives stream recreation; create it on first redirect.
  StreamInfo::UInt32Accessor* num_internal_redirect{};
  if (num_internal_redirect = filter_state->getDataMutable<StreamInfo::UInt32Accessor>(
          NumInternalRedirectsFilterStateName);
      num_internal_redirect == nullptr) {
    auto state = std::make_shared<StreamInfo::UInt32AccessorImpl>(0);
    num_internal_redirect = state.get();
    filter_state->setData(NumInternalRedirectsFilterStateName, std::move(state),
                          StreamInfo::FilterState::StateType::Mutable,
                          StreamInfo::FilterState::LifeSpan::Request);
  }
  if (num_internal_redirect->value() >= policy.maxInternalRedirects()) {
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: redirect limits exceeded.", *callbacks_);
    stats_.passthrough_internal_redirect_too_many_redirects_.inc();
    return false;
  }
  // Copy the old values, so they can be restored if the redirect fails.
  const bool scheme_is_set = (downstream_headers.Scheme() != nullptr);
  std::unique_ptr<Http::RequestHeaderMapImpl> saved_headers = Http::RequestHeaderMapImpl::create();
  Http::RequestHeaderMapImpl::copyFrom(*saved_headers, downstream_headers);
  // Mirror the policy's configured response headers from the redirect response onto the
  // downstream request: absent-in-response clears, present-in-response replaces.
  for (const Http::LowerCaseString& header :
       route_entry_->internalRedirectPolicy().responseHeadersToCopy()) {
    Http::HeaderMap::GetResult result = upstream_headers.get(header);
    Http::HeaderMap::GetResult downstream_result = downstream_headers.get(header);
    if (result.empty()) {
      // Clear headers if present, else do nothing:
      if (downstream_result.empty()) {
        continue;
      }
      downstream_headers.remove(header);
    } else {
      // The header exists in the response, copy into the downstream headers
      if (!downstream_result.empty()) {
        downstream_headers.remove(header);
      }
      for (size_t idx = 0; idx < result.size(); idx++) {
        downstream_headers.addCopy(header, result[idx]->value().getStringView());
      }
    }
  }
  // RAII rollback: if we return false after this point, restore the saved headers
  // (re-seeding :scheme first since copyFrom needs a cleared map).
  Cleanup restore_original_headers(
      [&downstream_headers, scheme_is_set, scheme_is_http, &saved_headers]() {
        downstream_headers.clear();
        if (scheme_is_set) {
          downstream_headers.setScheme(scheme_is_http ? Http::Headers::get().SchemeValues.Http
                                                      : Http::Headers::get().SchemeValues.Https);
        }
        Http::RequestHeaderMapImpl::copyFrom(downstream_headers, *saved_headers);
      });
  // Replace the original host, scheme and path.
  downstream_headers.setScheme(absolute_url.scheme());
  downstream_headers.setHost(absolute_url.hostAndPort());
  auto path_and_query = absolute_url.pathAndQueryParams();
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http_reject_path_with_fragment")) {
    // Envoy treats internal redirect as a new request and will reject it if URI path
    // contains #fragment. However the Location header is allowed to have #fragment in URI path. To
    // prevent Envoy from rejecting internal redirect, strip the #fragment from Location URI if it
    // is present.
    auto fragment_pos = path_and_query.find('#');
    path_and_query = path_and_query.substr(0, fragment_pos);
  }
  downstream_headers.setPath(path_and_query);
  // Only clear the route cache if there are downstream callbacks. There aren't, for example,
  // for async connections.
  if (callbacks_->downstreamCallbacks()) {
    callbacks_->downstreamCallbacks()->clearRouteCache();
  }
  // Re-resolve the route against the rewritten headers.
  const auto route = callbacks_->route();
  // Don't allow a redirect to a non existing route.
  if (!route) {
    stats_.passthrough_internal_redirect_no_route_.inc();
    ENVOY_STREAM_LOG(trace, "Internal redirect failed: no route found", *callbacks_);
    return false;
  }
  const auto& route_name = route->routeName();
  for (const auto& predicate : policy.predicates()) {
    if (!predicate->acceptTargetRoute(*filter_state, route_name, !scheme_is_http,
                                      !target_is_http)) {
      stats_.passthrough_internal_redirect_predicate_.inc();
      ENVOY_STREAM_LOG(trace,
                       "Internal redirect failed: rejecting redirect targeting {}, by {} predicate",
                       *callbacks_, route_name, predicate->name());
      return false;
    }
  }
  // See https://tools.ietf.org/html/rfc7231#section-6.4.4.
  // A 303 See Other converts non-GET/HEAD requests to a bodiless GET.
  if (status_code == enumToInt(Http::Code::SeeOther) &&
      downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Get &&
      downstream_headers.getMethodValue() != Http::Headers::get().MethodValues.Head) {
    downstream_headers.setMethod(Http::Headers::get().MethodValues.Get);
    downstream_headers.remove(Http::Headers::get().ContentLength);
    callbacks_->modifyDecodingBuffer([](Buffer::Instance& data) { data.drain(data.length()); });
  }
  num_internal_redirect->increment();
  restore_original_headers.cancel();
  // Preserve the original request URL for the second pass.
  downstream_headers.setEnvoyOriginalUrl(
      absl::StrCat(scheme_is_http ? Http::Headers::get().SchemeValues.Http
                                  : Http::Headers::get().SchemeValues.Https,
                   "://", saved_headers->getHostValue(), saved_headers->getPathValue()));
  return true;
}
279
void Filter::runRetryOptionsPredicates(UpstreamRequest& retriable_request) {
279
  for (const auto& options_predicate : getEffectiveRetryPolicy()->retryOptionsPredicates()) {
5
    const Upstream::RetryOptionsPredicate::UpdateOptionsParameters parameters{
5
        retriable_request.streamInfo(), upstreamSocketOptions()};
5
    auto ret = options_predicate->updateOptions(parameters);
5
    if (ret.new_upstream_socket_options_.has_value()) {
3
      upstream_options_ = ret.new_upstream_socket_options_.value();
3
    }
5
  }
279
}
247
void Filter::doRetry(bool can_send_early_data, bool can_use_http3, TimeoutRetry is_timeout_retry) {
247
  ENVOY_STREAM_LOG(debug, "performing retry", *callbacks_);
247
  is_retry_ = true;
247
  attempt_count_++;
247
  callbacks_->streamInfo().setAttemptCount(attempt_count_);
247
  ASSERT(pending_retries_ > 0);
247
  pending_retries_--;
247
  if (host_selection_cancelable_) {
    // If there was a timeout during host selection, cancel this selection
    // attempt before potentially creating a new one.
    host_selection_cancelable_->cancel();
    host_selection_cancelable_.reset();
  }
  // Clusters can technically get removed by CDS during a retry. Make sure it still exists.
247
  const auto cluster = config_->cm_.getThreadLocalCluster(route_entry_->clusterName());
247
  std::unique_ptr<GenericConnPool> generic_conn_pool;
247
  if (cluster == nullptr) {
    sendNoHealthyUpstreamResponse({});
    cleanup();
    return;
  }
247
  callbacks_->streamInfo().downstreamTiming().setValue(
247
      "envoy.router.host_selection_start_ms",
247
      callbacks_->dispatcher().timeSource().monotonicTime());
247
  auto host_selection_response = cluster->chooseHost(this);
247
  if (!host_selection_response.cancelable ||
247
      !Runtime::runtimeFeatureEnabled("envoy.reloadable_features.async_host_selection")) {
122
    if (host_selection_response.cancelable) {
      host_selection_response.cancelable->cancel();
    }
    // This branch handles the common case of synchronous host selection, as
    // well as handling unsupported asynchronous host selection (by treating it
    // as host selection failure).
122
    continueDoRetry(can_send_early_data, can_use_http3, is_timeout_retry,
122
                    std::move(host_selection_response.host), *cluster,
122
                    host_selection_response.details);
122
  }
247
  ENVOY_STREAM_LOG(debug, "Handling asynchronous host selection for retry\n", *callbacks_);
  // Again latch the cancel handle, and set up the callback to be called when host
  // selection is complete.
247
  host_selection_cancelable_ = std::move(host_selection_response.cancelable);
247
  on_host_selected_ =
247
      ([this, can_send_early_data, can_use_http3, is_timeout_retry, cluster](
247
           Upstream::HostConstSharedPtr&& host, absl::string_view host_selection_details) -> void {
125
        continueDoRetry(can_send_early_data, can_use_http3, is_timeout_retry, std::move(host),
125
                        *cluster, host_selection_details);
125
      });
247
}
// Second half of a retry, run once a host has been selected (synchronously or via the
// async host-selection callback). Creates the new connection pool and upstream request,
// refreshes attempt/timeout headers, then replays buffered request body and trailers.
void Filter::continueDoRetry(bool can_send_early_data, bool can_use_http3,
                             TimeoutRetry is_timeout_retry, Upstream::HostConstSharedPtr&& host,
                             Upstream::ThreadLocalCluster& cluster,
                             absl::string_view host_selection_details) {
  callbacks_->streamInfo().downstreamTiming().setValue(
      "envoy.router.host_selection_end_ms", callbacks_->dispatcher().timeSource().monotonicTime());
  std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(cluster, host);
  if (!generic_conn_pool) {
    // No usable host/pool for the retry: fail the request and tear down.
    sendNoHealthyUpstreamResponse(host_selection_details);
    cleanup();
    return;
  }
  UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
      *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3,
      allow_multiplexed_upstream_half_close_ /*enable_half_close*/);
  // Refresh the per-attempt request headers for this new attempt.
  if (include_attempt_count_in_request_) {
    downstream_headers_->setEnvoyAttemptCount(attempt_count_);
  }
  if (include_timeout_retry_header_in_request_) {
    downstream_headers_->setEnvoyIsTimeoutRetry(is_timeout_retry == TimeoutRetry::Yes ? "true"
                                                                                      : "false");
  }
  // The request timeouts only account for time elapsed since the downstream request completed
  // which might not have happened yet, in which case zero time has elapsed.
  std::chrono::milliseconds elapsed_time = std::chrono::milliseconds::zero();
  if (DateUtil::timePointValid(downstream_request_complete_time_)) {
    Event::Dispatcher& dispatcher = callbacks_->dispatcher();
    elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(
        dispatcher.timeSource().monotonicTime() - downstream_request_complete_time_);
  }
  FilterUtility::setTimeoutHeaders(elapsed_time.count(), timeout_, *route_entry_,
                                   *downstream_headers_, !config_->suppress_envoy_headers_,
                                   grpc_request_, hedging_params_.hedge_on_per_try_timeout_);
  // Keep a raw pointer so we can detect below whether the request survived header dispatch.
  UpstreamRequest* upstream_request_tmp = upstream_request.get();
  LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
  upstream_requests_.front()->acceptHeadersFromRouter(
      !callbacks_->decodingBuffer() && !downstream_trailers_ && downstream_end_stream_);
  // It's possible we got immediately reset which means the upstream request we just
  // added to the front of the list might have been removed, so we need to check to make
  // sure we don't send data on the wrong request.
  if (!upstream_requests_.empty() && (upstream_requests_.front().get() == upstream_request_tmp)) {
    if (callbacks_->decodingBuffer()) {
      // If we are doing a retry we need to make a copy.
      Buffer::OwnedImpl copy(*callbacks_->decodingBuffer());
      upstream_requests_.front()->acceptDataFromRouter(copy, !downstream_trailers_ &&
                                                                 downstream_end_stream_);
    }
    if (downstream_trailers_) {
      upstream_requests_.front()->acceptTrailersFromRouter(*downstream_trailers_);
    }
  }
}
2562
uint32_t Filter::numRequestsAwaitingHeaders() {
2562
  return std::count_if(upstream_requests_.begin(), upstream_requests_.end(),
2562
                       [](const auto& req) -> bool { return req->awaitingHeaders(); });
2562
}
// Applies the cluster's DROP_OVERLOAD configuration. Returns true when this request is
// dropped — either unconditionally (ratio == 100%) or probabilistically — after sending a
// local 503 reply and charging the relevant stats; returns false when the request proceeds.
bool Filter::checkDropOverload(Upstream::ThreadLocalCluster& cluster) {
  if (cluster.dropOverload().value()) {
    ENVOY_STREAM_LOG(debug, "Router filter: cluster DROP_OVERLOAD configuration: {}", *callbacks_,
                     cluster.dropOverload().value());
    if (cluster.dropOverload().value() == 1.0) {
      ENVOY_STREAM_LOG(
          debug, "The configured DROP_OVERLOAD ratio is 100%, drop everything unconditionally.",
          *callbacks_);
      callbacks_->streamInfo().setResponseFlag(
          StreamInfo::CoreResponseFlag::UnconditionalDropOverload);
      chargeUpstreamCode(Http::Code::ServiceUnavailable, {}, true);
      callbacks_->sendLocalReply(
          Http::Code::ServiceUnavailable, "unconditional drop overload",
          [this](Http::ResponseHeaderMap& headers) {
            // Advertise the drop to the client unless Envoy headers are suppressed.
            if (!config_->suppress_envoy_headers_) {
              headers.addReference(Http::Headers::get().EnvoyUnconditionalDropOverload,
                                   Http::Headers::get().EnvoyUnconditionalDropOverloadValues.True);
            }
            modify_headers_(headers);
          },
          absl::nullopt, StreamInfo::ResponseCodeDetails::get().UnconditionalDropOverload);
      cluster.info()->loadReportStats().upstream_rq_drop_overload_.inc();
      return true;
    }
    // Ratio < 100%: drop with the configured probability.
    if (config_->random_.bernoulli(cluster.dropOverload())) {
      ENVOY_STREAM_LOG(debug, "The request is dropped by DROP_OVERLOAD", *callbacks_);
      callbacks_->streamInfo().setResponseFlag(StreamInfo::CoreResponseFlag::DropOverLoad);
      chargeUpstreamCode(Http::Code::ServiceUnavailable, {}, true);
      callbacks_->sendLocalReply(
          Http::Code::ServiceUnavailable, "drop overload",
          [this](Http::ResponseHeaderMap& headers) {
            // Advertise the drop to the client unless Envoy headers are suppressed.
            if (!config_->suppress_envoy_headers_) {
              headers.addReference(Http::Headers::get().EnvoyDropOverload,
                                   Http::Headers::get().EnvoyDropOverloadValues.True);
            }
            modify_headers_(headers);
          },
          absl::nullopt, StreamInfo::ResponseCodeDetails::get().DropOverload);
      cluster.info()->loadReportStats().upstream_rq_drop_overload_.inc();
      return true;
    }
  }
  return false;
}
// Parses an ORCA load report out of response headers or trailers (at most once per
// response) and forwards it to LRS load-metric stats and/or the LB policy's host data,
// whichever consumers are configured.
void Filter::maybeProcessOrcaLoadReport(const Envoy::Http::HeaderMap& headers_or_trailers,
                                        UpstreamRequest& upstream_request) {
  // Process the load report only once, so if response has report in headers,
  // then don't process it in trailers.
  if (orca_load_report_received_) {
    return;
  }
  // Check whether we need to send the load report to the LRS or invoke the ORCA
  // callbacks.
  Upstream::HostDescriptionOptConstRef upstream_host = upstream_request.upstreamHost();
  // The upstream host should always be available because the upstream request has got
  // the response headers/trailers from the upstream host.
  ASSERT(upstream_host.has_value(), "upstream host is not available for upstream request");
  OptRef<Upstream::HostLbPolicyData> host_lb_policy_data = upstream_host->lbPolicyData();
  if (!cluster_->lrsReportMetricNames().has_value() && !host_lb_policy_data.has_value()) {
    // If the cluster doesn't have LRS metric names configured then there is no need to
    // extract the stats for LRS.
    // If the host doesn't have LB policy data then that means the LB policy doesn't care
    // about the ORCA load report.
    // Return early here to avoid parsing the ORCA load report because no one is interested
    // in it.
    return;
  }
  absl::StatusOr<xds::data::orca::v3::OrcaLoadReport> orca_load_report =
      Envoy::Orca::parseOrcaLoadReportHeaders(headers_or_trailers);
  if (!orca_load_report.ok()) {
    // No (or unparsable) report present; leave orca_load_report_received_ false so a
    // later trailer pass can still be tried.
    ENVOY_STREAM_LOG(trace, "Headers don't have orca load report: {}", *callbacks_,
                     orca_load_report.status().message());
    return;
  }
  orca_load_report_received_ = true;
  // Consumer 1: LRS load metric stats, when metric names are configured on the cluster.
  if (cluster_->lrsReportMetricNames().has_value()) {
    ENVOY_STREAM_LOG(trace, "Adding ORCA load report {} to load metrics", *callbacks_,
                     orca_load_report->DebugString());
    Envoy::Orca::addOrcaLoadReportToLoadMetricStats(
        *cluster_->lrsReportMetricNames(), *orca_load_report, upstream_host->loadMetricStats());
  }
  // Consumer 2: the LB policy's per-host callback.
  if (host_lb_policy_data.has_value()) {
    ENVOY_STREAM_LOG(trace, "orca_load_report for {} report = {}", *callbacks_,
                     upstream_host->address()->asString(), orca_load_report->DebugString());
    const absl::Status status =
        host_lb_policy_data->onOrcaLoadReport(*orca_load_report, callbacks_->streamInfo());
    if (!status.ok()) {
      ENVOY_LOG_PERIODIC(error, std::chrono::seconds(10),
                         "LB policy onOrcaLoadReport failed: {} for load report {}",
                         status.message(), orca_load_report->DebugString());
    }
  }
}
47078
const Router::RetryPolicy* Filter::getEffectiveRetryPolicy() const {
  // Use cluster-level retry policy if available. The most specific policy wins.
  // If no cluster-level policy is configured, fall back to route-level policy.
47078
  const Router::RetryPolicy* retry_policy = cluster_->httpProtocolOptions().retryPolicy();
47078
  if (retry_policy == nullptr) {
47074
    retry_policy = route_entry_->retryPolicy().get();
47074
  }
47078
  return retry_policy;
47078
}
// Builds the retry state for this request and, when upstream HTTP/3 is the only reason
// retries were enabled, disables request-body buffering.
RetryStatePtr
ProdFilter::createRetryState(const RetryPolicy& policy, Http::RequestHeaderMap& request_headers,
                             const Upstream::ClusterInfo& cluster, const VirtualCluster* vcluster,
                             RouteStatsContextOptRef route_stats_context,
                             Server::Configuration::CommonFactoryContext& context,
                             Event::Dispatcher& dispatcher, Upstream::ResourcePriority priority) {
  auto state = RetryStateImpl::create(policy, request_headers, cluster, vcluster,
                                      route_stats_context, context, dispatcher, priority);
  if (state && state->isAutomaticallyConfiguredForHttp3()) {
    // Since doing retry will make Envoy to buffer the request body, if upstream using HTTP/3 is
    // the only reason for doing retry, set the buffer limit to 0 so that we don't retry or
    // buffer safe requests with body which is not common.
    setRequestBodyBufferLimit(0);
  }
  return state;
}
} // namespace Router
} // namespace Envoy